Skip to main content

Testing assets with asset checks

Asset checks are tests that verify specific properties of your data assets, allowing you to execute data quality checks on your data. For example, you can create checks to:

  • Ensure a particular column doesn't contain null values
  • Verify that a tabular asset adheres to a specified schema
  • Check if an asset's data needs refreshing

Each asset check should test only a single asset property to keep tests uncomplicated, reusable, and easy to track over time.

note

This article assumes familiarity with assets

Getting started

To get started with asset checks, follow these general steps:

  1. Define an asset check: Asset checks are typically defined using the @asset_check or @multi_asset_check decorator and run either within an asset or separate from the asset.
  2. Pass the asset checks to the Definitions object: Asset checks must be added to Definitions for Dagster to recognize them.
  3. Choose how to execute asset checks: By default, all jobs targeting an asset will also run associated checks, although you can run asset checks through the Dagster UI.
  4. View asset check results in the UI: Asset check results will appear in the UI and can be customized through the use of metadata and severity levels
  5. Alert on failed asset check results: If you are using Dagster+, you can choose to alert on asset checks.

Defining a single asset check

tip

Dagster's dbt integration can model existing dbt tests as asset checks. Refer to the dagster-dbt documentation for more information.

A asset check is defined using the @asset_check decorator.

The following example defines an asset check on an asset that fails if the order_id column of the asset contains a null value. The asset check will run after the asset has been materialized.

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()

# Return the result of the check
return dg.AssetCheckResult(
# Define passing criteria
passed=bool(num_null_order_ids == 0),
)


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_id_has_no_nulls],
)

Defining multiple asset checks

In most cases, checking the data quality of an asset will require multiple checks.

The following example defines two asset checks using the @multi_asset_check decorator:

  • One check that fails if the order_id column of the asset contains a null value
  • Another check that fails if the item_id column of the asset contains a null value

In this example, both asset checks will run in a single operation after the asset has been materialized.

from collections.abc import Iterable

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


@dg.multi_asset_check(
# Map checks to targeted assets
specs=[
dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
orders_df = pd.read_csv("orders.csv")

# Check for null order_id column values
num_null_order_ids = orders_df["order_id"].isna().sum()
yield dg.AssetCheckResult(
check_name="orders_id_has_no_nulls",
passed=bool(num_null_order_ids == 0),
asset_key="orders",
)

# Check for null item_id column values
num_null_item_ids = orders_df["item_id"].isna().sum()
yield dg.AssetCheckResult(
check_name="items_id_has_no_nulls",
passed=bool(num_null_item_ids == 0),
asset_key="orders",
)


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_check],
)

Programmatically generating asset checks

Defining multiple checks can also be done using a factory pattern. The example below defines the same two asset checks as in the previous example, but this time using a factory pattern and the @multi_asset_check decorator.

from collections.abc import Iterable, Mapping, Sequence

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


def make_orders_checks(
check_blobs: Sequence[Mapping[str, str]],
) -> dg.AssetChecksDefinition:
@dg.multi_asset_check(
specs=[
dg.AssetCheckSpec(name=check_blob["name"], asset=check_blob["asset"])
for check_blob in check_blobs
]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
orders_df = pd.read_csv("orders.csv")

for check_blob in check_blobs:
num_null_order_ids = orders_df[check_blob["column"]].isna().sum()
yield dg.AssetCheckResult(
check_name=check_blob["name"],
passed=bool(num_null_order_ids == 0),
asset_key=check_blob["asset"],
)

return orders_check


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"column": "order_id",
},
{
"name": "items_id_has_no_nulls",
"asset": "orders",
"column": "item_id",
},
]


defs = dg.Definitions(
assets=[orders],
asset_checks=[make_orders_checks(check_blobs)],
)

Blocking downstream materialization

By default, if a parent's asset check fails during a run, the run will continue and downstream assets will be materialized. To prevent this behavior, set the blocking argument to True in the @asset_check decorator.

In the example bellow, if the orders_id_has_no_nulls check fails, the downstream augmented_orders asset won't be materialized.

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


# Check that targets `orders`; block materialization of `augmented_orders` on failure
@dg.asset_check(asset=orders, blocking=True)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()
return dg.AssetCheckResult(
passed=bool(num_null_order_ids == 0),
)


# Asset downstream of `orders`
@dg.asset(deps=[orders])
def augmented_orders():
orders_df = pd.read_csv("orders.csv")
augmented_orders_df = orders_df.assign(description=["item_432", "item_878"])
augmented_orders_df.to_csv("augmented_orders.csv")


defs = dg.Definitions(
assets=[orders, augmented_orders],
asset_checks=[orders_id_has_no_nulls],
)

Scheduling and monitoring asset checks

In some cases, running asset checks separately from the job materializing the assets can be useful. For example, running all data quality checks once a day and sending an alert if they fail. This can be achieved using schedules and sensors.

In the example below, two jobs are defined: one for the asset and another for the asset check. Schedules are defined to materialize the asset and execute the asset check independently. A sensor is defined to send an email alert when the asset check job fails.

import os

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()
return dg.AssetCheckResult(
passed=bool(num_null_order_ids == 0),
)


# Only include the `orders` asset
asset_job = dg.define_asset_job(
"asset_job",
selection=dg.AssetSelection.assets(orders).without_checks(),
)

# Only include the `orders_id_has_no_nulls` check
check_job = dg.define_asset_job(
"check_job", selection=dg.AssetSelection.checks_for_assets(orders)
)

# Job schedules
asset_schedule = dg.ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")
check_schedule = dg.ScheduleDefinition(job=check_job, cron_schedule="0 6 * * *")

# Send email on failure
check_sensor = dg.make_email_on_run_failure_sensor(
email_from="no-reply@example.com",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["xxx@example.com"],
monitored_jobs=[check_job],
)


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_id_has_no_nulls],
jobs=[asset_job, check_job],
schedules=[asset_schedule, check_schedule],
sensors=[check_sensor],
)

Next steps