I/O managers

I/O managers in Dagster allow you to keep the code for data processing separate from the code for reading and writing data. This reduces repetitive code and makes it easier to change where your data is stored.

In many Dagster pipelines, assets can be broken down into the following steps:

  1. Reading data from a data store into memory
  2. Applying an in-memory transform
  3. Writing the transformed data to a data store

For assets that follow this pattern, an I/O manager can streamline the code that handles reading and writing data to and from a source.
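As a rough illustration of the three steps, the sketch below uses plain Python rather than Dagster's API. The `DictStore` class, its `load` and `store` methods, and the `run_step` helper are hypothetical names invented for this example. The point is only that the transform function contains no storage code, which is the separation an I/O manager provides.

```python
from __future__ import annotations

from typing import Callable


class DictStore:
    """Hypothetical stand-in for a data store; keeps rows in a dict."""

    def __init__(self) -> None:
        self._tables: dict[str, list[dict]] = {}

    def load(self, name: str) -> list[dict]:
        return self._tables[name]

    def store(self, name: str, rows: list[dict]) -> None:
        self._tables[name] = rows


def fill_missing_amounts(rows: list[dict]) -> list[dict]:
    # Step 2 only: an in-memory transform with no storage code
    return [
        {**row, "amount": 0.0 if row["amount"] is None else row["amount"]}
        for row in rows
    ]


def run_step(
    store: DictStore,
    source: str,
    target: str,
    transform: Callable[[list[dict]], list[dict]],
) -> list[dict]:
    rows = store.load(source)    # Step 1: read from the data store
    result = transform(rows)     # Step 2: apply the transform
    store.store(target, result)  # Step 3: write the result back
    return result


store = DictStore()
store.store(
    "raw_sales_data",
    [{"owner": "a", "amount": None}, {"owner": "b", "amount": 5.0}],
)
clean = run_step(store, "raw_sales_data", "clean_sales_data", fill_missing_amounts)
```

Because `fill_missing_amounts` only receives and returns in-memory rows, the store behind `run_step` could be replaced without touching the transform.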

Note: This article assumes familiarity with assets and resources.

Before you begin

I/O managers aren't required to use Dagster, nor are they the best option in all scenarios. If you find yourself writing the same code at the start and end of each asset to load and store data, an I/O manager may be useful. For example:

  • You have assets that are stored in the same location and follow a consistent set of rules to determine the storage path
  • You have assets that are stored differently in local, staging, and production environments
  • You have assets that load upstream dependencies into memory to do the computation

I/O managers may not be the best fit if:

  • You want to run SQL queries that create or update a table in a database
  • Your pipeline manages I/O on its own by using other libraries/tools that write to storage
  • Your assets won't fit in memory, such as a database table with billions of rows

As a general rule, if your pipeline becomes more complicated in order to use I/O managers, it's likely that I/O managers aren't a good fit. In these cases you should use deps to define dependencies.

Using I/O managers in assets

Consider the following example, which contains assets that construct a DuckDB connection object, read data from an upstream table, apply some in-memory transform, and write the result to a new table in DuckDB:

import pandas as pd
from dagster_duckdb import DuckDBResource

import dagster as dg


@dg.asset
def raw_sales_data(duckdb: DuckDBResource) -> None:
    # Read data from a CSV
    raw_df = pd.read_csv(
        "https://raw.githubusercontent.com/dagster-io/dagster/master/docs/next/public/assets/raw_sales_data.csv"
    )
    # Construct DuckDB connection
    with duckdb.get_connection() as conn:
        # Use the data from the CSV to create or update a table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS raw_sales_data AS SELECT * FROM raw_df"
        )
        if not conn.fetchall():
            conn.execute("INSERT INTO raw_sales_data SELECT * FROM raw_df")


# Asset dependent on `raw_sales_data` asset
@dg.asset(deps=[raw_sales_data])
def clean_sales_data(duckdb: DuckDBResource) -> None:
    # Construct DuckDB connection
    with duckdb.get_connection() as conn:
        # Select data from table
        df = conn.execute("SELECT * FROM raw_sales_data").fetch_df()

        # Apply transform
        clean_df = df.fillna({"amount": 0.0})

        # Use transformed result to create or update a table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS clean_sales_data AS SELECT * FROM clean_df"
        )
        if not conn.fetchall():
            conn.execute("INSERT INTO clean_sales_data SELECT * FROM clean_df")


# Asset dependent on `clean_sales_data` asset
@dg.asset(deps=[clean_sales_data])
def sales_summary(duckdb: DuckDBResource) -> None:
    # Construct DuckDB connection
    with duckdb.get_connection() as conn:
        # Select data from table
        df = conn.execute("SELECT * FROM clean_sales_data").fetch_df()

        # Apply transform
        summary = df.groupby(["owner"])["amount"].sum().reset_index()

        # Use transformed result to create or update a table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS sales_summary AS SELECT * FROM summary"
        )
        if not conn.fetchall():
            conn.execute("INSERT INTO sales_summary SELECT * FROM summary")


defs = dg.Definitions(
    assets=[raw_sales_data, clean_sales_data, sales_summary],
    resources={"duckdb": DuckDBResource(database="sales.duckdb", schema="public")},
)

Using an I/O manager would remove the code that reads and writes data from the assets themselves, instead delegating it to the I/O manager. The assets would be left only with the code that applies transformations or retrieves the initial CSV file.

import pandas as pd
from dagster_duckdb_pandas import DuckDBPandasIOManager

import dagster as dg


@dg.asset
def raw_sales_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://raw.githubusercontent.com/dagster-io/dagster/master/docs/next/public/assets/raw_sales_data.csv"
    )


@dg.asset
# Load the upstream `raw_sales_data` asset as input & specify the returned data type (`pd.DataFrame`)
def clean_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    # Storing data with an I/O manager requires returning the data
    return raw_sales_data.fillna({"amount": 0.0})


@dg.asset
def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:
    return clean_sales_data.groupby(["owner"])["amount"].sum().reset_index()


defs = dg.Definitions(
    assets=[raw_sales_data, clean_sales_data, sales_summary],
    # Define the I/O manager and pass it to `Definitions`
    resources={
        "io_manager": DuckDBPandasIOManager(database="sales.duckdb", schema="public")
    },
)

To load upstream assets using an I/O manager, specify the asset as an input parameter to the asset function. In this example, the DuckDBPandasIOManager I/O manager will read the DuckDB table with the same name as the upstream asset (raw_sales_data) and pass the data to clean_sales_data as a Pandas DataFrame.

To store data using an I/O manager, return the data in the asset function. The returned data must be a valid type. This example uses Pandas DataFrames, which the DuckDBPandasIOManager will write to a DuckDB table with the same name as the asset.

Refer to the individual I/O manager documentation for details on valid types and how they store data.

Swapping data stores

With I/O managers, swapping data stores consists of changing the implementation of the I/O manager. The asset definitions, which only contain transformational logic, won't need to change.

In the following example, a Snowflake I/O manager replaces the DuckDB I/O manager. The asset definitions are unchanged; only the `io_manager` resource differs.

import pandas as pd
from dagster_snowflake_pandas import SnowflakePandasIOManager

import dagster as dg


@dg.asset
def raw_sales_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://raw.githubusercontent.com/dagster-io/dagster/master/docs/next/public/assets/raw_sales_data.csv"
    )


@dg.asset
def clean_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    return raw_sales_data.fillna({"amount": 0.0})


@dg.asset
def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:
    return clean_sales_data.groupby(["owner"])["amount"].sum().reset_index()


defs = dg.Definitions(
    assets=[raw_sales_data, clean_sales_data, sales_summary],
    resources={
        # Swap in a Snowflake I/O manager
        "io_manager": SnowflakePandasIOManager(
            database=dg.EnvVar("SNOWFLAKE_DATABASE"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)

Built-in I/O managers

Dagster and its integration libraries provide I/O manager implementations for popular data stores and in-memory formats.

| Name | Description |
| --- | --- |
| FilesystemIOManager | Default I/O manager. Stores outputs as pickle files on the local file system. |
| InMemoryIOManager | Stores outputs in memory. Primarily useful for unit testing. |
| s3.S3PickleIOManager | Stores outputs as pickle files in Amazon Web Services S3. |
| adls2.ConfigurablePickledObjectADLS2IOManager | Stores outputs as pickle files in Azure ADLS2. |
| GCSPickleIOManager | Stores outputs as pickle files in Google Cloud Platform GCS. |
| BigQueryPandasIOManager | Stores Pandas DataFrame outputs in Google Cloud Platform BigQuery. |
| BigQueryPySparkIOManager | Stores PySpark DataFrame outputs in Google Cloud Platform BigQuery. |
| SnowflakePandasIOManager | Stores Pandas DataFrame outputs in Snowflake. |
| SnowflakePySparkIOManager | Stores PySpark DataFrame outputs in Snowflake. |
| DuckDBPandasIOManager | Stores Pandas DataFrame outputs in DuckDB. |
| DuckDBPySparkIOManager | Stores PySpark DataFrame outputs in DuckDB. |
| DuckDBPolarsIOManager | Stores Polars DataFrame outputs in DuckDB. |

Next steps