Skip to main content

About Automation

There are several ways to automate the execution of your data pipelines with Dagster.

The first system, and the most basic, is the Schedule, which responds to time.

Sensors are like schedules, but they respond to an external event defined by the user.

Asset Sensors are a special case of sensor that responds to changes in asset materialization as reported by the Event Log.

Finally, the Declarative Automation system is a more complex system that uses conditions on the assets to determine when to execute.

Schedules

In Dagster, a schedule is defined by the ScheduleDefinition class, or through the @schedule decorator. The @schedule decorator is more flexible than the ScheduleDefinition class, allowing you to configure job behavior or emit log messages as the schedule is processed.

Schedules were one of the first types of automation in Dagster, created before the introduction of Software-Defined Assets. As such, you may find that many of the examples can seem foreign if you are used to only working within the asset framework.

For more on how assets and ops inter-relate, read about Assets and Ops

The dagster-daemon process is responsible for submitting runs by checking each schedule at a regular interval to determine if it's time to execute the underlying job.

A schedule can be thought of as a wrapper around two pieces:

  • A JobDefinition, which is a set of assets to materialize or ops to execute.
  • A cron string, which describes the schedule.

Define a schedule using ScheduleDefinition

ecommerce_schedule = ScheduleDefinition(
job=ecommerce_job,
cron_schedule="15 5 * * 1-5",
)

By default, schedules aren't enabled. You can enable them by visiting the Automation tab and toggling the schedule, or set a default status to RUNNING when you define the schedule.

ecommerce_schedule = ScheduleDefinition(
job=ecommerce_job,
cron_schedule="15 5 * * 1-5",
default_status=DefaultScheduleStatus.RUNNING,
)

Define a schedule using @schedule

If you want more control over the schedule, you can use the @schedule decorator. In doing so, you are then responsible for either emitting a RunRequest or a SkipReason. You can also emit logs, which will be visible in the Dagster UI for a given schedule's tick history.

@schedule(cron_schedule="15 5 * * 1-5")
def ecommerce_schedule(context):
context.log.info("This log message will be visible in the Dagster UI.")
return RunRequest()