Deltalake + Polars (dagster-deltalake-polars)
This library provides an integration with the Delta Lake storage framework.
Related guides:
- dagster_deltalake_polars.DeltaLakePolarsIOManager IOManagerDefinition
Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.
Examples:
from dagster_deltalake import DeltaLakeIOManager
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler
class MyDeltaLakeIOManager(DeltaLakeIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DeltaLakePandasTypeHandler()]
@asset(
key_prefix=["my_schema"] # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources=\{"io_manager": MyDeltaLakeIOManager()}
)If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.
@op(
out=\{"my_table": Out(metadata=\{"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
...To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
ins=\{"my_table": AssetIn("my_table", metadata=\{"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
# my_table will just contain the data from column "a"
...