DuckDB + Pandas (dagster-duckdb-pandas)
This library provides an integration with the DuckDB database and Pandas data processing library.
Related guides:
- dagster_duckdb_pandas.DuckDBPandasIOManager IOManagerDefinition
An I/O manager definition that reads inputs from and writes Pandas DataFrames to DuckDB. When using the DuckDBPandasIOManager, any inputs and outputs without type annotations will be loaded as Pandas DataFrames.
Returns: IOManagerDefinition Examples:
from dagster_duckdb_pandas import DuckDBPandasIOManager
@asset(
key_prefix=["my_schema"] # will be used as the schema in DuckDB
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources=\{"io_manager": DuckDBPandasIOManager(database="my_db.duckdb")}
)You can set a default schema to store the assets using the
schema
configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.defs = Definitions(
assets=[my_table],
resources=\{"io_manager": DuckDBPandasIOManager(database="my_db.duckdb", schema="my_schema")}
)On individual assets, you an also specify the schema where they should be stored using metadata or by adding a
key_prefix
to the asset key. If bothkey_prefix
and metadata are defined, the metadata will take precedence.@asset(
key_prefix=["my_schema"] # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:
...
@asset(
metadata=\{"schema": "my_schema"} # will be used as the schema in duckdb
)
def my_other_table() -> pd.DataFrame:
...For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
out=\{"my_table": Out(metadata=\{"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
...If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
ins=\{"my_table": AssetIn("my_table", metadata=\{"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
# my_table will just contain the data from column "a"
...
- class dagster_duckdb_pandas.DuckDBPandasTypeHandler
Stores and loads Pandas DataFrames in DuckDB.
To use this type handler, return it from the
type_handlers
method of an I/O manager that inherits from ``DuckDBIOManager`.Example:
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
class MyDuckDBIOManager(DuckDBIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DuckDBPandasTypeHandler()]
@asset(
key_prefix=["my_schema"] # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources=\{"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
)
Legacy
- dagster_duckdb_pandas.duckdb_pandas_io_manager IOManagerDefinition
An I/O manager definition that reads inputs from and writes Pandas DataFrames to DuckDB. When using the duckdb_pandas_io_manager, any inputs and outputs without type annotations will be loaded as Pandas DataFrames.
Returns: IOManagerDefinition Examples:
from dagster_duckdb_pandas import duckdb_pandas_io_manager
@asset(
key_prefix=["my_schema"] # will be used as the schema in DuckDB
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources=\{"io_manager": duckdb_pandas_io_manager.configured(\{"database": "my_db.duckdb"})}
)You can set a default schema to store the assets using the
schema
configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.defs = Definitions(
assets=[my_table],
resources=\{"io_manager": duckdb_pandas_io_manager.configured(\{"database": "my_db.duckdb", "schema": "my_schema"})}
)On individual assets, you an also specify the schema where they should be stored using metadata or by adding a
key_prefix
to the asset key. If bothkey_prefix
and metadata are defined, the metadata will take precedence.@asset(
key_prefix=["my_schema"] # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:
...
@asset(
metadata=\{"schema": "my_schema"} # will be used as the schema in duckdb
)
def my_other_table() -> pd.DataFrame:
...For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
out=\{"my_table": Out(metadata=\{"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
...If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
ins=\{"my_table": AssetIn("my_table", metadata=\{"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
# my_table will just contain the data from column "a"
...