Snowflake with Pandas (dagster-snowflake-pandas)

This library provides an integration between the Snowflake data warehouse and the Pandas data processing library.

To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.

dagster_snowflake_pandas.SnowflakePandasIOManager IOManagerDefinition

An I/O manager definition that reads inputs from and writes Pandas DataFrames to Snowflake. When using the SnowflakePandasIOManager, any inputs and outputs without type annotations will be loaded as Pandas DataFrames.

Returns: IOManagerDefinition

Examples:

import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster_snowflake_pandas import SnowflakePandasIOManager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePandasIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePandasIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you can also specify the schema where they should be stored, either through metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata takes precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.
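
The fallback order described above can be summarized in a small, dependency-free sketch. This is only an illustration of the rules as stated on this page, not the library's implementation: `resolve_schema` and its parameter names are hypothetical, and the relative order of key_prefix and the I/O manager's schema setting is an assumption taken from the wording here, so check the behavior of your installed version.

```python
from typing import Optional, Sequence

DEFAULT_SCHEMA = "public"


def resolve_schema(
    metadata_schema: Optional[str] = None,
    key_prefix: Sequence[str] = (),
    io_manager_schema: Optional[str] = None,
) -> str:
    """Hypothetical helper: pick a schema using the rules described on this page."""
    # 1. A "schema" entry in the asset or output metadata wins.
    if metadata_schema:
        return metadata_schema
    # 2. Otherwise use the key_prefix (assumption: its last component).
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise fall back to the schema configured on the I/O manager.
    if io_manager_schema:
        return io_manager_schema
    # 4. If nothing is provided, use the default.
    return DEFAULT_SCHEMA


# Metadata takes precedence over key_prefix.
print(resolve_schema(metadata_schema="from_metadata", key_prefix=["from_prefix"]))
# Nothing specified anywhere.
print(resolve_schema())
```

The two calls print `from_metadata` and `public` respectively.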

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
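
To make the effect of the "columns" metadata concrete, the sketch below shows how a column list could narrow the query used to load a table. It is a simplified illustration under stated assumptions: `build_select` is a hypothetical helper, not part of dagster-snowflake-pandas, and the real I/O manager may build its queries differently.

```python
from typing import Optional, Sequence


def build_select(
    table: str,
    schema: str = "public",
    columns: Optional[Sequence[str]] = None,
) -> str:
    """Hypothetical helper: build a SELECT limited to the requested columns."""
    # With no "columns" metadata, every column is loaded.
    col_str = ", ".join(columns) if columns else "*"
    return f"SELECT {col_str} FROM {schema}.{table}"


print(build_select("my_table", "my_schema", ["a"]))  # SELECT a FROM my_schema.my_table
print(build_select("my_table", "my_schema"))         # SELECT * FROM my_schema.my_table
```

Selecting columns at load time means the downstream asset only pulls the data it actually uses.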

class dagster_snowflake_pandas.SnowflakePandasTypeHandler

Plugin for the Snowflake I/O Manager that can store and load Pandas DataFrames as Snowflake tables.

Examples:

from typing import Sequence

import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster._core.storage.db_io_manager import DbTypeHandler  # internal module path; may change between releases
from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler

class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

Legacy

dagster_snowflake_pandas.snowflake_pandas_io_manager IOManagerDefinition

An I/O manager definition that reads inputs from and writes Pandas DataFrames to Snowflake. When using the snowflake_pandas_io_manager, any inputs and outputs without type annotations will be loaded as Pandas DataFrames.

Returns: IOManagerDefinition

Examples:

import pandas as pd
from dagster import Definitions, asset
from dagster_snowflake_pandas import snowflake_pandas_io_manager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured({
            "database": "my_database",
            "account": {"env": "SNOWFLAKE_ACCOUNT"},
            ...
        })
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {"database": "my_database", "schema": "my_schema", ...}  # will be used as the schema
        )
    }
)

On individual assets, you can also specify the schema where they should be stored, either through metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata takes precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...