Asset Checks
Dagster allows you to define and execute checks on your software-defined assets. Each asset check verifies some property of a data asset, e.g. that it has no null values in a particular column.
- @dagster.asset_check
Create a definition for how to execute an asset check.
Parameters:
- asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The
- name (Optional[str]) – The name of the check. If not specified, the name of the decorated
- description (Optional[str]) – The description of the check.
- blocking (bool) – When enabled, runs that include this check and any downstream assets that
- additional_ins (Optional[Mapping[str, AssetIn]]) – A mapping from input name to
- additional_deps (Optional[Iterable[CoercibleToAssetDep]]) – Assets that are upstream
- required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required
- config_schema (Optional[ConfigSchema]) – The configuration schema for the check’s underlying
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the check.
- compute_kind (Optional[str]) – A string to represent the kind of computation that executes
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the check.
- metadata (Optional[Mapping[str, Any]]) – A dictionary of static metadata for the check.
- automation_condition (Optional[AutomationCondition]) – An AutomationCondition which determines
Produces an AssetChecksDefinition object.
Example:
from dagster import asset, asset_check, AssetCheckResult

@asset
def my_asset() -> None:
    ...

@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows() -> AssetCheckResult:
    num_rows = ...
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})

Example with a DataFrame Output:
from dagster import asset, asset_check, AssetCheckResult
from pandas import DataFrame

@asset
def my_asset() -> DataFrame:
    ...

@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows(my_asset: DataFrame) -> AssetCheckResult:
    # The asset's DataFrame is loaded and passed in as an argument.
    num_rows = my_asset.shape[0]
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})
- class dagster.AssetCheckResult
The result of an asset check.
- check_name
The name of the check.
Type: Optional[str]
- passed
The pass/fail result of the check.
Type: bool
- metadata
Arbitrary metadata about the asset. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
Type: Optional[Dict[str, RawMetadataValue]]
- severity
Severity of the check. Defaults to ERROR.
Type: AssetCheckSeverity
- description
A text description of the result of the check evaluation.
Type: Optional[str]
- class dagster.AssetCheckSpec
Defines information about an asset check, except how to execute it.
AssetCheckSpec is often used as an argument to decorators that decorate a function that can execute multiple checks, e.g. @asset and @multi_asset. It defines one of the checks that will be executed inside that function.
Parameters:
- name (str) – Name of the check.
- asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The asset that
- description (Optional[str]) – Description for the check.
- additional_deps (Optional[Iterable[AssetDep]]) – Additional dependencies for the check. The
- metadata (Optional[Mapping[str, Any]]) – A dict of static metadata for this asset check.
- class dagster.AssetCheckSeverity
Severity level for an AssetCheckResult.
- WARN: a potential issue with the asset
- ERROR: a definite issue with the asset
Severity does not impact execution of the asset or downstream assets.
- class dagster.AssetCheckKey
Check names are expected to be unique per-asset. Thus, this combination of asset key and check name uniquely identifies an asset check within a deployment.
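The uniqueness rule above can be illustrated with a small plain-Python sketch. It does not use Dagster's own AssetCheckKey class; CheckKey and register are hypothetical names introduced here only to show why the pair (asset key, check name) works as an identifier while the check name alone does not.

```python
from typing import Dict, NamedTuple, Tuple


class CheckKey(NamedTuple):
    """Hypothetical stand-in for an (asset key, check name) pair."""

    asset_key: Tuple[str, ...]
    name: str


def register(registry: Dict[CheckKey, str], key: CheckKey, description: str) -> None:
    """Add a check to the registry, rejecting a repeated name on the same asset."""
    if key in registry:
        raise ValueError(
            f"duplicate check {key.name!r} on asset {'/'.join(key.asset_key)}"
        )
    registry[key] = description


registry: Dict[CheckKey, str] = {}
register(registry, CheckKey(("asset1",), "enough_rows"), "asset1 has enough rows")
# The same check name on a different asset is a different key, so this is accepted.
register(registry, CheckKey(("asset2",), "enough_rows"), "asset2 has enough rows")
```

Registering "enough_rows" on asset1 a second time would raise ValueError in this sketch, which mirrors the expectation that check names are unique per asset.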
- @dagster.multi_asset_check
Defines a set of asset checks that can be executed together with the same op.
Parameters:
- specs (Sequence[AssetCheckSpec]) – Specs for the asset checks.
- name (Optional[str]) – The name of the op. If not specified, the name of the decorated
- description (Optional[str]) – Description of the op.
- required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required
- config_schema (Optional[ConfigSchema]) – The configuration schema for the asset checks’ underlying
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the checks.
- compute_kind (Optional[str]) – A string to represent the kind of computation that executes
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the checks.
- can_subset (bool) – Whether the op can emit results for a subset of the asset checks
Examples:
from dagster import AssetCheckResult, AssetCheckSpec, multi_asset_check

@multi_asset_check(
    specs=[
        AssetCheckSpec("enough_rows", asset="asset1"),
        AssetCheckSpec("no_dupes", asset="asset1"),
        AssetCheckSpec("enough_rows", asset="asset2"),
    ],
)
def checks():
    # One result per spec, matched by asset key and check name.
    yield AssetCheckResult(passed=True, asset_key="asset1", check_name="enough_rows")
    yield AssetCheckResult(passed=False, asset_key="asset1", check_name="no_dupes")
    yield AssetCheckResult(passed=True, asset_key="asset2", check_name="enough_rows")
- dagster.load_asset_checks_from_modules
Constructs a list of asset checks from the given modules. This is most often used in conjunction with a call to load_assets_from_modules.
Parameters:
- modules (Iterable[ModuleType]) – The Python modules to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the
Returns: A list containing asset checks defined in the given modules.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_current_module
Constructs a list of asset checks from the module where this function is called. This is most often used in conjunction with a call to load_assets_from_current_module.
Parameters:
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_current_module.
Returns: A list containing asset checks defined in the current module.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_package_module
Constructs a list of asset checks from all sub-modules of the given package module. This is most often used in conjunction with a call to load_assets_from_package_module.
Parameters:
- package_module (ModuleType) – The Python module to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the
Returns: A list containing asset checks defined in the package.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_package_name
Constructs a list of asset checks from all sub-modules of the given package. This is most often used in conjunction with a call to load_assets_from_package_name.
Parameters:
- package_name (str) – The name of the Python package to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the
Returns: A list containing asset checks defined in the package.
Return type: Sequence[AssetChecksDefinition]
- class dagster.AssetChecksDefinition
Defines a set of checks that are produced by the same op or op graph.
AssetChecksDefinition should not be instantiated directly, but rather produced using the @asset_check decorator or AssetChecksDefinition.create method.
- dagster.build_last_update_freshness_checks
- experimental
This API may break in future versions, even between dot releases.
Constructs an AssetChecksDefinition that checks the freshness of the provided assets.
This check passes if the asset is found to be “fresh”, and fails if the asset is found to be “overdue”. An asset is considered fresh if a record (i.e. a materialization or observation) exists with a timestamp greater than the “lower bound” derived from the parameters of this function.
deadline_cron is a cron schedule that defines the deadline by which we expect the asset to arrive; if not provided, we consider the deadline to be the execution time of the check. lower_bound_delta is a timedelta that defines the lower bound for when a record could have arrived by. If the most recent record’s timestamp is earlier than deadline - lower_bound_delta, the asset is considered overdue.
Let’s use two examples, one with a deadline_cron set and one without.
Let’s say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”, and the lower_bound_delta to “45 minutes”. This would mean that starting at 9:00 AM, this check will expect a materialization record to have been created no earlier than 8:15 AM. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday.
Let’s say I have an observable source asset on a data source which I expect should never be more than 3 hours out of date. In this case, there’s no fixed schedule for when the data should be updated, so I would not provide a deadline_cron. Instead, I would set the lower_bound_delta parameter to “3 hours”. This would mean that the check will expect the most recent observation record to indicate data no older than 3 hours, relative to the current time, regardless of when it runs.
The check result will contain the following metadata:
- “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.
- “dagster/last_updated_time”: The time of the most recent update to the asset.
- “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.
- “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. In the case of a provided deadline_cron, this is the timestamp of the most recent tick of the cron schedule. In the case of no deadline_cron, this is the current time.
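The lower-bound arithmetic described above can be worked through in plain Python. This is a sketch of the rule only, not Dagster's implementation: latest_daily_deadline and is_fresh are hypothetical helpers, a fixed daily time stands in for the most recent tick of a cron such as "0 9 * * *", and treating a record exactly at the lower bound as fresh is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def latest_daily_deadline(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Most recent daily deadline at hour:minute that is not after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # If today's deadline has not passed yet, fall back to yesterday's.
    return candidate if candidate <= now else candidate - timedelta(days=1)


def is_fresh(
    last_update: Optional[datetime],
    now: datetime,
    lower_bound_delta: timedelta,
    deadline: Optional[datetime] = None,
) -> bool:
    """Fresh if a record exists at or after deadline - lower_bound_delta.

    With no deadline, the evaluation time `now` is used as the deadline.
    """
    effective_deadline = deadline if deadline is not None else now
    lower_bound = effective_deadline - lower_bound_delta
    return last_update is not None and last_update >= lower_bound


utc = timezone.utc

# Case 1: deadline at 9:00 AM UTC with a 45 minute lower bound (8:15 AM).
now = datetime(2024, 1, 2, 9, 30, tzinfo=utc)
deadline = latest_daily_deadline(now, hour=9)
delta = timedelta(minutes=45)
fresh_at_820 = is_fresh(datetime(2024, 1, 2, 8, 20, tzinfo=utc), now, delta, deadline)
fresh_at_810 = is_fresh(datetime(2024, 1, 2, 8, 10, tzinfo=utc), now, delta, deadline)

# At 8:59 AM the most recently passed deadline is yesterday's.
early_deadline = latest_daily_deadline(datetime(2024, 1, 2, 8, 59, tzinfo=utc), hour=9)

# Case 2: no deadline, data must be at most 3 hours old relative to `now`.
noon = datetime(2024, 1, 2, 12, 0, tzinfo=utc)
fresh_no_cron = is_fresh(datetime(2024, 1, 2, 9, 30, tzinfo=utc), noon, timedelta(hours=3))
stale_no_cron = is_fresh(datetime(2024, 1, 2, 8, 30, tzinfo=utc), noon, timedelta(hours=3))
```

In case 1 the record from 8:20 AM counts as fresh and the one from 8:10 AM as overdue. In case 2 the lower bound moves with the evaluation time.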
Examples:
# Example 1: Assets that are expected to be updated every day within 45 minutes of
# 9:00 AM UTC
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_daily_scheduled_assets_def

checks_def = build_last_update_freshness_checks(
    [my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
    lower_bound_delta=datetime.timedelta(minutes=45),
    deadline_cron="0 9 * * *",
)

# Example 2: Assets that are expected to be updated within 3 hours of the current time
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_observable_source_asset

checks_def = build_last_update_freshness_checks(
    [my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
    lower_bound_delta=datetime.timedelta(hours=3),
)

Parameters:
- assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to
- lower_bound_delta (datetime.timedelta) – The check will pass if the asset was updated within
- deadline_cron (Optional[str]) – Defines the deadline for when we should start checking
- timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If
Returns: AssetChecksDefinition objects which execute freshness checks for the provided assets.
Return type: Sequence[AssetChecksDefinition]
- dagster.build_time_partition_freshness_checks
- experimental
This API may break in future versions, even between dot releases.
Construct an AssetChecksDefinition that checks the freshness of the provided assets.
This check passes if the asset is considered “fresh” by the time that execution begins. We consider an asset to be “fresh” if there exists a record for the most recent partition, once the deadline has passed.
deadline_cron is a cron schedule that defines the deadline for when we should expect the most recent partition to arrive by. Once a tick of the cron schedule has passed, this check will fail if the most recent partition has not been observed/materialized.
Let’s say I have a daily-partitioned asset which runs every day at 8:00 AM UTC, and takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”. This means that starting at 9:00 AM, this check will expect a record to exist for the previous day’s partition. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday (meaning the partition representing the day before yesterday).
The timestamp of an observation record is the timestamp indicated by the “dagster/last_updated_timestamp” metadata key. The timestamp of a materialization record is the timestamp at which that record was created.
The check will fail at runtime if a non-time-window partitioned asset is passed in.
The check result will contain the following metadata:
- “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.
- “dagster/last_updated_time”: (Only present if the asset has been observed/materialized before) The time of the most recent update to the asset.
- “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.
- “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. This is the timestamp of the most recent tick of the cron schedule.
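The daily-partition reasoning above (which partition is expected once a deadline has passed) can be sketched in plain Python. This is an illustration under stated assumptions, not Dagster's implementation: expected_daily_partition and partition_is_fresh are hypothetical helpers, partitions are assumed to be daily and keyed by calendar date, and a fixed hour stands in for the most recent tick of a cron such as "0 9 * * *".

```python
from datetime import date, datetime, timedelta, timezone
from typing import Set


def expected_daily_partition(now: datetime, deadline_hour: int) -> date:
    """Date of the daily partition that should have a record at time `now`."""
    deadline = now.replace(hour=deadline_hour, minute=0, second=0, microsecond=0)
    if deadline > now:
        # Today's deadline has not passed; use the most recently passed one.
        deadline -= timedelta(days=1)
    # At the deadline, the newest complete daily partition covers the previous day.
    return (deadline - timedelta(days=1)).date()


def partition_is_fresh(recorded: Set[date], now: datetime, deadline_hour: int) -> bool:
    """Fresh if a record exists for the partition expected at `now`."""
    return expected_daily_partition(now, deadline_hour) in recorded


utc = timezone.utc
after_deadline = datetime(2024, 1, 3, 9, 0, tzinfo=utc)
before_deadline = datetime(2024, 1, 3, 8, 59, tzinfo=utc)

# From 9:00 AM on Jan 3 the check expects the Jan 2 partition.
expected_after = expected_daily_partition(after_deadline, deadline_hour=9)
# At 8:59 AM the relevant deadline is Jan 2, so the Jan 1 partition is expected.
expected_before = expected_daily_partition(before_deadline, deadline_hour=9)

recorded = {date(2024, 1, 1)}
fresh_before = partition_is_fresh(recorded, before_deadline, deadline_hour=9)
fresh_after = partition_is_fresh(recorded, after_deadline, deadline_hour=9)
```

With only the January 1 partition recorded, the sketch reports fresh at 8:59 AM on January 3 and overdue one minute later.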
Examples:
from dagster import build_time_partition_freshness_checks, AssetKey
# A daily partitioned asset that is expected to be updated every day within 45 minutes
# of 9:00 AM UTC
from .somewhere import my_daily_scheduled_assets_def

checks_def = build_time_partition_freshness_checks(
    [my_daily_scheduled_assets_def],
    deadline_cron="0 9 * * *",
)

Parameters:
- assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to
- deadline_cron (str) – The check will pass if the partition time window most recently
- timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If
Returns: AssetChecksDefinition objects which execute freshness checks for the provided assets.
Return type: Sequence[AssetChecksDefinition]
- dagster.build_sensor_for_freshness_checks
- experimental
This API may break in future versions, even between dot releases.
Builds a sensor which kicks off evaluation of freshness checks.
This sensor will kick off an execution of a check in the following cases:
- The check has never been executed before.
- The check has been executed before, and the previous result was a success, but it is again possible for the check to be overdue based on the dagster/fresh_until_timestamp metadata on the check result.
Note that we will not execute if:
- The freshness check has been executed before, and the previous result was a failure. This is because whichever run materializes/observes the asset to bring the check back to a passing state will end up also running the check anyway, so until that run occurs, there’s no point in evaluating the check.
- The freshness check has been executed before, and the previous result was a success, but it is not possible for the check to be overdue based on the dagster/fresh_until_timestamp metadata on the check result. Since the check cannot be overdue, we know the check result would not change with an additional execution.
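The four cases above reduce to a small decision function. The following is a plain-Python sketch of that rule, not the sensor's actual code: should_evaluate is a hypothetical name, timestamps are assumed to be Unix seconds, and evaluating when a passing result carries no dagster/fresh_until_timestamp is an assumption made here for completeness.

```python
from typing import Optional


def should_evaluate(
    previous_passed: Optional[bool],
    fresh_until_timestamp: Optional[float],
    now_timestamp: float,
) -> bool:
    """Decide whether a freshness check should be kicked off.

    previous_passed is None when the check has never been executed.
    """
    # Never executed before: always evaluate.
    if previous_passed is None:
        return True
    # Previous failure: the run that refreshes the asset will re-run the check.
    if not previous_passed:
        return False
    # Previous success without a fresh-until value: evaluate (assumption).
    if fresh_until_timestamp is None:
        return True
    # Previous success: evaluate only once the asset could be overdue again.
    return now_timestamp >= fresh_until_timestamp


never_run = should_evaluate(None, None, now_timestamp=1_000.0)
after_failure = should_evaluate(False, None, now_timestamp=1_000.0)
still_fresh = should_evaluate(True, fresh_until_timestamp=2_000.0, now_timestamp=1_000.0)
possibly_overdue = should_evaluate(True, fresh_until_timestamp=500.0, now_timestamp=1_000.0)
```

Only the first and last calls return True, matching the two cases in which the sensor kicks off an execution.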
Parameters:
- freshness_checks (Sequence[AssetChecksDefinition]) – The freshness checks to evaluate.
- minimum_interval_seconds (Optional[int]) – The duration in seconds between evaluations of the sensor.
- name (Optional[str]) – The name of the sensor. Defaults to “freshness_check_sensor”, but a
- default_status (Optional[DefaultSensorStatus]) – The default status of the sensor. Defaults
Returns: The sensor that kicks off freshness evaluations.
Return type: SensorDefinition
- dagster.build_column_schema_change_checks
- experimental
This API may break in future versions, even between dot releases.
Returns asset checks that pass if the column schema of the asset’s latest materialization is the same as the column schema of the asset’s previous materialization.
Parameters:
- assets (Sequence[Union[AssetKey, str, AssetsDefinition, SourceAsset]]) – The assets to create
- severity (AssetCheckSeverity) – The severity if the check fails. Defaults to WARN.
Returns: Sequence[AssetChecksDefinition]
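As an illustration of what "the same column schema" means, the comparison can be sketched in plain Python. This is not how Dagster stores or compares schemas: SchemaDiff and diff_column_schema are hypothetical names, a schema is assumed to be a mapping from column name to type string, and column order is ignored.

```python
from typing import Dict, List, NamedTuple


class SchemaDiff(NamedTuple):
    """Differences between two column schemas (hypothetical helper type)."""

    added: List[str]
    removed: List[str]
    type_changed: List[str]

    @property
    def passed(self) -> bool:
        # The check passes only when nothing was added, removed, or retyped.
        return not (self.added or self.removed or self.type_changed)


def diff_column_schema(previous: Dict[str, str], latest: Dict[str, str]) -> SchemaDiff:
    """Compare the previous materialization's columns with the latest one's."""
    added = sorted(set(latest) - set(previous))
    removed = sorted(set(previous) - set(latest))
    type_changed = sorted(
        name for name in set(previous) & set(latest) if previous[name] != latest[name]
    )
    return SchemaDiff(added, removed, type_changed)


previous = {"id": "int", "name": "str", "amount": "float"}
latest = {"id": "int", "name": "str", "amount": "int", "created_at": "timestamp"}

diff = diff_column_schema(previous, latest)
unchanged = diff_column_schema(previous, dict(previous))
```

Here diff reports created_at as added and amount as retyped, so the sketch would fail the check, while unchanged passes.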
- dagster.build_metadata_bounds_checks
- experimental
This API may break in future versions, even between dot releases.
Returns asset checks that pass if the metadata value of the asset’s latest materialization is within the specified range.
Parameters:
- assets (Sequence[Union[AssetKey, str, AssetsDefinition, SourceAsset]]) – The assets to create
- severity (AssetCheckSeverity) – The severity if the check fails. Defaults to WARN.
- metadata_key (str) – The metadata key to check.
- min_value (Optional[Union[int, float]]) – The minimum value to check for. If None, no minimum
- max_value (Optional[Union[int, float]]) – The maximum value to check for. If None, no maximum
- exclusive_min (bool) – If True, the check will fail if the metadata value is equal to min_value.
- exclusive_max (bool) – If True, the check will fail if the metadata value is equal to max_value.
Returns: Sequence[AssetChecksDefinition]
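The interaction of min_value, max_value, exclusive_min, and exclusive_max can be summarized with a plain-Python sketch of the range test. within_bounds is a hypothetical helper written for illustration; it mirrors the parameter descriptions above and is not Dagster's implementation.

```python
from typing import Optional, Union

Number = Union[int, float]


def within_bounds(
    value: Number,
    min_value: Optional[Number] = None,
    max_value: Optional[Number] = None,
    exclusive_min: bool = False,
    exclusive_max: bool = False,
) -> bool:
    """Return True if `value` lies inside the configured range."""
    if min_value is not None:
        # With exclusive_min, a value equal to min_value fails.
        if value < min_value or (exclusive_min and value == min_value):
            return False
    if max_value is not None:
        # With exclusive_max, a value equal to max_value fails.
        if value > max_value or (exclusive_max and value == max_value):
            return False
    return True


inclusive_edge = within_bounds(5, min_value=5, max_value=10)
exclusive_edge = within_bounds(5, min_value=5, max_value=10, exclusive_min=True)
above_max = within_bounds(11, min_value=5, max_value=10)
no_limits = within_bounds(3)
```

A bound left as None is simply not enforced, so within_bounds(3) passes when neither limit is given.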