Connecting to databases

When building a data pipeline, you may need to extract data from or load data into a database. In Dagster, resources can be used to connect to a database by acting as a wrapper around a database client.

This guide demonstrates how to standardize database connections and customize their configuration using Dagster resources.

Note: This guide assumes familiarity with assets.
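To make the "wrapper around a database client" idea concrete before bringing in Snowflake, here is a minimal plain-Python sketch. It uses the standard library's sqlite3 as a stand-in client, and `DatabaseResource` and `load_rows` are hypothetical names invented for illustration; this is not how Dagster or SnowflakeResource is implemented, only the general shape of the pattern.

```python
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass(frozen=True)
class DatabaseResource:
    """Hypothetical stand-in for a resource: holds config, hands out connections."""

    path: str  # connection details live in one place

    @contextmanager
    def get_connection(self):
        conn = sqlite3.connect(self.path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()


def load_rows(db: DatabaseResource) -> int:
    """Pipeline code sees only the wrapper, never the connection details."""
    with db.get_connection() as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS iris (species TEXT)")
        conn.execute("INSERT INTO iris VALUES ('Iris-setosa')")
        return conn.execute("SELECT COUNT(*) FROM iris").fetchone()[0]
```

Because the function receives the wrapper rather than building its own connection, pointing it at a different database only requires passing a differently configured instance.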

Prerequisites

To run the example code in this article, you'll need:

  • Connection information for a Snowflake database

  • To install the following:

    pip install dagster dagster-snowflake pandas

Step 1: Write a resource

This example creates a resource that represents a Snowflake database. Using SnowflakeResource, define a Dagster resource that connects to a Snowflake database:

from dagster_snowflake import SnowflakeResource

# Define a resource named `iris_db`
iris_db = SnowflakeResource(
    # Hardcoding a password is bad practice; we'll fix this later
    password="snowflake_password",
    warehouse="snowflake_warehouse",
    account="snowflake_account",
    user="snowflake_user",
    database="iris_database",
    schema="iris_schema",
)

Step 2: Use the resource in an asset

To use the resource, provide it as a parameter to an asset and include it in the Definitions object:

import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

import dagster as dg

iris_db = SnowflakeResource(
    password="snowflake_password",
    warehouse="snowflake_warehouse",
    account="snowflake_account",
    user="snowflake_user",
    database="iris_database",
    schema="iris_schema",
)


@dg.asset
# Provide the resource to the asset
def iris_dataset(iris_db: SnowflakeResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with iris_db.get_connection() as conn:
        write_pandas(conn, iris_df, table_name="iris_dataset")


# A second asset that uses the `iris_db` resource
@dg.asset(deps=[iris_dataset])
def iris_setosa(iris_db: SnowflakeResource) -> None:
    with iris_db.get_connection() as conn:
        conn.cursor().execute(
            """
            CREATE OR REPLACE TABLE iris_setosa as (
                SELECT *
                FROM iris.iris_dataset
                WHERE species = 'Iris-setosa'
            );"""
        )


defs = dg.Definitions(
    assets=[iris_dataset, iris_setosa],
    # Include the resource in the `Definitions` object
    resources={"iris_db": iris_db},
)

When you materialize these assets, Dagster will provide an initialized SnowflakeResource to the assets' iris_db parameter.
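The link between the two pieces is the name: the key used in the resources dictionary ("iris_db") must match the asset's parameter name. The sketch below is a deliberately simplified, plain-Python model of name-based injection. `call_with_resources` is a hypothetical helper written for this illustration and is not Dagster's actual implementation.

```python
import inspect


def call_with_resources(fn, resources):
    """Pass each parameter of `fn` the resource registered under the same name."""
    wanted = inspect.signature(fn).parameters
    missing = [name for name in wanted if name not in resources]
    if missing:
        raise KeyError(f"no resource registered for: {missing}")
    return fn(**{name: resources[name] for name in wanted})


def iris_dataset(iris_db):
    # A real asset would open a connection here; this one just reports its input
    return f"writing with {iris_db}"


# Only the entry whose key matches the parameter name is passed in
registered = {"iris_db": "<snowflake resource>", "other_db": "<unused>"}
result = call_with_resources(iris_dataset, registered)
```

In this model, renaming the parameter without renaming the dictionary key (or the reverse) produces an error rather than a silently missing connection.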

Step 3: Source configuration with environment variables

Resources can be configured using environment variables, allowing you to connect to environment-specific databases, swap credentials, and so on. You can use Dagster's built-in EnvVar class to source configuration values from environment variables at asset materialization time.

In this example, a second instance of the Snowflake resource, named production, has been added:

import os

import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

import dagster as dg

# Define `local` and `production` versions of the Snowflake resource
resources = {
    "local": {
        "iris_db": SnowflakeResource(
            # Retrieve dev credentials with environment variables
            user=dg.EnvVar("DEV_SNOWFLAKE_USER"),
            password=dg.EnvVar("DEV_SNOWFLAKE_PASSWORD"),
            warehouse="snowflake_warehouse",
            account="abc1234.us-east-1",
            database="LOCAL",
            schema="IRIS_SCHEMA",
        ),
    },
    "production": {
        "iris_db": SnowflakeResource(
            # Retrieve production credentials with environment variables
            user=dg.EnvVar("PROD_SNOWFLAKE_USER"),
            password=dg.EnvVar("PROD_SNOWFLAKE_PASSWORD"),
            warehouse="snowflake_warehouse",
            account="abc1234.us-east-1",
            database="PRODUCTION",
            schema="IRIS_SCHEMA",
        ),
    },
}


@dg.asset
def iris_dataset(iris_db: SnowflakeResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with iris_db.get_connection() as conn:
        write_pandas(conn, iris_df, table_name="iris_dataset")


@dg.asset(deps=[iris_dataset])
def iris_setosa(iris_db: SnowflakeResource) -> None:
    with iris_db.get_connection() as conn:
        conn.cursor().execute(
            """
            CREATE OR REPLACE TABLE iris_setosa as (
                SELECT *
                FROM iris.iris_dataset
                WHERE species = 'Iris-setosa'
            );"""
        )


# Define a variable that determines environment; defaults to `local`
deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")

defs = dg.Definitions(
    assets=[iris_dataset, iris_setosa],
    # Provide the dictionary of resources to `Definitions`
    resources=resources[deployment_name],
)

When the code is loaded, the value of the DAGSTER_DEPLOYMENT environment variable (read into deployment_name) determines which Snowflake resource is used (local or production). Then, when the assets are materialized, Dagster reads the values set for that resource's environment variables (for example, DEV_SNOWFLAKE_PASSWORD) and initializes a SnowflakeResource with those values.

The initialized SnowflakeResource will be provided to the assets' iris_db parameter.
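One thing to watch for with the resources[deployment_name] lookup is that a misspelled DAGSTER_DEPLOYMENT value fails with a bare KeyError. A small guard can make that failure easier to diagnose. The helper below is a plain-Python sketch; `select_resources` is a hypothetical name and is not part of Dagster.

```python
import os


def select_resources(resources_by_env, default="local"):
    """Return the resource set for the deployment named in DAGSTER_DEPLOYMENT."""
    name = os.getenv("DAGSTER_DEPLOYMENT", default)
    if name not in resources_by_env:
        known = ", ".join(sorted(resources_by_env))
        raise ValueError(f"Unknown deployment {name!r}; expected one of: {known}")
    return resources_by_env[name]
```

Under this assumption, the Definitions object would receive resources=select_resources(resources) in place of the direct dictionary lookup.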

Note: You can also fetch environment variables using the os library. Dagster treats each approach to fetching environment variables differently, such as when they're fetched or how they display in the UI. Refer to the Environment variables guide for more information.
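The timing difference can be illustrated in plain Python. In the sketch below, `DeferredEnv` is a hypothetical stand-in that stores only the variable name and looks the value up when asked. It is not Dagster's EnvVar class, and DEMO_DB_PASSWORD is a made-up variable name; the point is only to contrast an immediate read with a deferred one.

```python
import os


class DeferredEnv:
    """Hypothetical deferred lookup: keeps the name, reads the value on demand."""

    def __init__(self, name):
        self.name = name

    def get_value(self):
        value = os.environ.get(self.name)
        if value is None:
            raise RuntimeError(f"environment variable {self.name} is not set")
        return value


os.environ["DEMO_DB_PASSWORD"] = "first"
eager = os.environ["DEMO_DB_PASSWORD"]      # value is captured immediately
deferred = DeferredEnv("DEMO_DB_PASSWORD")  # only the name is stored

os.environ["DEMO_DB_PASSWORD"] = "second"   # value changes before it is used

# `eager` still holds "first"; `deferred.get_value()` now returns "second"
```

A deferred lookup also means the secret itself never has to be stored alongside the resource definition, only the name of the variable that holds it.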

Next steps