Cron-based schedules
Schedules enable automated execution of jobs at specified intervals. These intervals can range from common frequencies like hourly, daily, or weekly, to more intricate patterns defined using cron expressions.
Prerequisites
Basic schedule
A basic schedule is defined by a JobDefinition
and a cron_schedule
using the ScheduleDefinition
class. A job can be thought of as a selection of assets or operations executed together.
import dagster as dg
@dg.asset
def customer_data(): ...
@dg.asset
def sales_report(): ...
daily_refresh_job = dg.define_asset_job(
"daily_refresh", selection=["customer_data", "sales_report"]
)
daily_schedule = dg.ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
defs = dg.Definitions(
assets=[customer_data, sales_report],
jobs=[daily_refresh_job],
schedules=[daily_schedule],
)
Run schedules in a different timezone
By default, schedules without a timezone will run in Coordinated Universal Time (UTC). To run a schedule in a different timezone, set the timezone
parameter:
daily_schedule = ScheduleDefinition(
job=daily_refresh_job,
cron_schedule="0 0 * * *",
timezone="America/Los_Angeles",
)
Create schedules from partitions
If using partitions and jobs, you can create a schedule using the partition with build_schedule_from_partitioned_job
. The schedule will execute at the same cadence specified by the partition definition.
- Assets
- Ops
If you have a partitioned asset and job:
import dagster as dg
# Daily partition
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-05-20")
@dg.asset(partitions_def=daily_partition)
def daily_asset(): ...
# Define the asset job
partitioned_asset_job = dg.define_asset_job("partitioned_job", selection=[daily_asset])
# This schedule will run daily
asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
partitioned_asset_job,
)
If you have a partitioned op job:
from datetime import datetime
import dagster as dg
class ProcessDateConfig(dg.Config):
date: str
@dg.daily_partitioned_config(start_date=datetime(2024, 1, 1))
def partitioned_config(start: datetime, _end: datetime): ...
@dg.op
def process_data_for_date(context: dg.OpExecutionContext, config: ProcessDateConfig):
date = config.date
context.log.info(f"processing data for {date}")
# Define the job
@dg.job(config=partitioned_config)
def partitioned_op_job(): ...
# Create the schedule from the partition
partitioned_op_schedule = dg.build_schedule_from_partitioned_job(
partitioned_op_job,
)
Next steps
By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints:
- Learn more about schedules in Understanding automation
- React to events with sensors
- Explore Declarative Automation as an alternative to schedules