
Defining assets

The most common way to create a data asset in Dagster is by annotating a Python function with an asset decorator. The function computes the contents of the asset, such as a database table or file.

An asset definition includes the following:

  • An AssetKey, which is a handle for referring to the asset.
  • A set of upstream asset keys, which refer to assets that the contents of the asset definition are derived from.
  • A Python function, which is responsible for computing the contents of the asset from its upstream dependencies and storing the results.

Prerequisites

To run the code in this article, you'll need to install Dagster. For more information, see the Installation guide.

Asset decorators

Dagster has four types of asset decorators:

Decorator            Description
@asset               Defines a single asset. See an example.
@multi_asset         Outputs multiple assets from a single operation. See an example.
@graph_asset         Outputs a single asset from multiple operations without making each operation itself an asset. See an example.
@graph_multi_asset   Outputs multiple assets from multiple operations.

Defining operations that create a single asset

The simplest way to define a data asset in Dagster is by using the @asset decorator. This decorator marks a Python function as an asset.

Using @dg.asset decorator
import dagster as dg


# A standalone asset with no upstream dependencies.
@dg.asset
def daily_sales() -> None: ...


# deps declares that weekly_sales is derived from daily_sales.
@dg.asset(deps=[daily_sales], group_name="sales")
def weekly_sales() -> None: ...


# owners accepts email addresses and team names prefixed with "team:".
@dg.asset(
    deps=[weekly_sales],
    owners=["bighead@hooli.com", "team:roof", "team:corpdev"],
)
def weekly_sales_report(context: dg.AssetExecutionContext):
    context.log.info("Loading data for my_dataset")


defs = dg.Definitions(assets=[daily_sales, weekly_sales, weekly_sales_report])

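In this example, daily_sales, weekly_sales, and weekly_sales_report are three assets chained through the deps argument: weekly_sales depends on daily_sales, and weekly_sales_report depends on weekly_sales. The weekly_sales_report asset also logs a message through its execution context. Dagster tracks these dependencies and uses them to order execution within the pipeline.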

Defining operations that create multiple assets

When you need to generate multiple assets from a single operation, you can use the @multi_asset decorator. This allows you to output multiple assets while maintaining a single processing function, which could be useful for:

  • Making a single call to an API that updates multiple tables
  • Using the same in-memory object to compute multiple assets

In this example, my_multi_asset produces two assets: asset_one and asset_two. Each is derived from the same function, which makes it easier to handle related data transformations together:

Using @dg.multi_asset decorator
import dagster as dg


@dg.multi_asset(specs=[dg.AssetSpec("asset_one"), dg.AssetSpec("asset_two")])
def my_multi_asset():
    # Yield one MaterializeResult per asset so that Dagster records each of them.
    yield dg.MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})
    yield dg.MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})


defs = dg.Definitions(assets=[my_multi_asset])


Defining multiple operations that create a single asset

For cases where you need to perform multiple operations to produce a single asset, you can use the @graph_asset decorator. This approach encapsulates a series of operations and exposes them as a single asset, allowing you to model complex pipelines while only exposing the final output.

Using @dg.graph_asset decorator
from random import randint

import dagster as dg


# Retry the op up to five times, with exponential backoff and jitter.
@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,  # 200ms
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    )
)
def step_one() -> int:
    if randint(0, 2) >= 1:
        raise Exception("Flaky step that may fail randomly")
    return 42


@dg.op
def step_two(num: int):
    return num**2


# Only the output of the final op is exposed as the asset.
@dg.graph_asset
def complex_asset():
    return step_two(step_one())


defs = dg.Definitions(assets=[complex_asset])

In this example, complex_asset is an asset that's the result of two operations: step_one and step_two. These steps are combined into a single asset, abstracting away the intermediate representations.


Next steps