Ensure data quality with asset checks
Data quality is critical in data pipelines. Checking individual assets helps catch data quality issues before they affect the entire pipeline.
In Dagster, you define asset checks like you define assets. Asset checks run when an asset is materialized. In this step you will:
- Define an asset check
- Execute that asset check in the UI
1. Define an asset check
In this case, we want to create a check that identifies whether any rows in joined_data are missing a value for rep_name or product_name.
Copy the following code beneath the joined_data asset.
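# Assumes dg (import dagster as dg) and DuckDBResource (from dagster_duckdb)
# are already imported at the top of the file, as for the earlier assets.
@dg.asset_check(asset=joined_data)
def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    with duckdb.get_connection() as conn:
        # Count rows in joined_data that lack a sales rep or a product name
        query_result = conn.execute(
            """
            select count(*) from joined_data
            where rep_name is null
            or product_name is null
            """
        ).fetchone()

        count = query_result[0] if query_result else 0
        # The check passes only when no rows are missing a dimension
        return dg.AssetCheckResult(
            passed=count == 0, metadata={"missing dimensions": count}
        )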
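To see what the check computes, here is a minimal sketch of the same query and pass condition run outside Dagster. It swaps in Python's built-in sqlite3 for DuckDB so it runs without extra packages; the rows are hypothetical sample data, and count_missing_dimensions is a hypothetical helper, not part of Dagster.

```python
import sqlite3

# Stand-in for the tutorial's DuckDB database: an in-memory sqlite3 database.
# The table and column names match the tutorial; the rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("create table joined_data (rep_name text, product_name text)")
conn.executemany(
    "insert into joined_data values (?, ?)",
    [("Alice", "Widget"), (None, "Gadget"), ("Bob", None), ("Carol", "Gizmo")],
)


def count_missing_dimensions(conn: sqlite3.Connection) -> int:
    """Hypothetical helper: count rows where either dimension column is null."""
    row = conn.execute(
        """
        select count(*) from joined_data
        where rep_name is null
        or product_name is null
        """
    ).fetchone()
    return row[0] if row else 0


count = count_missing_dimensions(conn)
# Same pass condition as the asset check: pass only when nothing is missing
passed = count == 0
print(f"missing dimensions: {count}, passed: {passed}")
```

With this sample data, two rows are missing a dimension, so the check fails; once those rows are fixed or removed, the count drops to 0 and the check passes.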
2. Run the asset check
Before you can run the asset check, you need to add it to the Definitions object. Asset checks go in their own asset_checks list, separate from the assets list.
Your Definitions object should look like this now:
defs = dg.Definitions(
    assets=[
        products,
        sales_reps,
        sales_data,
        joined_data,
    ],
    asset_checks=[missing_dimension_check],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
Asset checks run automatically when their asset is materialized, but you can also execute them manually in the UI:
- Reload your Definitions.
- Navigate to the Asset Details page for the joined_data asset.
- Select the "Checks" tab.
- Click the Execute button for missing_dimension_check.
Next steps
- Continue this tutorial with Asset Checks