
Polars (dagster-polars)

This library provides Dagster integration with Polars. It allows using Polars eager or lazy DataFrames as inputs and outputs with Dagster’s @asset and @op. Type annotations are used to control whether to load an eager or lazy DataFrame. Lazy DataFrames can be sinked as output. Multiple serialization formats (Parquet, Delta Lake, BigQuery) and filesystems (local, S3, GCS, …) are supported.

A comprehensive list of dagster-polars behavior for supported type annotations can be found in the Type Annotations section.

Installation

pip install dagster-polars

Some IOManagers (like PolarsDeltaIOManager) may require additional dependencies, which are provided with extras like dagster-polars[delta]. Please check the documentation for each IOManager for more details.

Quickstart

Highlights of the features common to filesystem-based IOManagers, using PolarsParquetIOManager as an example (see BasePolarsUPathIOManager for the full list of features provided by dagster-polars):

Type annotations are not required. By default an eager pl.DataFrame will be loaded.

from dagster import asset
import polars as pl

@asset(io_manager_key="polars_parquet_io_manager")
def upstream():
    return pl.DataFrame({"foo": [1, 2, 3]})

@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream) -> pl.LazyFrame:
    # no input annotation, so an eager DataFrame is loaded
    assert isinstance(upstream, pl.DataFrame)
    return upstream.lazy()  # LazyFrame will be sinked

A pl.LazyFrame can be scanned by annotating the input with pl.LazyFrame, and returning a pl.LazyFrame will sink it:

@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.LazyFrame:
    assert isinstance(upstream, pl.LazyFrame)
    return upstream

The same logic applies to partitioned assets:

from typing import Dict

@asset
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]):
    # one LazyFrame per upstream partition, keyed by partition key
    assert isinstance(partitioned_upstream, dict)
    assert isinstance(partitioned_upstream["my_partition"], pl.LazyFrame)

Optional inputs and outputs are supported:

from typing import Optional

@asset
def upstream() -> Optional[pl.DataFrame]:
    if has_data:  # has_data stands for your own condition
        return pl.DataFrame({"foo": [1, 2, 3]})  # type check will pass
    else:
        return None  # type check will pass and `dagster_polars` will skip writing the output completely

@asset
def downstream(upstream: Optional[pl.LazyFrame]):  # upstream will be None if it doesn't exist in storage
    ...

By default all the IOManagers store separate partitions as physically separated locations, such as:

  • /my/asset/key/partition_0.extension
  • /my/asset/key/partition_1.extension

This mode is useful for e.g. snapshotting.
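As a rough illustration of this layout, the sketch below builds one path per partition using only the standard library. The helper name `partition_paths` and the `.parquet` extension are assumptions made for the example; dagster-polars resolves the real paths internally.

```python
from pathlib import PurePosixPath
from typing import Dict, List


def partition_paths(
    asset_key: List[str], partition_keys: List[str], extension: str
) -> Dict[str, PurePosixPath]:
    """Map each partition key to its own file under the asset's directory.

    Hypothetical helper: it only mirrors the layout described above.
    """
    base = PurePosixPath("/", *asset_key)
    return {key: base / f"{key}{extension}" for key in partition_keys}


paths = partition_paths(
    ["my", "asset", "key"], ["partition_0", "partition_1"], ".parquet"
)
for key, path in paths.items():
    print(key, "->", path)
```

Because every partition is a separate file, rewriting one partition never touches the others, which is what makes this layout convenient for snapshots.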

Some IOManagers (like PolarsDeltaIOManager) support reading and writing partitions in storage-native format in the same location. This mode can typically be enabled by setting the “partition_by” metadata value. For example, PolarsDeltaIOManager would store different partitions in the same /my/asset/key.delta directory, which will be properly partitioned.

This mode should be preferred for true partitioning.

Type Annotations

Type aliases like DataFrameWithPartitions are provided by dagster_polars.types for convenience.

Supported type annotations and dagster-polars behavior

| Type annotation | Type Alias | Behavior |
| DataFrame | | read/write a DataFrame |
| LazyFrame | | read/sink a LazyFrame |
| Optional[DataFrame] | | read/write a DataFrame. Do nothing if no data is found in storage or the output is None |
| Optional[LazyFrame] | | read a LazyFrame. Do nothing if no data is found in storage |
| Dict[str, DataFrame] | DataFrameWithPartitions | read multiple DataFrames as Dict[str, DataFrame]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True |
| Dict[str, LazyFrame] | LazyFramePartitions | read multiple LazyFrames as Dict[str, LazyFrame]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True |

Generic builtins (like tuple[…] instead of Tuple[…]) are supported for Python >= 3.9.
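To make the dispatch on annotations more concrete, here is a minimal sketch of how an annotation could be inspected with the standard `typing` helpers. It is not the dagster-polars implementation: `DataFrame` and `LazyFrame` are placeholder classes standing in for the Polars types, and `describe_annotation` is a hypothetical function that only reports which loading behavior the annotation would select.

```python
from typing import Dict, Optional, Union, get_args, get_origin


class DataFrame:  # placeholder for polars.DataFrame
    pass


class LazyFrame:  # placeholder for polars.LazyFrame
    pass


def describe_annotation(annotation) -> str:
    """Describe how an input with this annotation would be loaded."""
    origin = get_origin(annotation)
    args = get_args(annotation)

    # Optional[X] is Union[X, None]: unwrap it and note that missing data is tolerated.
    if origin is Union and type(None) in args:
        inner = next(a for a in args if a is not type(None))
        return "optional " + describe_annotation(inner)

    # Dict[str, X] and dict[str, X] both mean "load several partitions".
    if origin is dict:
        return "partitions of " + describe_annotation(args[1])

    if annotation is LazyFrame:
        return "lazy"

    # DataFrame, or no recognized annotation: fall back to an eager load.
    return "eager"


for annotation in (DataFrame, LazyFrame, Optional[LazyFrame], Dict[str, DataFrame]):
    print(annotation, "->", describe_annotation(annotation))
```

The same function handles `dict[str, LazyFrame]` on Python 3.9 and later, since `get_origin` returns `dict` for both spellings.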

API Documentation

dagster_polars.BasePolarsUPathIOManager IOManagerDefinition

Base class for dagster-polars IOManagers.

Doesn’t define a specific storage format.

To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the write_df_to_path, sink_df_to_path and scan_df_from_path methods.

Features:

  • All the features of UPathIOManager - works with local and remote filesystems (like S3), supports loading multiple partitions with respect to PartitionMapping, and more
  • loads the correct type - polars.DataFrame, polars.LazyFrame, or other types defined in dagster_polars.types - based on the input type annotation (or dagster.DagsterType’s typing_type)
  • can sink lazy pl.LazyFrame DataFrames
  • handles Nones with Optional types by skipping loading missing inputs or saving None outputs
  • logs various metadata about the DataFrame - size, schema, sample, stats, …
  • the “columns” input metadata value can be used to select a subset of columns to load

dagster_polars.PolarsParquetIOManager IOManagerDefinition
experimental

This API may break in future versions, even between dot releases.

Implements reading and writing Polars DataFrames in Apache Parquet format.

Features:

  • All features provided by BasePolarsUPathIOManager.
  • All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
  • Supports reading partitioned Parquet datasets (for example, often produced by Spark).
  • Supports reading/writing custom metadata in the Parquet file’s schema as JSON-serialized bytes under the “dagster_polars_metadata” key.
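
The last point can be illustrated with a standard-library sketch of the encoding step alone: a metadata dict is serialized to JSON bytes and stored under the “dagster_polars_metadata” key. The plain dict standing in for the Parquet schema metadata, the helper names, and the sample values are all assumptions; how the IOManager attaches these bytes to the file is not shown.

```python
import json
from typing import Any, Dict

METADATA_KEY = "dagster_polars_metadata"


def encode_metadata(custom: Dict[str, Any]) -> Dict[str, bytes]:
    """Serialize custom metadata to JSON bytes under the dedicated key."""
    return {METADATA_KEY: json.dumps(custom).encode("utf-8")}


def decode_metadata(schema_metadata: Dict[str, bytes]) -> Dict[str, Any]:
    """Read the custom metadata back, returning {} when the key is absent."""
    raw = schema_metadata.get(METADATA_KEY)
    return json.loads(raw) if raw is not None else {}


# Hypothetical metadata values, used only to show the round trip.
schema_metadata = encode_metadata({"source": "nightly_export", "rows": 3})
print(decode_metadata(schema_metadata))
```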

Examples:

from dagster import Definitions, asset
from dagster_polars import PolarsParquetIOManager
import polars as pl

@asset(
    io_manager_key="polars_parquet_io_manager",
    key_prefix=["my_dataset"],
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.parquet
    ...

defs = Definitions(
    assets=[my_asset],
    resources={
        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="s3://my-bucket/my-dir")
    },
)

Reading partitioned Parquet datasets:

from dagster import SourceAsset

my_asset = SourceAsset(
    key=["path", "to", "dataset"],
    io_manager_key="polars_parquet_io_manager",
    metadata={
        "partition_by": ["year", "month", "day"]
    },
)

dagster_polars.PolarsDeltaIOManager IOManagerDefinition
experimental

This API may break in future versions, even between dot releases.

Implements writing and reading DeltaLake tables.

Features:

  • All features provided by BasePolarsUPathIOManager.
  • All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
  • Supports native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table.
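
The precedence rule in the second point (metadata over config) can be sketched in a few lines of plain Python. `resolve_option` is a hypothetical helper written for this illustration, not part of dagster-polars, and the config and metadata values are made up.

```python
from typing import Any, Mapping


def resolve_option(
    name: str,
    metadata: Mapping[str, Any],
    config: Mapping[str, Any],
    default: Any = None,
) -> Any:
    """Pick an option value, letting asset metadata override IOManager config."""
    if name in metadata:
        return metadata[name]
    if name in config:
        return config[name]
    return default


# Assumed values: the IOManager is configured to overwrite,
# while one asset asks for append through its metadata.
io_manager_config = {"mode": "overwrite"}
asset_metadata = {"mode": "append"}

print(resolve_option("mode", asset_metadata, io_manager_config))
print(resolve_option("mode", {}, io_manager_config))
```

This lets a deployment set a default once on the IOManager and override it only for the assets that need different behavior.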

Install dagster-polars[delta] to use this IOManager.

Examples:

from dagster import Definitions, asset
from dagster_polars import PolarsDeltaIOManager
import polars as pl

@asset(
    io_manager_key="polars_delta_io_manager",
    key_prefix=["my_dataset"],
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.delta
    ...

defs = Definitions(
    assets=[my_asset],
    resources={
        # the resource key must match the io_manager_key used by the asset
        "polars_delta_io_manager": PolarsDeltaIOManager(base_dir="s3://my-bucket/my-dir")
    },
)

Appending to a DeltaLake table:

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "mode": "append"
    },
)
def my_table() -> pl.DataFrame:
    ...

Using native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table:

from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset
import polars as pl

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": "partition_col"
    },
    partitions_def=StaticPartitionsDefinition(["a", "b", "c"]),
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    # column with the partition_key must match `partition_by` metadata value
    return df.with_columns(pl.lit(context.partition_key).alias("partition_col"))

@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...

When using MultiPartitionsDefinition, the partition_by metadata value should be a dictionary mapping dimensions to column names.

from dagster import AssetExecutionContext, DailyPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition, asset
import polars as pl

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": {"time": "date", "clients": "client"}  # dimension -> column mapping
    },
    partitions_def=MultiPartitionsDefinition(
        {
            # dimension names must match the keys of the `partition_by` mapping
            "time": DailyPartitionsDefinition(...),
            "clients": StaticPartitionsDefinition(...),
        }
    ),
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    partition_keys_by_dimension = context.partition_key.keys_by_dimension

    return df.with_columns(
        pl.lit(partition_keys_by_dimension["time"]).alias("date"),  # time dimension matches date column
        pl.lit(partition_keys_by_dimension["clients"]).alias("client"),  # clients dimension matches client column
    )


@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...

dagster_polars.PolarsBigQueryIOManager IOManagerDefinition

Implements reading and writing Polars DataFrames from/to BigQuery.

Features:

  • All DBIOManager features
  • Supports writing partitioned tables (“partition_expr” input metadata key must be specified).

Returns: IOManagerDefinition

Examples:

from dagster import Definitions, EnvVar, asset
from dagster_polars import PolarsBigQueryIOManager
import polars as pl

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": PolarsBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    },
)

You can tell Dagster in which dataset to create tables by setting the “dataset” configuration value. If you do not provide a dataset as configuration to the I/O manager, Dagster will determine a dataset based on the assets and ops using the I/O Manager. For assets, the dataset will be determined from the asset key, as shown in the above example. The final prefix before the asset name will be used as the dataset. For example, if the asset “my_table” had the key prefix [“gcp”, “bigquery”, “my_dataset”], the dataset “my_dataset” will be used. For ops, the dataset can be specified by including a “schema” entry in output metadata. If “schema” is not provided via config or on the asset/op, “public” will be used for the dataset.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_dataset"})}
)
def make_my_table() -> pl.DataFrame:
    # the returned value will be stored at my_dataset.my_table
    ...
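
The dataset resolution rules described above can be summarized in a small sketch. `resolve_dataset` is a hypothetical function written for illustration and is not Dagster's implementation. It assumes the order stated in the text (configuration first, then the asset key or the op's “schema” metadata, then “public”) and does not model how Dagster reacts when several sources are set at once.

```python
from typing import Mapping, Optional, Sequence


def resolve_dataset(
    config_dataset: Optional[str] = None,
    asset_key: Optional[Sequence[str]] = None,
    output_metadata: Optional[Mapping[str, str]] = None,
) -> str:
    """Return the BigQuery dataset a table would be written to."""
    # 1. An explicit "dataset" configuration value wins.
    if config_dataset:
        return config_dataset
    # 2. For assets: the final prefix before the asset name.
    if asset_key is not None and len(asset_key) >= 2:
        return asset_key[-2]
    # 3. For ops: the "schema" entry in the output metadata.
    if output_metadata and "schema" in output_metadata:
        return output_metadata["schema"]
    # 4. Fallback when nothing else is provided.
    return "public"


print(resolve_dataset(asset_key=["gcp", "bigquery", "my_dataset", "my_table"]))
print(resolve_dataset(output_metadata={"schema": "my_dataset"}))
print(resolve_dataset(asset_key=["my_table"]))
```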

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
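
For environments without a shell, the same encoding can be done in Python with the standard `base64` module. This is only a sketch: `encode_key` is a hypothetical helper and the key contents below are a fake placeholder, not a real service account key.

```python
import base64
import json


def encode_key(key_json: str) -> str:
    """Base64-encode a service account key so it contains no newlines."""
    return base64.b64encode(key_json.encode("utf-8")).decode("ascii")


# Fake key, formatted over several lines like a real key file would be.
fake_key = json.dumps(
    {"type": "service_account", "project_id": "my-project"}, indent=2
)

encoded = encode_key(fake_key)
print(encoded)

# Decoding restores the original multi-line JSON.
decoded = base64.b64decode(encoded).decode("utf-8")
```

The encoded string is what would be supplied as the “gcp_credentials” configuration value, typically through an environment variable rather than hard-coded in source.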

The “write_disposition” metadata key can be used to set the write_disposition parameter of bigquery.JobConfig. For example, set it to “WRITE_APPEND” to append to an existing table instead of overwriting it.

Install dagster-polars[gcp] to use this IOManager.