Defining assets
The most common way to create a data asset in Dagster is by annotating a Python function with an asset decorator. The function computes the contents of the asset, such as a database table or file.
An asset definition includes the following:
- An AssetKey, which is a handle for referring to the asset.
- A set of upstream asset keys, which refer to assets that the contents of the asset definition are derived from.
- A Python function, which is responsible for computing the contents of the asset from its upstream dependencies and storing the results.
Prerequisites
To run the code in this article, you'll need to install Dagster. For more information, see the Installation guide.
Asset decorators
Dagster has four types of asset decorators:
| Decorator | Description |
|---|---|
| @asset | Defines a single asset. See an example. |
| @multi_asset | Outputs multiple assets from a single operation. See an example. |
| @graph_asset | Outputs a single asset from multiple operations without making each operation itself an asset. See an example. |
| @graph_multi_asset | Outputs multiple assets from multiple operations. |
Defining operations that create a single asset
The simplest way to define a data asset in Dagster is by using the @asset decorator. This decorator marks a Python function as an asset.
```python
import dagster as dg


# An asset with no upstream dependencies; its key is "daily_sales".
@dg.asset
def daily_sales() -> None: ...


# Depends on daily_sales and is placed in the "sales" asset group.
@dg.asset(deps=[daily_sales], group_name="sales")
def weekly_sales() -> None: ...


# Depends on weekly_sales and lists the user and teams that own it.
@dg.asset(
    deps=[weekly_sales],
    owners=["bighead@hooli.com", "team:roof", "team:corpdev"],
)
def weekly_sales_report(context: dg.AssetExecutionContext):
    context.log.info("Loading data for my_dataset")


defs = dg.Definitions(assets=[daily_sales, weekly_sales, weekly_sales_report])
```
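In this example, daily_sales, weekly_sales, and weekly_sales_report are assets. weekly_sales depends on daily_sales and belongs to the sales asset group, and weekly_sales_report depends on weekly_sales, lists its owners, and writes a message to the Dagster log when it runs. Dagster uses the assets listed in deps to build the dependency graph and, when the assets are materialized together, runs each upstream asset before the assets that depend on it.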
Defining operations that create multiple assets
When you need to generate multiple assets from a single operation, you can use the @multi_asset decorator. This lets you output multiple assets from a single processing function, which can be useful for:
- Making a single call to an API that updates multiple tables
- Using the same in-memory object to compute multiple assets
In this example, my_multi_asset produces two assets: asset_one and asset_two. Both are derived from the same function, which makes it easier to handle related data transformations together:
```python
import dagster as dg


@dg.multi_asset(specs=[dg.AssetSpec("asset_one"), dg.AssetSpec("asset_two")])
def my_multi_asset():
    # Yield one MaterializeResult per asset so that Dagster records each
    # materialization and its metadata.
    yield dg.MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})
    yield dg.MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})


defs = dg.Definitions(assets=[my_multi_asset])
```
Defining multiple operations that create a single asset
For cases where you need to perform multiple operations to produce a single asset, you can use the @graph_asset decorator. This approach encapsulates a series of operations and exposes them as a single asset, allowing you to model complex pipelines while only exposing the final output.
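```python
from random import randint

import dagster as dg


@dg.op(
    # Retry up to 5 times, starting with a 0.2 s delay that grows
    # exponentially, with random jitter added or subtracted.
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,  # 200ms
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    )
)
def step_one() -> int:
    # randint(0, 2) returns 0, 1, or 2, so this fails about two times in three.
    if randint(0, 2) >= 1:
        raise Exception("Flaky step that may fail randomly")
    return 42


@dg.op
def step_two(num: int) -> int:
    return num**2


@dg.graph_asset
def complex_asset():
    # Only the final output becomes the asset; step_one's result stays internal.
    return step_two(step_one())


defs = dg.Definitions(assets=[complex_asset])
```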
In this example, complex_asset is an asset that's the result of two operations: step_one and step_two. These steps are combined into a single asset, abstracting away the intermediate representations.
Next steps
- Enrich Dagster's built-in data catalog with asset metadata
- Learn to pass data between assets
- Learn to use a factory pattern to create multiple, similar assets