Defining dependencies between partitioned assets

Now that you've seen how to model partitioned assets in different ways, you may want to define dependencies between partitioned assets, or between partitioned and unpartitioned assets.

Partitioned assets in Dagster can have dependencies on other partitioned assets, allowing you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:

  • A downstream asset can depend on one or more partitions of an upstream asset
  • The partitioning schemes don't need to be identical, but they should be compatible
Note: This article assumes familiarity with assets and partitions.

Dependencies between time-based partitions

The following example defines two assets that share the same daily partitioning scheme: daily_sales_data and daily_sales_summary. Because their partitions match one-to-one, both can be executed together in a single scheduled run.

import datetime
import os

import pandas as pd

import dagster as dg

# Create the PartitionsDefinition, which generates a partition for
# each day from 2024-01-01 up to the day before the current day
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Simulate fetching daily sales data
    df = pd.DataFrame(
        {
            "date": [date] * 10,
            "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
        }
    )

    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
    partitions_def=daily_partitions,  # Use the same daily partitioning scheme
    deps=[daily_sales_data],  # Define dependency on the `daily_sales_data` asset
)
def daily_sales_summary(context: dg.AssetExecutionContext) -> None:
    partition_date_str = context.partition_key
    # Read the CSV file for the given partition date
    filename = f"data/daily_sales/sales_{partition_date_str}.csv"
    df = pd.read_csv(filename)

    # Summarize daily sales
    summary = {
        "date": partition_date_str,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")


# Create a partitioned asset job
daily_sales_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=[daily_sales_data, daily_sales_summary],
)


# Create a schedule to run the job daily
@dg.schedule(
    job=daily_sales_job,
    cron_schedule="0 1 * * *",  # Run at 1:00 AM every day
)
def daily_sales_schedule(context: dg.ScheduleEvaluationContext):
    """Process the previous day's sales data."""
    # Compute the previous day's date as the partition to run
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=date,
        partition_key=date,
    )


# Define the Definitions object
defs = dg.Definitions(
    assets=[daily_sales_data, daily_sales_summary],
    jobs=[daily_sales_job],
    schedules=[daily_sales_schedule],
)
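
To try this example locally without waiting for the schedule, you can materialize a single partition in process. The sketch below assumes the two assets defined above are in scope; the partition key is an arbitrary date within the partition range:

import dagster as dg

# Materialize both assets for one day's partition, in process.
# Any date from 2024-01-01 up to yesterday is a valid key here.
result = dg.materialize(
    [daily_sales_data, daily_sales_summary],
    partition_key="2024-01-15",
)
assert result.success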

However, sometimes you might want to define dependencies between different time-based partitions. For example, you might want to aggregate daily data into a weekly report.

Consider the following example:

import os

import pandas as pd

import dagster as dg

# Create the PartitionsDefinitions
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")
weekly_partitions = dg.WeeklyPartitionsDefinition(start_date="2024-01-01")


# Define the daily partitioned asset
@dg.asset(
    partitions_def=daily_partitions,
    automation_condition=dg.AutomationCondition.on_cron(cron_schedule="0 1 * * *"),
)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Simulate fetching daily sales data
    df = pd.DataFrame({"date": [date], "sales": [1000]})

    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
    partitions_def=weekly_partitions,
    automation_condition=dg.AutomationCondition.eager(),
    deps=[daily_sales_data],
)
def weekly_sales_summary(context: dg.AssetExecutionContext) -> None:
    week = context.partition_key
    # Get the range of daily partition keys covered by this weekly partition
    partition_key_range = context.asset_partition_key_range_for_input(
        "daily_sales_data"
    )
    start_date = partition_key_range.start
    end_date = partition_key_range.end
    context.log.info(f"start_date: {start_date}, end_date: {end_date}")

    # Concatenate the daily CSV files for the week
    df = pd.DataFrame()
    for date in pd.date_range(start_date, end_date):
        filename = f"data/daily_sales/sales_{date.strftime('%Y-%m-%d')}.csv"
        df = pd.concat([df, pd.read_csv(filename)])
    context.log.info(f"df: {df}")

    weekly_summary = {
        "week": week,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Weekly sales summary for {week}: {weekly_summary}")


# Define the Definitions object
defs = dg.Definitions(
    assets=[daily_sales_data, weekly_sales_summary],
)

In this example:

  • We have a daily_sales_data asset partitioned by day, which will be executed daily.

  • The weekly_sales_summary asset, which will be executed weekly, depends on the daily_sales_data asset.

    • In this asset, each weekly partition depends on all of its parent partitions (all seven days of that week). We use context.asset_partition_key_range_for_input("daily_sales_data") to get the range of daily partition keys covering the start and end of the week.
  • To automate the execution of these assets:

    • First, we specify automation_condition=AutomationCondition.eager() on the weekly_sales_summary asset. This ensures it runs once all seven daily partitions of daily_sales_data for the week are up to date.
    • Second, we specify automation_condition=AutomationCondition.on_cron(cron_schedule="0 1 * * *") on the daily_sales_data asset. This ensures it runs daily.

Note: In the simpler example above, we manually set up a daily schedule for asset execution. For more complex dependency logic, it's recommended to use automation conditions instead of schedules. Automation conditions specify when an asset should run, letting you define execution criteria without writing custom scheduling logic. For more details, see Declarative Automation.

Dependencies between time-based partitions and unpartitioned assets

A time-partitioned asset and an unpartitioned asset can depend on each other in either direction. By default, when an unpartitioned asset depends on a partitioned asset, it depends on all of that asset's partitions.
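
As a minimal sketch of this pattern, the unpartitioned all_time_sales_summary asset below (the name and file layout are illustrative) depends on the daily-partitioned daily_sales_data asset from the examples above and reads every daily file materialized so far:

import glob

import pandas as pd

import dagster as dg

# `daily_sales_data` is the daily-partitioned asset defined earlier,
# which writes one CSV file per daily partition.


# An unpartitioned asset. By default, it depends on all
# partitions of the upstream partitioned asset.
@dg.asset(deps=[daily_sales_data])
def all_time_sales_summary(context: dg.AssetExecutionContext) -> None:
    # Read every daily file that has been materialized so far
    files = sorted(glob.glob("data/daily_sales/sales_*.csv"))
    df = pd.concat([pd.read_csv(f) for f in files])
    context.log.info(f"All-time total sales: {df['sales'].sum()}")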

Dependencies between time-based and static partitions

Combining time-based and static partitions allows you to analyze data across both temporal and categorical dimensions. This is particularly useful for scenarios like regional time series analysis.
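
One way to model this, sketched below, is a two-dimensional MultiPartitionsDefinition that pairs a daily dimension with a static region dimension (the asset name, dimension names, and region values are illustrative):

import dagster as dg

# Two-dimensional partitioning: one time-based dimension, one static
region_date_partitions = dg.MultiPartitionsDefinition(
    {
        "date": dg.DailyPartitionsDefinition(start_date="2024-01-01"),
        "region": dg.StaticPartitionsDefinition(["us", "eu", "apac"]),
    }
)


@dg.asset(partitions_def=region_date_partitions)
def regional_daily_sales(context: dg.AssetExecutionContext) -> None:
    # The partition key of a multi-partitioned asset can be
    # split into its individual dimensions
    keys = context.partition_key.keys_by_dimension
    context.log.info(f"Processing sales for {keys['region']} on {keys['date']}")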

Dependencies between time-based and dynamic partitions
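
As a sketch of one possible pattern (the regions dynamic dimension, the asset names, and the choice of AllPartitionMapping are illustrative assumptions), a dynamically partitioned asset can depend on every partition of a time-partitioned asset by specifying an explicit partition mapping:

import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

# Partition keys are registered at runtime, for example from a sensor
# via context.instance.add_dynamic_partitions("regions", [...])
region_partitions = dg.DynamicPartitionsDefinition(name="regions")


@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    context.log.info(f"Fetching sales for {context.partition_key}")


@dg.asset(
    partitions_def=region_partitions,
    # Each region partition reads from every daily partition upstream
    deps=[dg.AssetDep(daily_sales_data, partition_mapping=dg.AllPartitionMapping())],
)
def regional_sales_report(context: dg.AssetExecutionContext) -> None:
    context.log.info(f"Building report for region {context.partition_key}")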

Integrating Dagster partitions with external systems: incremental models and dbt