Connecting to databases
When building a data pipeline, you may need to extract data from or load data into a database. In Dagster, resources can be used to connect to a database by acting as a wrapper around a database client.
This guide demonstrates how to standardize database connections and customize their configuration using Dagster resources.
This guide assumes familiarity with assets.
Prerequisites
To run the example code in this article, you'll need:
- Connection information for a Snowflake database
- To install the following:
  pip install dagster dagster-snowflake pandas
Step 1: Write a resource
This example creates a resource that represents a Snowflake database. Using SnowflakeResource, define a Dagster resource that connects to a Snowflake database:
from dagster_snowflake import SnowflakeResource
# Define a resource named `iris_db`
iris_db = SnowflakeResource(
    # Hardcoding passwords is bad practice; we'll fix this later
    password="snowflake_password",
    warehouse="snowflake_warehouse",
    account="snowflake_account",
    user="snowflake_user",
    database="iris_database",
    schema="iris_schema",
)
Step 2: Use the resource in an asset
To use the resource, provide it as a parameter to an asset and include it in the Definitions object:
import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas
import dagster as dg
iris_db = SnowflakeResource(
    password="snowflake_password",
    warehouse="snowflake_warehouse",
    account="snowflake_account",
    user="snowflake_user",
    database="iris_database",
    schema="iris_schema",
)
@dg.asset
# Provide the resource to the asset
def iris_dataset(iris_db: SnowflakeResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
    with iris_db.get_connection() as conn:
        write_pandas(conn, iris_df, table_name="iris_dataset")
# A second asset that uses the `iris_db` resource
@dg.asset(deps=[iris_dataset])
def iris_setosa(iris_db: SnowflakeResource) -> None:
    with iris_db.get_connection() as conn:
        conn.cursor().execute(
            """
            CREATE OR REPLACE TABLE iris_setosa as (
                SELECT *
                FROM iris.iris_dataset
                WHERE species = 'Iris-setosa'
            );"""
        )
defs = dg.Definitions(
    assets=[iris_dataset, iris_setosa],
    # Include the resource in the `Definitions` object
    resources={"iris_db": iris_db},
)
When you materialize these assets, Dagster will provide an initialized SnowflakeResource to the assets' iris_db parameter.
Step 3: Source configuration with environment variables
Resources can be configured using environment variables, allowing you to connect to environment-specific databases, swap credentials, and so on. You can use Dagster's built-in EnvVar class to source configuration values from environment variables at asset materialization time.
In this example, a second instance of the Snowflake resource, named production, has been added:
import os
import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas
import dagster as dg
# Define `local` and `production` versions of the Snowflake resource
resources = {
    "local": {
        "iris_db": SnowflakeResource(
            # Retrieve dev credentials with environment variables
            user=dg.EnvVar("DEV_SNOWFLAKE_USER"),
            password=dg.EnvVar("DEV_SNOWFLAKE_PASSWORD"),
            warehouse="snowflake_warehouse",
            account="abc1234.us-east-1",
            database="LOCAL",
            schema="IRIS_SCHEMA",
        ),
    },
    "production": {
        "iris_db": SnowflakeResource(
            # Retrieve production credentials with environment variables
            user=dg.EnvVar("PROD_SNOWFLAKE_USER"),
            password=dg.EnvVar("PROD_SNOWFLAKE_PASSWORD"),
            warehouse="snowflake_warehouse",
            account="abc1234.us-east-1",
            database="PRODUCTION",
            schema="IRIS_SCHEMA",
        ),
    },
}
@dg.asset
def iris_dataset(iris_db: SnowflakeResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
    with iris_db.get_connection() as conn:
        write_pandas(conn, iris_df, table_name="iris_dataset")
@dg.asset(deps=[iris_dataset])
def iris_setosa(iris_db: SnowflakeResource) -> None:
    with iris_db.get_connection() as conn:
        conn.cursor().execute(
            """
            CREATE OR REPLACE TABLE iris_setosa as (
                SELECT *
                FROM iris.iris_dataset
                WHERE species = 'Iris-setosa'
            );"""
        )
# Define a variable that determines environment; defaults to `local`
deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")
defs = dg.Definitions(
    assets=[iris_dataset, iris_setosa],
    # Provide the dictionary of resources to `Definitions`
    resources=resources[deployment_name],
)
When the definitions are loaded, the deployment_name variable, read from the DAGSTER_DEPLOYMENT environment variable, determines which Snowflake resource to use (local or production). Then, when the assets are materialized, Dagster will read the values set for each resource's environment variables (for example, DEV_SNOWFLAKE_PASSWORD) and initialize a SnowflakeResource with those values.
The initialized SnowflakeResource will be provided to the assets' iris_db parameter.
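Because the resource dictionary is indexed directly by the deployment name, an unexpected DAGSTER_DEPLOYMENT value surfaces as a bare KeyError, and an unset credential variable only shows up once a run starts. One possible preflight check, using only the standard library, is sketched below. REQUIRED_ENV_VARS, resolve_deployment, and missing_env_vars are hypothetical helpers, not part of Dagster; the variable names are the ones used in the example above.

```python
import os
from typing import List, Mapping

# Environment variables each deployment's resource reads in the example above
REQUIRED_ENV_VARS = {
    "local": ("DEV_SNOWFLAKE_USER", "DEV_SNOWFLAKE_PASSWORD"),
    "production": ("PROD_SNOWFLAKE_USER", "PROD_SNOWFLAKE_PASSWORD"),
}


def resolve_deployment(environ: Mapping[str, str] = os.environ) -> str:
    """Return the deployment name, defaulting to `local`, or raise if unknown."""
    name = environ.get("DAGSTER_DEPLOYMENT", "local")
    if name not in REQUIRED_ENV_VARS:
        known = ", ".join(sorted(REQUIRED_ENV_VARS))
        raise ValueError(
            f"Unknown DAGSTER_DEPLOYMENT {name!r}; expected one of: {known}"
        )
    return name


def missing_env_vars(
    deployment: str, environ: Mapping[str, str] = os.environ
) -> List[str]:
    """List the required variables that are unset or empty for a deployment."""
    return [var for var in REQUIRED_ENV_VARS[deployment] if not environ.get(var)]
```

Both helpers accept any mapping in place of os.environ, so they can be exercised in tests without touching the real environment. For example, missing_env_vars(resolve_deployment()) could be logged before the definitions are built.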
You can also fetch environment variables using the os library. Dagster treats each approach to fetching environment variables differently, such as when they're fetched or how they display in the UI. Refer to the Environment variables guide for more information.
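As a rough illustration of the timing difference, the sketch below uses only the standard library. It does not call Dagster's EnvVar; DEMO_DB_PASSWORD and read_late are hypothetical names, and read_late stands in for any lookup that is deferred until the value is needed.

```python
import os
from typing import Optional

VAR_NAME = "DEMO_DB_PASSWORD"  # hypothetical variable name for this demo

# Simulate the environment as it looks when the module is first loaded
os.environ[VAR_NAME] = "value-at-load-time"

# Eager: os.getenv runs immediately, so the value is captured as a plain string
eager_value = os.getenv(VAR_NAME)


def read_late() -> Optional[str]:
    """Deferred: the environment is consulted only when this is called."""
    return os.getenv(VAR_NAME)


# Simulate the variable changing before the value is actually used
os.environ[VAR_NAME] = "value-at-run-time"
```

After the change, eager_value still holds the string captured at load time, while read_late() returns the current value. Values fetched with os.getenv in a definitions module behave like the eager case.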
Next steps
- Explore how to use resources for connecting to APIs
- Go deeper into understanding resources