Snowflake (dagster-snowflake)
This library provides an integration with the Snowflake data warehouse.
To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.
Related Guides:
- Using Dagster with Snowflake
- Snowflake I/O manager reference
- Transitioning data pipelines from development to production
- Testing against production with Dagster+ Branch Deployments
I/O Manager
- dagster_snowflake.SnowflakeIOManager IOManagerDefinition
Base class for an IO manager definition that reads inputs from and writes outputs to Snowflake.
Examples:
from typing import Sequence

import pandas as pd

from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from dagster import Definitions, EnvVar, asset
class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="my_database", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a "schema" entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to "public".
To only use specific columns of a table as input to a downstream op or asset, add the columns metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
Resource
- dagster_snowflake.SnowflakeResource ResourceDefinition
A resource for connecting to the Snowflake data warehouse.
If the connector configuration is not set, SnowflakeResource.get_connection() will return a snowflake.connector.Connection object. If connector="sqlalchemy" is set, SnowflakeResource.get_connection() will return a SQLAlchemy Connection or a SQLAlchemy raw connection.
A simple example of loading data into Snowflake and subsequently querying that data is shown below:
Examples:
from dagster import EnvVar, job, op
from dagster_snowflake import SnowflakeResource

@op
def get_one(snowflake_resource: SnowflakeResource):
    with snowflake_resource.get_connection() as conn:
        # conn is a snowflake.connector.Connection object
        conn.cursor().execute("SELECT 1")

@job
def my_snowflake_job():
    get_one()

my_snowflake_job.execute_in_process(
    resources={
        'snowflake_resource': SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="MY_DATABASE",
            schema="MY_SCHEMA",
            warehouse="MY_WAREHOUSE"
        )
    }
)
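If connector="sqlalchemy" is set in the resource configuration, the object yielded by get_connection() is a SQLAlchemy raw connection rather than a snowflake.connector.Connection. A minimal sketch of the same job under that configuration (the query is illustrative, and the snowflake-sqlalchemy package is assumed to be installed):

from dagster import EnvVar, job, op
from dagster_snowflake import SnowflakeResource

@op
def get_one_sqlalchemy(snowflake_resource: SnowflakeResource):
    with snowflake_resource.get_connection() as conn:
        # with connector="sqlalchemy", conn is a SQLAlchemy raw connection by default
        conn.cursor().execute("SELECT 1")

@job
def my_sqlalchemy_snowflake_job():
    get_one_sqlalchemy()

my_sqlalchemy_snowflake_job.execute_in_process(
    resources={
        'snowflake_resource': SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            connector="sqlalchemy",
        )
    }
)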
- class dagster_snowflake.SnowflakeConnection
A connection to Snowflake that can execute queries. In general this class should not be directly instantiated, but rather used as a resource in an op or asset via the snowflake_resource().
Note that the SnowflakeConnection is only used by the snowflake_resource. The Pythonic SnowflakeResource does not use this SnowflakeConnection class.
- execute_queries
Execute multiple queries in Snowflake.
Parameters:
- sql_queries (Sequence[str]) – List of queries to be executed in series
- parameters (Optional[Union[Sequence[Any], Mapping[Any, Any]]]) – Parameters to be passed to every query. See the Snowflake documentation for more information on binding parameters.
- fetch_results (bool) – If True, will return the results of the queries as a list. Defaults to False. If True and use_pandas_result is also True, the results will be returned as Pandas DataFrames.
- use_pandas_result (bool) – If True, will return the results of the queries as a list of Pandas DataFrames.
Returns: The results of the queries as a list if fetch_results or use_pandas_result is True, otherwise None.
Examples:
@op
def create_fresh_database(snowflake: SnowflakeResource):
    queries = ["DROP DATABASE IF EXISTS MY_DATABASE", "CREATE DATABASE MY_DATABASE"]
    snowflake.execute_queries(
        sql_queries=queries
    )
- execute_query
Execute a query in Snowflake.
Parameters:
- sql (str) – the query to be executed
- parameters (Optional[Union[Sequence[Any], Mapping[Any, Any]]]) – Parameters to be passed to the query. See the Snowflake documentation for more information on binding parameters.
- fetch_results (bool) – If True, will return the result of the query. Defaults to False. If True and use_pandas_result is also True, the result will be returned as a Pandas DataFrame.
- use_pandas_result (bool) – If True, will return the result of the query as a Pandas DataFrame.
Returns: The result of the query if fetch_results or use_pandas_result is True, otherwise None.
Examples:
@op
def drop_database(snowflake: SnowflakeResource):
    snowflake.execute_query(
        "DROP DATABASE IF EXISTS MY_DATABASE"
    )
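To get query results back instead of None, pass fetch_results=True (or use_pandas_result=True for a Pandas DataFrame). A minimal sketch (the table name is illustrative):

@op
def count_rows(snowflake: SnowflakeResource):
    # fetch_results=True returns the query results rather than None;
    # use_pandas_result=True would instead return a Pandas DataFrame
    results = snowflake.execute_query(
        "SELECT COUNT(*) FROM MY_TABLE",
        fetch_results=True,
    )
    return results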
- get_connection
Gets a connection to Snowflake as a context manager.
If using the execute_query, execute_queries, or load_table_from_local_parquet methods, you do not need to create a connection using this context manager.
Parameters: raw_conn (bool) – If using the sqlalchemy connector, you can set raw_conn to True to create a raw connection. Defaults to True.
Examples:
@op(
    required_resource_keys={"snowflake"}
)
def get_query_status(context, query_id):
    with context.resources.snowflake.get_connection() as conn:
        # conn is a Snowflake Connection object or a SQLAlchemy Connection if
        # sqlalchemy is specified as the connector in the Snowflake Resource config
        return conn.get_query_status(query_id)
- load_table_from_local_parquet
Stores the content of a parquet file to a Snowflake table.
Parameters:
- src (str) – the name of the file to store in Snowflake
- table (str) – the name of the table to store the data. If the table does not exist, it will be created.
Examples:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
@op
def write_parquet_file(snowflake: SnowflakeResource):
    df = pd.DataFrame({"one": [1, 2, 3], "ten": [11, 12, 13]})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, "example.parquet")
    snowflake.load_table_from_local_parquet(
        src="example.parquet",
        table="MY_TABLE"
    )
Data Freshness
- dagster_snowflake.fetch_last_updated_timestamps
Fetch the last updated times of a list of tables in Snowflake.
If the underlying query to fetch the last updated time returns no results, a ValueError will be raised.
Parameters:
- snowflake_connection (Union[SqlDbConnection, SnowflakeConnection]) – A connection to Snowflake.
- schema (str) – The schema of the tables to fetch the last updated time for.
- tables (Sequence[str]) – A list of table names to fetch the last updated time for.
- database (Optional[str]) – The database of the tables. Only required if the connection has not already been set to a database.
- ignore_missing_tables (Optional[bool]) – If True, tables not found in Snowflake will be excluded from the result.
Returns: A dictionary of table names to their last updated time in UTC.
Return type: Mapping[str, datetime]
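A minimal usage sketch with the SnowflakeResource (the schema and table names are illustrative; this assumes the connection yielded by get_connection() can be passed directly as snowflake_connection):

from dagster import OpExecutionContext, op
from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

@op
def log_freshness(context: OpExecutionContext, snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        # returns a mapping of table name -> last updated datetime in UTC
        last_updated = fetch_last_updated_timestamps(
            snowflake_connection=conn,
            schema="MY_SCHEMA",
            tables=["my_table", "my_other_table"],
        )
    for table, timestamp in last_updated.items():
        context.log.info(f"{table} last updated at {timestamp}")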
Ops
- dagster_snowflake.snowflake_op_for_query
This function is an op factory that constructs an op to execute a Snowflake query.
Note that you can only use snowflake_op_for_query if you know the query you’d like to execute at graph construction time. If you’d like to execute queries dynamically during job execution, you should manually execute those queries in your custom op using the snowflake resource.
Parameters:
- sql (str) – The sql query that will execute against the provided snowflake resource.
- parameters (dict) – The parameters for the sql query.
Returns: The constructed op definition.
Return type: OpDefinition
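A minimal sketch (this assumes the constructed op reads from a resource keyed "snowflake"; the query is illustrative):

from dagster import job
from dagster_snowflake import snowflake_op_for_query, snowflake_resource

# constructs an op that runs the given query against the "snowflake" resource
count_rows = snowflake_op_for_query("SELECT COUNT(*) FROM MY_TABLE")

@job(resource_defs={"snowflake": snowflake_resource})
def my_query_job():
    count_rows()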
Legacy
- dagster_snowflake.build_snowflake_io_manager IOManagerDefinition
Builds an IO manager definition that reads inputs from and writes outputs to Snowflake.
Parameters:
- type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of Snowflake tables and an in-memory type (e.g., a Pandas DataFrame).
- default_load_type (Type) – When an input has no type annotation, load it as this type.
Returns: IOManagerDefinition
Examples:
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from dagster import Definitions, asset
import pandas as pd
@asset(
    key_prefix=["my_prefix"],
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()])
defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": snowflake_io_manager.configured({
            "database": "my_database",
            "account": {"env": "SNOWFLAKE_ACCOUNT"},
            ...
        })
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_io_manager.configured(
            {"database": "my_database", "schema": "my_schema", ...}  # will be used as the schema
        )
    }
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...
@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a "schema" entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to "public".
To only use specific columns of a table as input to a downstream op or asset, add the columns metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
- dagster_snowflake.snowflake_resource ResourceDefinition
A resource for connecting to the Snowflake data warehouse. The returned resource object is an instance of SnowflakeConnection.
A simple example of loading data into Snowflake and subsequently querying that data is shown below:
Examples:
from dagster import job, op
from dagster_snowflake import snowflake_resource
@op(required_resource_keys={'snowflake'})
def get_one(context):
    context.resources.snowflake.execute_query('SELECT 1')

@job(resource_defs={'snowflake': snowflake_resource})
def my_snowflake_job():
    get_one()

my_snowflake_job.execute_in_process(
    run_config={
        'resources': {
            'snowflake': {
                'config': {
                    'account': {'env': 'SNOWFLAKE_ACCOUNT'},
                    'user': {'env': 'SNOWFLAKE_USER'},
                    'password': {'env': 'SNOWFLAKE_PASSWORD'},
                    'database': {'env': 'SNOWFLAKE_DATABASE'},
                    'schema': {'env': 'SNOWFLAKE_SCHEMA'},
                    'warehouse': {'env': 'SNOWFLAKE_WAREHOUSE'},
                }
            }
        }
    }
)