Polars (dagster-polars)
This library provides Dagster integration with Polars. It allows using Polars eager or lazy DataFrames as inputs and outputs with Dagster’s @asset and @op. Type annotations are used to control whether to load an eager or lazy DataFrame. Lazy DataFrames can be sinked as output. Multiple serialization formats (Parquet, Delta Lake, BigQuery) and filesystems (local, S3, GCS, …) are supported.
A comprehensive list of dagster-polars behavior for supported type annotations can be found in the Type Annotations section.
Installation
pip install dagster-polars
Some IOManagers (like PolarsDeltaIOManager) may require additional dependencies, which are provided with extras like dagster-polars[delta].
Please check the documentation for each IOManager for more details.
Quickstart
Highlights of the features common to filesystem-based IOManagers, using PolarsParquetIOManager as an example (see BasePolarsUPathIOManager for the full list of features provided by dagster-polars):
Type annotations are not required. By default an eager pl.DataFrame will be loaded.
from dagster import asset
import polars as pl

@asset(io_manager_key="polars_parquet_io_manager")
def upstream():
    return pl.DataFrame({"foo": [1, 2, 3]})

@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream) -> pl.LazyFrame:
    assert isinstance(upstream, pl.DataFrame)
    return upstream.lazy()  # LazyFrame will be sinked
A pl.LazyFrame can be scanned instead by annotating the input with pl.LazyFrame, and returning a pl.LazyFrame will sink it:
@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.LazyFrame:
    assert isinstance(upstream, pl.LazyFrame)
    return upstream
The same logic applies to partitioned assets:
from typing import Dict

@asset
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]):
    assert isinstance(partitioned_upstream, dict)
    assert isinstance(partitioned_upstream["my_partition"], pl.LazyFrame)
Optional inputs and outputs are supported:
from typing import Optional

@asset
def upstream() -> Optional[pl.DataFrame]:
    if has_data:  # has_data stands for any user-defined condition
        return pl.DataFrame({"foo": [1, 2, 3]})  # type check will pass
    else:
        return None  # type check will pass and `dagster_polars` will skip writing the output completely

@asset
def downstream(upstream: Optional[pl.LazyFrame]):  # upstream will be None if it doesn't exist in storage
    ...
By default, all the IOManagers store separate partitions in physically separate locations, such as:
- /my/asset/key/partition_0.extension
- /my/asset/key/partition_1.extension
This mode is useful for e.g. snapshotting.
Some IOManagers (like PolarsDeltaIOManager) support reading and writing partitions in storage-native format in the same location.
This mode can typically be enabled by setting the “partition_by” metadata value. For example, PolarsDeltaIOManager would store different partitions in the same /my/asset/key.delta directory, which will be properly partitioned.
This mode should be preferred for true partitioning.
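The two layouts described above can be contrasted with a small path-building sketch. The function name storage_path and its parameters are hypothetical and are not part of dagster-polars; the snippet only mirrors the example paths shown in this section and uses nothing beyond the standard library.

```python
from pathlib import PurePosixPath
from typing import List, Optional


def storage_path(
    base_dir: str,
    asset_key: List[str],
    extension: str,
    partition_key: Optional[str] = None,
    native_partitioning: bool = False,
) -> str:
    """Return an illustrative storage location for an asset or one of its partitions."""
    asset_dir = PurePosixPath(base_dir, *asset_key)
    if partition_key is None or native_partitioning:
        # Unpartitioned asset, or storage-native partitioning:
        # every partition shares one location, e.g. /my/asset/key.delta
        return str(asset_dir.with_name(asset_dir.name + extension))
    # Default mode: one physically separate file per partition,
    # e.g. /my/asset/key/partition_0.parquet
    return str(asset_dir / f"{partition_key}{extension}")


default_layout = storage_path("/my", ["asset", "key"], ".parquet", "partition_0")
native_layout = storage_path(
    "/my", ["asset", "key"], ".delta", "partition_0", native_partitioning=True
)
print(default_layout)
print(native_layout)
```

In the default layout each partition can be overwritten or snapshotted independently, while the shared location leaves partition handling to the storage format itself.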
Type Annotations
Type aliases like DataFrameWithPartitions are provided by dagster_polars.types for convenience.
Supported type annotations and dagster-polars behavior
Type annotation | Type Alias | Behavior |
---|---|---|
DataFrame | | read/write a DataFrame |
LazyFrame | | read/sink a LazyFrame |
Optional[DataFrame] | | read/write a DataFrame. Do nothing if no data is found in storage or the output is None |
Optional[LazyFrame] | | read a LazyFrame. Do nothing if no data is found in storage |
Dict[str, DataFrame] | DataFrameWithPartitions | read multiple DataFrames as Dict[str, DataFrame]. Raises an error for missing partitions, unless the “allow_missing_partitions” input metadata is set to True |
Dict[str, LazyFrame] | LazyFramePartitions | read multiple LazyFrames as Dict[str, LazyFrame]. Raises an error for missing partitions, unless the “allow_missing_partitions” input metadata is set to True |
Generic builtins (like tuple[…] instead of Tuple[…]) are supported for Python >= 3.9.
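As a rough illustration of how an annotation can drive the loading behavior in the table above, the sketch below classifies annotations with typing.get_origin and typing.get_args. It is not the dagster-polars implementation: describe_annotation is a hypothetical helper, and the DataFrame and LazyFrame classes are local stand-ins for the Polars types so that the snippet runs with the standard library alone.

```python
from typing import Dict, Optional, Union, get_args, get_origin


class DataFrame:
    """Stand-in for polars.DataFrame (assumption made for this sketch)."""


class LazyFrame:
    """Stand-in for polars.LazyFrame (assumption made for this sketch)."""


def describe_annotation(annotation) -> str:
    """Classify an annotation into a coarse loading strategy."""
    origin = get_origin(annotation)
    args = get_args(annotation)
    # Optional[X] is Union[X, None]
    if origin is Union and type(None) in args:
        inner = [arg for arg in args if arg is not type(None)][0]
        return "optional " + describe_annotation(inner)
    # Dict[str, X] and the builtin generic dict[str, X] share the same origin
    if origin is dict:
        return "partitions of " + describe_annotation(args[1])
    if annotation is LazyFrame:
        return "lazy"
    # Anything else, including a missing annotation, falls back to eager loading
    return "eager"


for annotation in (DataFrame, Optional[LazyFrame], Dict[str, LazyFrame], dict[str, DataFrame]):
    print(annotation, "->", describe_annotation(annotation))
```

Because get_origin returns dict for both Dict[str, LazyFrame] and dict[str, LazyFrame], a single branch covers the typing generics and the builtin generics.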
API Documentation
- dagster_polars.BasePolarsUPathIOManager IOManagerDefinition
Base class for dagster-polars IOManagers.
Doesn’t define a specific storage format.
To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the write_df_to_path, sink_df_to_path and scan_df_from_path methods.
Features:
- All the features of UPathIOManager: works with local and remote filesystems (like S3), supports loading multiple partitions with respect to PartitionMapping, and more
- loads the correct type (polars.DataFrame, polars.LazyFrame, or other types defined in dagster_polars.types) based on the input type annotation (or dagster.DagsterType’s typing_type)
- can sink lazy pl.LazyFrame DataFrames
- handles Nones with Optional types by skipping loading missing inputs or saving None outputs
- logs various metadata about the DataFrame - size, schema, sample, stats, …
- the “columns” input metadata value can be used to select a subset of columns to load
- dagster_polars.PolarsParquetIOManager IOManagerDefinition
- experimental
This API may break in future versions, even between dot releases.
Implements reading and writing Polars DataFrames in Apache Parquet format.
Features:
- All features provided by BasePolarsUPathIOManager.
- All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
- Supports reading partitioned Parquet datasets (for example, often produced by Spark).
- Supports reading/writing custom metadata in the Parquet file’s schema as json-serialized bytes at “dagster_polars_metadata” key.
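The last feature can be pictured with a small round-trip sketch. It assumes that the Parquet schema's key-value metadata can be modeled as a plain dict of bytes to bytes, and it does not read or write any Parquet file. The helpers pack_metadata and unpack_metadata are hypothetical names, not dagster-polars functions; only the “dagster_polars_metadata” key comes from the feature list above.

```python
import json
from typing import Any, Dict, Optional

METADATA_KEY = b"dagster_polars_metadata"  # key named in the feature list above


def pack_metadata(schema_metadata: Dict[bytes, bytes], custom: Dict[str, Any]) -> Dict[bytes, bytes]:
    """Return a copy of the schema metadata with `custom` stored as JSON bytes."""
    packed = dict(schema_metadata)
    packed[METADATA_KEY] = json.dumps(custom).encode("utf-8")
    return packed


def unpack_metadata(schema_metadata: Dict[bytes, bytes]) -> Optional[Dict[str, Any]]:
    """Decode the custom metadata, or return None if the key is absent."""
    raw = schema_metadata.get(METADATA_KEY)
    return None if raw is None else json.loads(raw.decode("utf-8"))


# Other schema entries are preserved next to the custom entry
schema = pack_metadata({b"created_by": b"example"}, {"owner": "data-team", "version": 2})
print(unpack_metadata(schema))
```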
Examples:
from dagster import Definitions, asset
from dagster_polars import PolarsParquetIOManager
import polars as pl

@asset(
    io_manager_key="polars_parquet_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.parquet
    ...

defs = Definitions(
    assets=[my_asset],
    resources={
        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="s3://my-bucket/my-dir")
    }
)
Reading partitioned Parquet datasets:
from dagster import SourceAsset

my_asset = SourceAsset(
    key=["path", "to", "dataset"],
    io_manager_key="polars_parquet_io_manager",
    metadata={
        "partition_by": ["year", "month", "day"]
    }
)
- dagster_polars.PolarsDeltaIOManager IOManagerDefinition
- experimental
This API may break in future versions, even between dot releases.
Implements writing and reading DeltaLake tables.
Features:
- All features provided by BasePolarsUPathIOManager.
- All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
- Supports native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table.
Install dagster-polars[delta] to use this IOManager.
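The rule that metadata takes precedence over config can be summarized in a few lines. This is a sketch of the stated rule only, not of how the IOManager reads its options; resolve_option is a hypothetical helper, and the “overwrite” value is used purely as an illustrative config setting.

```python
from typing import Any, Mapping


def resolve_option(name: str, metadata: Mapping[str, Any], config: Mapping[str, Any], default: Any = None) -> Any:
    """Pick an option value, preferring asset metadata over IOManager config."""
    if name in metadata:
        return metadata[name]  # metadata wins when both are set
    if name in config:
        return config[name]
    return default


# Asset metadata asks for "append" while the config value is "overwrite"
print(resolve_option("mode", {"mode": "append"}, {"mode": "overwrite"}))  # prints: append
```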
Examples:
from dagster import Definitions, asset
from dagster_polars import PolarsDeltaIOManager
import polars as pl

@asset(
    io_manager_key="polars_delta_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.delta
    ...

defs = Definitions(
    assets=[my_asset],
    resources={
        "polars_delta_io_manager": PolarsDeltaIOManager(base_dir="s3://my-bucket/my-dir")
    }
)
Appending to a DeltaLake table:
@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "mode": "append"
    },
)
def my_table() -> pl.DataFrame:
    ...
Using native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table:
from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster_polars import LazyFramePartitions

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": "partition_col"
    },
    partitions_def=StaticPartitionsDefinition(["a", "b", "c"])
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...
    # column with the partition_key must match `partition_by` metadata value
    return df.with_columns(pl.lit(context.partition_key).alias("partition_col"))

@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...
When using MultiPartitionsDefinition, the partition_by metadata value should be a dictionary mapping dimensions to column names.
from dagster import AssetExecutionContext, DailyPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition
from dagster_polars import LazyFramePartitions

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": {"time": "date", "clients": "client"}  # dimension -> column mapping
    },
    partitions_def=MultiPartitionsDefinition(
        {
            "time": DailyPartitionsDefinition(...),
            "clients": StaticPartitionsDefinition(...)
        }
    )
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...
    partition_keys_by_dimension = context.partition_key.keys_by_dimension
    return df.with_columns(
        pl.lit(partition_keys_by_dimension["time"]).alias("date"),  # time dimension matches date column
        pl.lit(partition_keys_by_dimension["clients"]).alias("client")  # clients dimension matches client column
    )

@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...
- dagster_polars.PolarsBigQueryIOManager IOManagerDefinition
Implements reading and writing Polars DataFrames from/to BigQuery.
Features:
- All DBIOManager features
- Supports writing partitioned tables (“partition_expr” input metadata key must be specified).
Returns: IOManagerDefinition
Examples:
from dagster import Definitions, EnvVar, asset
from dagster_polars import PolarsBigQueryIOManager
import polars as pl

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": PolarsBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)
You can tell Dagster in which dataset to create tables by setting the “dataset” configuration value. If you do not provide a dataset as configuration to the I/O manager, Dagster will determine a dataset based on the assets and ops using the I/O Manager. For assets, the dataset will be determined from the asset key, as shown in the above example. The final prefix before the asset name will be used as the dataset. For example, if the asset “my_table” had the key prefix [“gcp”, “bigquery”, “my_dataset”], the dataset “my_dataset” will be used. For ops, the dataset can be specified by including a “schema” entry in output metadata. If “schema” is not provided via config or on the asset/op, “public” will be used for the dataset.
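The resolution order described above can be written down as a short function. This is one reading of the paragraph, not the I/O manager's actual code; resolve_dataset and its parameter names are hypothetical, and the real implementation may treat conflicting settings differently (for example, when a dataset is given both in config and in metadata).

```python
from typing import Mapping, Optional, Sequence


def resolve_dataset(
    config_dataset: Optional[str] = None,
    asset_key_prefix: Optional[Sequence[str]] = None,
    output_metadata: Optional[Mapping[str, str]] = None,
) -> str:
    """Pick the BigQuery dataset following the order described in the text."""
    if config_dataset:
        # an explicit "dataset" configuration value
        return config_dataset
    if asset_key_prefix:
        # assets: the final prefix before the asset name
        return asset_key_prefix[-1]
    if output_metadata and "schema" in output_metadata:
        # ops: the "schema" entry in output metadata
        return output_metadata["schema"]
    # nothing provided anywhere
    return "public"


print(resolve_dataset(asset_key_prefix=["gcp", "bigquery", "my_dataset"]))  # prints: my_dataset
```

For the op case, the “schema” entry is set on the output, as in the following definition: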
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_dataset"})}
)
def make_my_table() -> pl.DataFrame:
    # the returned value will be stored at my_dataset.my_table
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...
If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
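If a shell is not available, the same encoding can be produced in Python. This is a minimal sketch using only the standard library; encode_service_account_key is a hypothetical helper name, and the file path is whatever GOOGLE_APPLICATION_CREDENTIALS would point to. Unlike some base64 command-line tools, base64.b64encode does not insert line breaks, so the result is a single line.

```python
import base64
from pathlib import Path


def encode_service_account_key(path: str) -> str:
    """Read a service account key file and return its base64-encoded contents."""
    raw = Path(path).read_bytes()
    # b64encode returns bytes without line breaks; decode to a plain ASCII string
    return base64.b64encode(raw).decode("ascii")
```

The returned string is what would be supplied as the “gcp_credentials” value, ideally through an environment variable or secret store rather than a hard-coded literal.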
The “write_disposition” metadata key can be used to set the write_disposition parameter of bigquery.JobConfig. For example, set it to “WRITE_APPEND” to append to an existing table instead of overwriting it.
Install dagster-polars[gcp] to use this IOManager.