Automate your pipeline

There are several ways to automate pipelines and assets in Dagster.

In this step you will:

  • Add automation conditions to assets so they run when upstream assets are materialized.
  • Create a schedule to run a set of assets on a cron schedule.

1. Automate asset materialization

Ideally, the reporting assets created in the last step should refresh whenever their upstream data is updated. Dagster's declarative automation framework lets you do this by adding an automation condition to the asset definition.
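At its simplest, the pattern looks like the sketch below (using hypothetical upstream_asset and derived_asset names, not part of this project): whenever upstream_asset is materialized, Dagster automatically requests a run of derived_asset.

import dagster as dg

@dg.asset
def upstream_asset(): ...

@dg.asset(
    deps=[upstream_asset],
    # Materialize whenever a dependency is updated
    automation_condition=dg.AutomationCondition.eager(),
)
def derived_asset(): ...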

Update the monthly_sales_performance asset to add the automation condition to the decorator:

@dg.asset(
    partitions_def=monthly_partition,
    compute_kind="duckdb",
    group_name="analysis",
    deps=[joined_data],
    automation_condition=dg.AutomationCondition.eager(),
)
def monthly_sales_performance(
    context: dg.AssetExecutionContext, duckdb: DuckDBResource
):
    partition_date_str = context.partition_key
    month_to_fetch = partition_date_str[:-3]

    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists monthly_sales_performance (
                partition_date varchar,
                rep_name varchar,
                product varchar,
                total_dollar_amount double
            );

            delete from monthly_sales_performance where partition_date = '{month_to_fetch}';

            insert into monthly_sales_performance
            select
                '{month_to_fetch}' as partition_date,
                rep_name,
                product_name,
                sum(dollar_amount) as total_dollar_amount
            from joined_data
            where strftime(date, '%Y-%m') = '{month_to_fetch}'
            group by '{month_to_fetch}', rep_name, product_name;
            """
        )

        preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from monthly_sales_performance
            where partition_date = '{month_to_fetch}'
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )

Do the same thing for product_performance:

@dg.asset(
    deps=[joined_data],
    partitions_def=product_category_partition,
    group_name="analysis",
    compute_kind="duckdb",
    automation_condition=dg.AutomationCondition.eager(),
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):
    product_category_str = context.partition_key

    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists product_performance (
                product_category varchar,
                product_name varchar,
                total_dollar_amount double,
                total_units_sold double
            );

            delete from product_performance where product_category = '{product_category_str}';

            insert into product_performance
            select
                '{product_category_str}' as product_category,
                product_name,
                sum(dollar_amount) as total_dollar_amount,
                sum(quantity) as total_units_sold
            from joined_data
            where category = '{product_category_str}'
            group by '{product_category_str}', product_name;
            """
        )

        preview_query = f"select * from product_performance where product_category = '{product_category_str}';"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from product_performance
            where product_category = '{product_category_str}';
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )
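eager() is just one of the built-in conditions, and AutomationCondition objects can be combined with Python's operators (&, |, ~) to express custom policies. As a hypothetical illustration, not used in this tutorial, the following condition refreshes an asset eagerly or, at the latest, on a weekly cron tick:

import dagster as dg

# Hypothetical composite condition: materialize when upstream data updates,
# or at least once a week at Monday midnight.
hybrid_condition = (
    dg.AutomationCondition.eager() | dg.AutomationCondition.on_cron("0 0 * * 1")
)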

2. Scheduled jobs

Cron-based schedules are common in data orchestration. For our pipeline, assume that updated CSVs are uploaded to a file location at a specific time every week by an external process.

Copy the following code underneath the product_performance asset:

weekly_update_schedule = dg.ScheduleDefinition(
    name="analysis_update_job",
    target=dg.AssetSelection.keys("joined_data").upstream(),
    cron_schedule="0 0 * * 1",  # every Monday at midnight
)
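For the schedule to appear in the UI, it also needs to be registered in your project's Definitions object, alongside the assets and the DuckDB resource from earlier steps. A minimal sketch, assuming those objects are already defined in your module (the database path here is illustrative):

defs = dg.Definitions(
    assets=[joined_data, monthly_sales_performance, product_performance],  # plus your other assets
    schedules=[weekly_update_schedule],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},  # reuse the path you configured earlier
)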

3. Enable and test automations

The final step is to enable the automations in the UI.

To accomplish this:

  1. Navigate to the Automation page.
  2. Select all automations.
  3. Using actions, start all automations.
  4. Select the analysis_update_job.
  5. Test the schedule by evaluating it for any time in the dropdown menu.
  6. Open in Launchpad.

The job is now executing.

Additionally, if you navigate to the Runs tab, you should see that materializations for monthly_sales_performance and product_performance have run as well: their eager automation conditions were triggered by the upstream materialization.

Next steps