
Detect and restart crashed workers with run monitoring

Dagster can detect hanging runs and restart crashed run workers. Using run monitoring requires:

  • Running the Dagster Daemon
  • Enabling run monitoring in the Dagster Instance:
# Opt in to run monitoring
run_monitoring:
  enabled: true
  # values below are the defaults, and don't need to be specified except to override them
  start_timeout_seconds: 180
  cancel_timeout_seconds: 180
  max_resume_run_attempts: 3 # experimental if above 0
  poll_interval_seconds: 120
note

In Dagster+, run monitoring is always enabled and can be configured in deployment settings.

Run start timeouts

When Dagster launches a run, the run stays in STARTING status until the run worker spins up and marks the run as STARTED. If a failure prevents the run worker from spinning up, the run can remain stuck in STARTING status. The start_timeout_seconds setting limits how long a run can stay in this state before it is marked as failed.

Run cancelation timeouts

When Dagster terminates a run, it moves the run into CANCELING status and sends a termination signal to the run worker. Once the run worker has cleaned up its resources, the run moves into CANCELED status. If a failure prevents the run worker from spinning down cleanly, the run can remain stuck in CANCELING status. The cancel_timeout_seconds setting limits how long a run can stay in this state before it is marked as canceled.
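The start and cancelation timeouts described above can be pictured as a simple elapsed-time check. The following is a minimal sketch of that idea, not Dagster's implementation: RunSnapshot and timed_out_status are hypothetical names introduced for illustration, and the two constants mirror the default values shown in the configuration earlier.

```python
from dataclasses import dataclass
from typing import Optional

# Defaults copied from the run_monitoring settings shown earlier.
START_TIMEOUT_SECONDS = 180
CANCEL_TIMEOUT_SECONDS = 180


@dataclass
class RunSnapshot:
    """Hypothetical stand-in for a run record; this is not a Dagster class."""

    status: str  # e.g. "STARTING", "STARTED", "CANCELING"
    status_entered_at: float  # timestamp in seconds when the run entered `status`


def timed_out_status(run: RunSnapshot, now: float) -> Optional[str]:
    """Return the status a stuck run would be forced into, or None if no timeout applies."""
    elapsed = now - run.status_entered_at
    if run.status == "STARTING" and elapsed > START_TIMEOUT_SECONDS:
        return "FAILURE"  # stuck in STARTING: marked as failed
    if run.status == "CANCELING" and elapsed > CANCEL_TIMEOUT_SECONDS:
        return "CANCELED"  # stuck in CANCELING: marked as canceled
    return None  # still within its time limit, or in a status these timeouts ignore


stuck_run = RunSnapshot(status="STARTING", status_entered_at=0.0)
print(timed_out_status(stuck_run, now=181.0))
```

Because the daemon only checks periodically (poll_interval_seconds), a run may remain in the stuck status somewhat longer than the configured timeout before it is transitioned.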

General run timeouts

After a run is marked as STARTED, it may hang indefinitely for various reasons (user API errors, network issues, etc.). You can configure a maximum runtime for every run in a deployment by setting the run_monitoring.max_runtime_seconds field in your dagster.yaml or Dagster+ deployment settings to the maximum runtime in seconds. If a run exceeds this timeout and run monitoring is enabled, it will be marked as failed. The dagster/max_runtime tag can also be used to set a timeout in seconds on a per-run basis.

For example, to configure a maximum of 2 hours for every run in your deployment:

run_monitoring:
  enabled: true
  max_runtime_seconds: 7200

or in Dagster+, add the following to your deployment settings:

run_monitoring:
  max_runtime_seconds: 7200

The following code example sets a run timeout of 10 seconds on a per-job basis:

from dagster import define_asset_job, job


@job(tags={"dagster/max_runtime": 10})
def my_job(): ...


asset_job = define_asset_job(
    name="some_job", selection="*", tags={"dagster/max_runtime": 10}
)
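To illustrate how the deployment-wide setting and the per-run tag might interact, here is a minimal sketch in plain Python. It assumes the dagster/max_runtime tag, when present, takes precedence over run_monitoring.max_runtime_seconds; the text above does not state the precedence explicitly, so treat that as an assumption. The helper names effective_max_runtime and exceeded_max_runtime are hypothetical and not part of the Dagster API.

```python
from typing import Mapping, Optional

MAX_RUNTIME_TAG = "dagster/max_runtime"


def effective_max_runtime(
    tags: Mapping[str, object], deployment_default: Optional[float]
) -> Optional[float]:
    """Pick the timeout for a run: the tag if set (assumed precedence), else the default."""
    raw = tags.get(MAX_RUNTIME_TAG)
    if raw is not None:
        return float(raw)  # tag values may arrive as strings or numbers
    return deployment_default


def exceeded_max_runtime(
    started_at: float,
    now: float,
    tags: Mapping[str, object],
    deployment_default: Optional[float],
) -> bool:
    """True if the run has been in STARTED status longer than its effective timeout."""
    limit = effective_max_runtime(tags, deployment_default)
    return limit is not None and (now - started_at) > limit


# A run tagged with a 10 second limit in a deployment whose default is 2 hours.
print(exceeded_max_runtime(0.0, 11.0, {MAX_RUNTIME_TAG: 10}, 7200))
```

With no tag and no deployment-wide setting, the sketch never reports a timeout, which matches the description of runs that may hang indefinitely once STARTED.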

Detecting run worker crashes

note

Detecting run worker crashes only works when using a run launcher other than the DefaultRunLauncher.

It's possible for a run worker process to crash during a run. This can happen for a variety of reasons (the host it's running on could go down, it could run out of memory, etc.). Without the monitoring daemon, there are two possible outcomes, neither desirable:

  • If the run worker was able to catch the interrupt, it will mark the run as failed
  • If the run worker goes down without a grace period, the run could be left hanging in STARTED status

If a run worker crashes, the run it's managing can hang. The monitoring daemon can run health checks on run workers for all active runs to detect this. If a failed run worker is detected (e.g. by the K8s Job having a non-zero exit code), the run is either marked as failed or resumed (see below).

Resuming runs after run worker crashes (Experimental)

This feature is experimental and currently only supported when using:

The monitoring daemon handles run worker crashes by performing health checks on the run workers. If a failure is detected, the daemon can launch a new run worker, which resumes execution of the existing run. The run worker crash will be shown in the event log, and the run will continue to completion. If the run worker continues to crash, the daemon will mark the run as failed after the configured number of attempts.

To enable, set max_resume_run_attempts to a value greater than 0.
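The resume-or-fail decision described above can be sketched as follows. This is an illustration only, under the assumption that each detected crash consumes one resume attempt until max_resume_run_attempts is exhausted; the function name handle_worker_crash and the returned strings are hypothetical and not part of Dagster.

```python
def handle_worker_crash(resume_attempts_used: int, max_resume_run_attempts: int) -> str:
    """Decide what happens to a run after its run worker fails a health check."""
    if resume_attempts_used < max_resume_run_attempts:
        return "RESUME"  # launch a new run worker that continues the existing run
    return "FAIL"  # resuming disabled (0) or all attempts used: mark the run as failed


# Simulate a run worker that crashes every time, with three resume attempts allowed.
attempts_used = 0
history = []
while True:
    action = handle_worker_crash(attempts_used, max_resume_run_attempts=3)
    history.append(action)
    if action == "FAIL":
        break
    attempts_used += 1

print(history)
```

With max_resume_run_attempts set to 0, the first detected crash leads directly to the run being marked as failed, which corresponds to the feature being disabled.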