Create event-based pipelines with sensors
Sensors enable you to take action in response to events that occur either internally within Dagster or in external systems. They check for events at regular intervals and either perform an action or provide an explanation for why the action was skipped.
Examples of events include:
- a run completes in Dagster
- a run fails in Dagster
- a job materializes a specific asset
- a file appears in an S3 bucket
- an external system is down
Examples of actions include:
- launching a run
- sending a Slack message
- inserting a row into a database
An alternative to polling with sensors is to push events to Dagster using the Dagster API.
Basic sensor
Sensors are defined with the @sensor decorator. The following example includes a check_for_new_files function that simulates finding new files. In a real scenario, this function would check an actual system or directory.
If the sensor finds new files, it requests a run of my_job for each file. If not, it skips the run and logs "No new files found" in the Dagster UI.
import random

import dagster as dg


# Define the asset
@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Hello, world!")


# Define asset job
my_job = dg.define_asset_job("my_job", selection=[my_asset])


# Define file check
def check_for_new_files() -> list[str]:
    if random.random() > 0.5:
        return ["file1", "file2"]
    return []


# Define the sensor
@dg.sensor(
    job=my_job,
    minimum_interval_seconds=5,
    default_status=dg.DefaultSensorStatus.RUNNING,  # Sensor is turned on by default
)
def new_file_sensor():
    new_files = check_for_new_files()
    # New files, run `my_job`
    if new_files:
        for filename in new_files:
            yield dg.RunRequest(run_key=filename)
    # No new files, skip the run and log the reason
    else:
        yield dg.SkipReason("No new files found")


defs = dg.Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor])
Unless a sensor has a default_status of DefaultSensorStatus.RUNNING, it won't be enabled when first deployed to a Dagster instance. To find and enable the sensor, click Automation > Sensors in the Dagster UI.
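In a real deployment, check_for_new_files would inspect an actual location instead of returning random results. The following is a minimal sketch, assuming new files land in a local directory; the directory parameter and the choice to return sorted file names are illustrative assumptions, not part of the example above.

```python
from pathlib import Path


def check_for_new_files(directory: str) -> list[str]:
    """Return the names of the regular files currently in `directory`.

    Hypothetical replacement for the simulated check: it lists what is
    present right now and does not track which files were seen before.
    """
    root = Path(directory)
    # A missing or non-directory path is treated as "no files found".
    if not root.is_dir():
        return []
    # Skip subdirectories; sort so the result is deterministic.
    return sorted(p.name for p in root.iterdir() if p.is_file())
```

Because the sensor passes each file name as the run_key of its RunRequest, Dagster creates at most one run per unique run key for that sensor, so returning the full listing on every evaluation does not launch duplicate runs for files that were already handled.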
Customizing intervals between evaluations
The minimum_interval_seconds argument allows you to specify the minimum number of seconds that will elapse between sensor evaluations. This means that the sensor won't be evaluated more frequently than the specified interval.
Note that this value is a minimum interval between evaluations of the sensor, not the exact frequency at which the sensor is evaluated. If an evaluation takes longer to complete than the specified interval, the next evaluation will be delayed accordingly.
# Sensor will be evaluated no more often than once every 30 seconds
@dg.sensor(job=my_job, minimum_interval_seconds=30)
def new_file_sensor():
    ...
In this example, if the new_file_sensor's evaluation function takes less than a second to run, you can expect the sensor to run consistently around every 30 seconds. However, if the evaluation function takes longer, the interval between evaluations will be longer.
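The effect of evaluation time on spacing can be illustrated with a small, simplified model. This is a sketch in plain Python, not Dagster's actual scheduling logic: it assumes the next evaluation starts as soon as both the minimum interval has elapsed since the previous start and the previous evaluation has finished, and it ignores any overhead from the process that triggers evaluations. The function name evaluation_start_times is hypothetical.

```python
def evaluation_start_times(
    minimum_interval_seconds: float,
    evaluation_durations: list[float],
) -> list[float]:
    """Return the modeled start time (in seconds) of each evaluation.

    Simplified assumption: the gap between two consecutive starts is the
    larger of the minimum interval and the earlier evaluation's duration.
    """
    starts = []
    current_start = 0.0
    for duration in evaluation_durations:
        starts.append(current_start)
        current_start += max(minimum_interval_seconds, duration)
    return starts


# Fast evaluations: starts are spaced by the 30-second minimum.
print(evaluation_start_times(30, [0.5, 0.5, 0.5]))  # [0.0, 30.0, 60.0]

# A 45-second evaluation delays the start of the one that follows it.
print(evaluation_start_times(30, [0.5, 45, 0.5]))  # [0.0, 30.0, 75.0]
```

In the second call, the third evaluation starts 45 seconds after the second one rather than 30, which matches the behavior described above for evaluations that outlast the minimum interval.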