Automate your pipeline
There are several ways to automate pipelines and assets in Dagster.
In this step you will:
- Add automation conditions to assets so they run when upstream assets are materialized.
- Create a schedule that runs a set of assets on a cron cadence.
1. Automate asset materialization
Ideally, the reporting assets created in the last step should refresh whenever the upstream data is updated. Dagster's Declarative Automation framework lets you do this by adding an automation condition to the asset definition.
Update the `monthly_sales_performance` asset to add the automation condition to the decorator:
```python
@dg.asset(
    partitions_def=monthly_partition,
    compute_kind="duckdb",
    group_name="analysis",
    deps=[joined_data],
    # rematerialize this asset whenever joined_data is updated
    automation_condition=dg.AutomationCondition.eager(),
)
def monthly_sales_performance(
    context: dg.AssetExecutionContext, duckdb: DuckDBResource
):
    partition_date_str = context.partition_key
    month_to_fetch = partition_date_str[:-3]

    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists monthly_sales_performance (
                partition_date varchar,
                rep_name varchar,
                product varchar,
                total_dollar_amount double
            );

            delete from monthly_sales_performance where partition_date = '{month_to_fetch}';

            insert into monthly_sales_performance
            select
                '{month_to_fetch}' as partition_date,
                rep_name,
                product_name,
                sum(dollar_amount) as total_dollar_amount
            from joined_data where strftime(date, '%Y-%m') = '{month_to_fetch}'
            group by '{month_to_fetch}', rep_name, product_name;
            """
        )

        preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from monthly_sales_performance
            where partition_date = '{month_to_fetch}'
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )
```
Do the same for `product_performance`:
```python
@dg.asset(
    deps=[joined_data],
    partitions_def=product_category_partition,
    group_name="analysis",
    compute_kind="duckdb",
    # rematerialize this asset whenever joined_data is updated
    automation_condition=dg.AutomationCondition.eager(),
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):
    product_category_str = context.partition_key

    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists product_performance (
                product_category varchar,
                product_name varchar,
                total_dollar_amount double,
                total_units_sold double
            );

            delete from product_performance where product_category = '{product_category_str}';

            insert into product_performance
            select
                '{product_category_str}' as product_category,
                product_name,
                sum(dollar_amount) as total_dollar_amount,
                sum(quantity) as total_units_sold
            from joined_data
            where category = '{product_category_str}'
            group by '{product_category_str}', product_name;
            """
        )

        preview_query = f"select * from product_performance where product_category = '{product_category_str}';"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from product_performance
            where product_category = '{product_category_str}';
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )
```
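`eager()` is only one of the built-in conditions. As a sketch of a few alternatives (condition names reflect recent Dagster releases; check the API docs for your version), conditions can also be combined with boolean operators:

```python
import dagster as dg

# Run on a cron cadence, waiting for upstream data to update first.
on_cron = dg.AutomationCondition.on_cron("0 0 * * *")

# Fill in missing partitions as soon as they can be materialized.
on_missing = dg.AutomationCondition.on_missing()

# Conditions compose: eager behavior OR the nightly cron, for example.
combined = dg.AutomationCondition.eager() | on_cron
```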
2. Scheduled jobs
Cron-based schedules are common in data orchestration. For our pipeline, assume that an external process uploads updated CSVs to a file location at a specific time every week.
Copy the following code underneath the `product_performance` asset:
```python
weekly_update_schedule = dg.ScheduleDefinition(
    name="analysis_update_job",
    # select joined_data plus every asset upstream of it
    target=dg.AssetSelection.keys("joined_data").upstream(),
    cron_schedule="0 0 * * 1",  # every Monday at midnight
)
```
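For the schedule to appear in the UI, it has to be registered alongside your assets. A minimal sketch, assuming your project wires everything through a `dg.Definitions` object as in earlier steps of this tutorial; the asset list and database path below are illustrative, not prescriptive:

```python
import dagster as dg
from dagster_duckdb import DuckDBResource

defs = dg.Definitions(
    # include the assets from earlier steps as well
    assets=[joined_data, monthly_sales_performance, product_performance],
    schedules=[weekly_update_schedule],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},  # illustrative path
)
```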
3. Enable and test automations
The final step is to enable the automations in the UI.
To accomplish this:
- Navigate to the Automation page.
- Select all automations.
- Using the actions menu, start all automations.
- Select the `analysis_update_job`.
- Test the schedule and evaluate it for any time in the dropdown menu.
- Open in Launchpad.
The job is now executing.
Additionally, if you navigate to the Runs tab, you should see that materializations for `monthly_sales_performance` and `product_performance` have run as well.
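Beyond the UI, a quick unit test can catch accidental changes to the schedule. A minimal sketch using pytest; the import path is hypothetical and should point at wherever you defined the schedule:

```python
# hypothetical module path -- adjust to your project layout
from my_project.definitions import weekly_update_schedule

def test_weekly_update_schedule():
    # ScheduleDefinition exposes its name and cron string directly.
    assert weekly_update_schedule.name == "analysis_update_job"
    assert weekly_update_schedule.cron_schedule == "0 0 * * 1"
```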
Next steps
- Continue this tutorial by adding a sensor-based asset