Data freshness checks
Freshness checks provide a way to identify data assets that are overdue for an update. For example, you can use freshness checks to identify stale assets caused by:
- The pipeline hitting an error and failing
- Runs not being scheduled
- A backed-up run queue
- Runs taking longer than expected to complete
Freshness checks can also communicate SLAs for their data freshness. For example, downstream asset consumers can determine how often assets are expected to be updated by looking at the defined checks.
Prerequisites
To follow the steps in this guide, you'll need familiarity with:
Getting started
To get started with freshness checks, follow these general steps:
- Define a freshness check: Freshness checks are defined using build_last_update_freshness_checks, which uses an asset's last updated time to determine freshness. If you use Dagster+ Pro, you can also use build_anomaly_detection_freshness_checks to define a freshness check that uses an anomaly detection model to determine freshness.
- Define a schedule or sensor: A schedule or sensor (for example, one built with build_sensor_for_freshness_checks) is required to ensure the freshness check executes. If the check only runs after the asset has been materialized, it can't detect cases where materialization fails.
- Pass the freshness check and schedule/sensor to the Definitions object: Freshness checks and the associated schedule or sensor must be added to a Definitions object for Dagster to recognize them.
- View the freshness check results in the Dagster UI: Freshness check results appear in the UI, allowing you to track them over time.
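To see why the schedule or sensor step matters, the following plain-Python sketch (no Dagster required) compares two ways of evaluating a one-hour freshness rule. It is an illustration, not Dagster's implementation: is_overdue and simulate are hypothetical helpers, and both the strict "older than" comparison and the 15-minute evaluation interval are assumptions. A check that runs only after successful materializations never observes the failed runs, while a periodic evaluation flags the asset once the deadline has passed.

```python
from datetime import datetime, timedelta, timezone


def is_overdue(last_update: datetime, now: datetime, lower_bound_delta: timedelta) -> bool:
    # Assumed rule: overdue when the latest update is strictly older than now - lower_bound_delta.
    return last_update < now - lower_bound_delta


def simulate(successful_updates, eval_times, lower_bound_delta):
    """Return (eval_time, overdue) pairs for each evaluation time (hypothetical helper)."""
    results = []
    for now in eval_times:
        # Only updates that have already happened are visible at `now`.
        seen = [t for t in successful_updates if t <= now]
        if not seen:
            continue  # Assumption: nothing to judge before the first update.
        results.append((now, is_overdue(max(seen), now, lower_bound_delta)))
    return results


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
# One successful materialization at 00:00; the hourly runs after it fail,
# so no further updates are recorded.
updates = [start]
delta = timedelta(hours=1)

# Strategy A: evaluate only right after a successful materialization.
after_materialization = simulate(updates, updates, delta)

# Strategy B: evaluate every 15 minutes, as a sensor or schedule would (00:00..03:00).
periodic_times = [start + timedelta(minutes=15 * i) for i in range(13)]
periodic = simulate(updates, periodic_times, delta)

print(any(overdue for _, overdue in after_materialization))  # False
first_failure = next(t for t, overdue in periodic if overdue)
print(first_failure.strftime("%H:%M"))  # 01:15
```

Strategy A reports the asset as fresh indefinitely, which is exactly the blind spot that a sensor or schedule closes.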
Materializable asset freshness
Materializable assets are assets materialized by Dagster. To calculate whether a materializable asset is overdue, Dagster uses the asset's last materialization timestamp.
The example below defines a freshness check on an asset that fails if the asset's latest materialization occurred more than one hour before the current time.
```python
from datetime import timedelta

import dagster as dg


@dg.asset
def hourly_sales(context: dg.AssetExecutionContext):
    context.log.info("Fetching and emitting hourly sales data")
    ...


# Define freshness check. If the asset's last materialization occurred
# more than 1 hour before the current time (lower_bound_delta), the check will fail
hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales], lower_bound_delta=timedelta(hours=1)
)

# Define freshness check sensor
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=hourly_sales_freshness_check
)

defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    sensors=[freshness_checks_sensor],
)
```
External asset freshness
External assets are assets orchestrated by systems other than Dagster.
To run freshness checks on external assets, the checks need to know when the external assets were last updated. Emitting these update timestamps as values for the dagster/last_updated_timestamp observation metadata key allows Dagster to calculate whether the asset is overdue.
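Converting the source system's "last updated" value into a timestamp is an easy place to introduce a timezone bug, which would shift every freshness calculation. The sketch below assumes you emit Unix epoch seconds and that the warehouse returns naive datetimes that are actually in UTC; both are assumptions to verify for your own system, and to_unix_seconds is a hypothetical helper rather than part of Dagster. The key point is Python's own behavior: datetime.timestamp() treats a naive datetime as local time.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def to_unix_seconds(value: datetime, assume_tz: tzinfo = timezone.utc) -> float:
    """Convert a datetime to Unix epoch seconds, treating naive values as `assume_tz`."""
    if value.tzinfo is None:
        # datetime.timestamp() interprets naive datetimes as local time,
        # so attach the assumed zone explicitly before converting.
        value = value.replace(tzinfo=assume_tz)
    return value.timestamp()


# Hypothetical value returned by a warehouse that stores naive UTC datetimes.
naive_utc = datetime(2024, 1, 1, 12, 0, 0)
# The same instant expressed with an explicit +01:00 offset.
with_offset = datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone(timedelta(hours=1)))

print(to_unix_seconds(naive_utc))  # 1704110400.0
print(to_unix_seconds(with_offset))  # 1704110400.0
```

Normalizing to an unambiguous instant before emitting metadata means the result no longer depends on the timezone of the machine running the observation.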
The example below defines a freshness check and adds a schedule to run the check periodically.
```python
from datetime import timedelta

import dagster_snowflake as dg_snowflake

import dagster as dg


@dg.observable_source_asset
def hourly_sales(snowflake: dg_snowflake.SnowflakeResource):
    table_name = "hourly_sales"
    with snowflake.get_connection() as conn:
        freshness_results = dg_snowflake.fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=[table_name],
            schema="PUBLIC",
        )
    return dg.ObserveResult(
        asset_key=table_name,
        # Emit the asset's last update time as metadata
        metadata={
            "dagster/last_updated_timestamp": dg.MetadataValue.timestamp(
                freshness_results[table_name]
            )
        },
    )


# Define a schedule to run the freshness check
freshness_check_schedule = dg.ScheduleDefinition(
    job=dg.define_asset_job(
        "hourly_sales_observation_job",
        selection=dg.AssetSelection.keys("hourly_sales"),
    ),
    # Runs every minute. Usually, a much less frequent cadence is sufficient,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)

# Define the freshness check
hourly_sales_freshness_check = dg.build_last_update_freshness_checks(
    assets=[hourly_sales],
    lower_bound_delta=timedelta(hours=1),
)

defs = dg.Definitions(
    assets=[hourly_sales],
    asset_checks=hourly_sales_freshness_check,
    schedules=[freshness_check_schedule],
    resources={
        "snowflake": dg_snowflake.SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
```
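The schedule's cadence also bounds how quickly an overdue asset is noticed, because the check can only fail on a tick. The plain-Python sketch below estimates the first failing tick for a given last update time. It assumes ticks fall on multiples of a fixed interval since the Unix epoch (true for an every-minute cron and, in UTC, for an hourly one) and uses a strict "older than" rule; first_failing_tick is a hypothetical helper, not a Dagster API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def first_failing_tick(last_update: datetime, lower_bound_delta: timedelta, interval: timedelta) -> datetime:
    """Return the first evaluation tick at which the asset counts as overdue.

    Assumptions: ticks fall on multiples of `interval` since the Unix epoch,
    and "overdue" means last_update < tick - lower_bound_delta.
    """
    deadline = last_update + lower_bound_delta
    # Whole intervals from the epoch up to (and possibly including) the deadline.
    ticks_before = (deadline - EPOCH) // interval
    # A tick exactly at the deadline is not yet overdue, so take the next one.
    return EPOCH + (ticks_before + 1) * interval


last_update = datetime(2024, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
one_hour = timedelta(hours=1)

every_minute = first_failing_tick(last_update, one_hour, timedelta(minutes=1))  # "* * * * *"
hourly = first_failing_tick(last_update, one_hour, timedelta(hours=1))  # e.g. "0 * * * *"

print(every_minute.isoformat())  # 2024-01-01T13:01:00+00:00
print(hourly.isoformat())  # 2024-01-01T14:00:00+00:00
```

With the same one-hour lower_bound_delta, an every-minute cadence reports the problem about 30 seconds after the deadline, while an hourly cadence can take close to another hour, so choose the cadence based on how quickly you need to know.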
Testing freshness with anomaly detection
Anomaly detection is a Dagster+ Pro feature.
Instead of applying policies on an asset-by-asset basis, Dagster+ Pro users can use build_anomaly_detection_freshness_checks, which uses a time series anomaly detection model to determine whether data arrives later than expected.
```python
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks

import dagster as dg


@dg.observable_source_asset
def hourly_sales(): ...


freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[hourly_sales], params=None
)
```
If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
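For intuition only, here is a deliberately simple stand-in for anomaly-based freshness. It is not the model Dagster+ uses: it learns the typical gap between updates from history and fails when the time since the last update exceeds the mean gap plus k standard deviations. The names anomaly_freshness_check and CheckResult, and the min_history and k thresholds, are hypothetical. Like the documented check, it passes with an explanatory message when there is not enough history.

```python
import statistics
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class CheckResult:
    passed: bool
    message: str


def anomaly_freshness_check(update_times, now, min_history=10, k=3.0):
    """Flag the asset as late if the time since its last update is unusually long.

    Illustrative mean + k * stdev rule on historical gaps between updates.
    """
    times = sorted(update_times)
    gaps = [(b - a).total_seconds() for a, b in zip(times, times[1:])]
    if len(gaps) < min_history:
        # Too little history to model "normal": pass, but say why.
        return CheckResult(True, f"Need at least {min_history} gaps between updates, have {len(gaps)}.")
    threshold = statistics.mean(gaps) + k * statistics.stdev(gaps)
    elapsed = (now - times[-1]).total_seconds()
    if elapsed > threshold:
        return CheckResult(False, f"{elapsed:.0f}s since last update exceeds expected {threshold:.0f}s.")
    return CheckResult(True, f"{elapsed:.0f}s since last update is within expected {threshold:.0f}s.")


# Roughly hourly updates with a few minutes of jitter; the last one lands at 11:03.
jitter_minutes = [0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3]
start = datetime(2024, 1, 1)
updates = [start + timedelta(hours=i, minutes=m) for i, m in enumerate(jitter_minutes)]

print(anomaly_freshness_check(updates, datetime(2024, 1, 1, 11, 30)).passed)  # True
print(anomaly_freshness_check(updates, datetime(2024, 1, 1, 13, 0)).passed)  # False
print(anomaly_freshness_check(updates[:3], datetime(2024, 1, 2)).passed)  # True (not enough history)
```

Unlike a fixed lower_bound_delta, the threshold here adapts to each asset's observed update rhythm, which is the motivation for anomaly-based checks.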
Alerting on overdue assets
Freshness check alerts are a Dagster+ feature.
In Dagster+, you can set up alerts to notify you when assets are overdue for an update. Refer to the Dagster+ alerting guide for more information.
Next steps
- Explore more asset checks
- Explore how to raise alerts when assets are overdue (Dagster+ Pro)