
Data freshness checks

Freshness checks provide a way to identify data assets that are overdue for an update. For example, you can use freshness checks to identify assets that have gone stale because of:

  • The pipeline hitting an error and failing
  • Runs not being scheduled
  • A backed up run queue
  • Runs taking longer than expected to complete

Freshness checks also communicate data freshness SLAs. For example, downstream consumers of an asset can look at its freshness checks to see how often the asset is expected to be updated.

Prerequisites

To follow the steps in this guide, you'll need familiarity with:

Getting started

To get started with freshness checks, follow these general steps:

  1. Define a freshness check: Freshness checks are defined using build_last_update_freshness_checks, which uses an asset's last update time to determine freshness.

    If you're using Dagster+ Pro, you can also use build_anomaly_detection_freshness_checks to define freshness checks that use an anomaly detection model to determine freshness.

  2. Define a schedule or sensor: A schedule or sensor (for example, one created with build_sensor_for_freshness_checks) is required so that the freshness check executes on its own. If the check runs only after the asset has been materialized, it can't detect cases where materialization fails.

  3. Pass the freshness check and schedule/sensor to the Definitions object: Freshness checks and the associated schedule or sensor must be added to a Definitions object for Dagster to recognize them.

  4. View the freshness check results in the Dagster UI: Freshness check results appear in the UI, where you can track them over time.
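To see why step 2 matters, the toy model below compares a check that runs only right after each materialization with one that runs on its own hourly cadence, using a one-hour threshold. It is plain Python with no Dagster APIs, all names are hypothetical, and it is a simplified illustration rather than Dagster's implementation.

```python
from datetime import datetime, timedelta

# Hypothetical scenario: the asset materializes successfully at midnight,
# then every later run fails, so no further materializations are recorded.
LOWER_BOUND = timedelta(hours=1)
last_materialization = datetime(2024, 1, 1, 0, 0)
materialization_events = [last_materialization]


def is_overdue(last_update: datetime, now: datetime) -> bool:
    """True if the last update is more than LOWER_BOUND older than `now`."""
    return now - last_update > LOWER_BOUND


def check_after_each_materialization(events: list[datetime]) -> list[bool]:
    # The check only runs right after a materialization, so it always sees a
    # brand-new update. Failed runs produce no event, so nothing is checked.
    return [is_overdue(event, now=event) for event in events]


def check_every_hour(last_update: datetime, hours: int) -> list[bool]:
    # The check runs on its own hourly cadence, whether or not the asset ran.
    ticks = [last_update + timedelta(hours=h) for h in range(1, hours + 1)]
    return [is_overdue(last_update, now=tick) for tick in ticks]


print(check_after_each_materialization(materialization_events))  # [False]
print(check_every_hour(last_materialization, hours=3))  # [False, True, True]
```

The event-triggered check never reports a problem, while the scheduled check starts failing once the asset is more than an hour stale.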

Materializable asset freshness

Materializable assets are assets materialized by Dagster. To calculate whether a materializable asset is overdue, Dagster uses the asset's last materialization timestamp.

The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time.

from datetime import timedelta

import dagster as dg


@dg.asset
def hourly_sales(context: dg.AssetExecutionContext):
    context.log.info("Fetching and emitting hourly sales data")
    ...


# Define freshness check. If the asset's last materialization occurred
# more than 1 hour before the current time (lower_bound_delta), the check will fail
hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales], lower_bound_delta=timedelta(hours=1)
)

# Define freshness check sensor
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=hourly_sales_freshness_check
)

defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    sensors=[freshness_checks_sensor],
)
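The pass/fail rule in this example can be approximated in plain Python. This is a sketch under stated assumptions, not Dagster's implementation: it assumes the last materialization time is available as a Unix timestamp in seconds and treats the asset as fresh when it was updated no more than lower_bound_delta ago. The helper freshness_status is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Mirrors lower_bound_delta=timedelta(hours=1) from the example above.
LOWER_BOUND_DELTA = timedelta(hours=1)


def freshness_status(last_materialized_at: float, now: datetime) -> str:
    """Approximate pass/fail logic for a last-update freshness check.

    `last_materialized_at` is assumed to be a Unix timestamp in seconds.
    """
    last_update = datetime.fromtimestamp(last_materialized_at, tz=timezone.utc)
    deadline = last_update + LOWER_BOUND_DELTA  # asset becomes overdue after this
    if now <= deadline:
        return f"PASS: fresh for another {deadline - now}"
    return f"FAIL: overdue by {now - deadline}"


last = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc).timestamp()
print(freshness_status(last, datetime(2024, 5, 1, 12, 45, tzinfo=timezone.utc)))
print(freshness_status(last, datetime(2024, 5, 1, 13, 30, tzinfo=timezone.utc)))
```

Evaluated 45 minutes after a noon materialization the check passes; evaluated 90 minutes after, it fails.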

External asset freshness

External assets are assets orchestrated by systems other than Dagster.

To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps as values for the dagster/last_updated_timestamp observation metadata key allows Dagster to calculate whether the asset is overdue.
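Conceptually, each observation only needs to report a single timestamp under that metadata key. The plain-Python sketch below shows that shape with a dictionary; fetch_last_updated is a hypothetical stand-in for a query against the external system, and converting to Unix seconds is a choice made for this sketch. In a real observable source asset, the value is emitted through dg.ObserveResult metadata, as the Snowflake example in this section does.

```python
from datetime import datetime, timezone

LAST_UPDATED_KEY = "dagster/last_updated_timestamp"


def fetch_last_updated() -> datetime:
    # Hypothetical stand-in for querying the external system, such as
    # reading a table's "last modified" time from a warehouse.
    return datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)


def build_observation_metadata() -> dict[str, float]:
    # Convert the timezone-aware datetime to Unix seconds so the value
    # does not depend on the local timezone of whatever runs this code.
    last_updated = fetch_last_updated()
    return {LAST_UPDATED_KEY: last_updated.timestamp()}


print(build_observation_metadata())
```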

The example below defines a freshness check and adds a schedule to run the check periodically.

from datetime import timedelta

import dagster_snowflake as dg_snowflake

import dagster as dg


@dg.observable_source_asset
def hourly_sales(snowflake: dg_snowflake.SnowflakeResource):
    table_name = "hourly_sales"
    with snowflake.get_connection() as conn:
        freshness_results = dg_snowflake.fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=[table_name],
            schema="PUBLIC",
        )
    return dg.ObserveResult(
        asset_key=table_name,
        # Emit the asset's last update time as metadata
        metadata={
            "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
                freshness_results[table_name]
            )
        },
    )


# Define a schedule to run the freshness check
freshness_check_schedule = dg.ScheduleDefinition(
    job=dg.define_asset_job(
        "hourly_sales_observation_job",
        selection=dg.AssetSelection.keys("hourly_sales"),
    ),
    # Runs every minute. Usually, a much less frequent cadence is sufficient,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)

# Define the freshness check
hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales],
    lower_bound_delta=timedelta(hours=1),
)


defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    schedules=[freshness_check_schedule],
    resources={
        "snowflake": dg_snowflake.SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
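The schedule's cadence bounds how quickly an overdue asset is noticed. Assuming the observation and the freshness check both run on every schedule tick, an asset can sit up to lower_bound_delta plus one schedule interval past its last update before the check first fails. The sketch below does that arithmetic in plain Python; worst_case_detection_delay is a hypothetical helper, not a Dagster API.

```python
from datetime import timedelta


def worst_case_detection_delay(
    lower_bound_delta: timedelta, check_interval: timedelta
) -> timedelta:
    """Longest time from the asset's last update until a periodic check fails.

    The asset becomes overdue `lower_bound_delta` after its last update. A
    check that runs every `check_interval` may run just before that moment,
    so the first failing evaluation can come up to one interval later.
    """
    return lower_bound_delta + check_interval


print(worst_case_detection_delay(timedelta(hours=1), timedelta(minutes=1)))
print(worst_case_detection_delay(timedelta(hours=1), timedelta(minutes=30)))
```

With the example's one-minute schedule, the check fails at most about a minute after the asset becomes overdue; a slower schedule widens that window, so it's worth picking a cadence that keeps the delay acceptable.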

Testing freshness with anomaly detection

Note: Anomaly detection is a Dagster+ Pro feature.

Instead of setting a fixed freshness policy for each asset, Dagster+ Pro users can use build_anomaly_detection_freshness_checks, which uses a time series anomaly detection model to determine whether data arrives later than expected.

from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks

import dagster as dg


@dg.observable_source_asset
def hourly_sales(): ...


freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[hourly_sales], params=None
)

Note: If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
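This guide doesn't describe the model Dagster+ uses, so the following only illustrates the general idea with a deliberately simple, swapped-in technique: flag an asset as late when the time since its last update exceeds the mean gap between past updates by more than k standard deviations. Like the behavior described in the note, it passes with a message when there isn't enough history. check_lateness and MIN_UPDATES are hypothetical; this is not how build_anomaly_detection_freshness_checks works internally.

```python
from statistics import mean, stdev

MIN_UPDATES = 5  # hypothetical threshold for "enough history"


def check_lateness(
    update_times: list[float], now: float, k: float = 3.0
) -> tuple[bool, str]:
    """Toy lateness check on the gaps between updates (times in seconds).

    Returns (passed, message). Flags the asset when the time since its last
    update exceeds mean(gap) + k * stdev(gap) over past inter-update gaps.
    """
    if len(update_times) < MIN_UPDATES:
        # Not enough history: pass, but say that more data is needed.
        return True, "Not enough updates yet to detect anomalies."
    gaps = [later - earlier for earlier, later in zip(update_times, update_times[1:])]
    threshold = mean(gaps) + k * stdev(gaps)
    waited = now - update_times[-1]
    if waited > threshold:
        return False, f"Late: {waited:.0f}s since last update (threshold {threshold:.0f}s)"
    return True, f"On time: {waited:.0f}s since last update"


history = [0.0, 3600.0, 7260.0, 10800.0, 14460.0]  # roughly hourly updates
print(check_lateness(history, now=14460.0 + 3700))
print(check_lateness(history, now=14460.0 + 7200))
```

Because the threshold is learned from past gaps, an asset with a naturally irregular update pattern gets a wider tolerance than one that updates like clockwork.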

Alerting on overdue assets

Note: Freshness check alerts are a Dagster+ feature.

In Dagster+, you can set up alerts to notify you when assets are overdue for an update. Refer to the Dagster+ alerting guide for more information.

Next steps