Orchestration on Celery + Kubernetes
APIs
- dagster_celery_k8s.CeleryK8sRunLauncher RunLauncher
In contrast to the K8sRunLauncher, which launches Dagster runs as single K8s Jobs, this run launcher is intended for use in concert with dagster_celery_k8s.celery_k8s_job_executor().

With this run launcher, execution is delegated to:
- A run worker Kubernetes Job, which traverses the Dagster run execution plan and submits steps to Celery queues for execution;
- Celery workers, which pick up the step executions submitted to those queues. Each step execution spawns a step execution Kubernetes Job. See the implementation defined in dagster_celery_k8s.executor.create_k8s_job_task().
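The two-level delegation described above can be pictured with a small, self-contained toy model. This is not Dagster or Celery code: `queue.Queue` stands in for a Celery queue, the hypothetical `run_worker` function stands in for the run worker Kubernetes Job, and the hypothetical `celery_worker` function records a string where a real worker would create a step execution Kubernetes Job. The toy also enqueues every step immediately, whereas a real run worker only submits a step once its upstream steps have completed.

```python
import queue
import threading


def run_worker(execution_plan, step_queue):
    """Toy run worker: walk the plan and submit each step to the queue."""
    for step_key in execution_plan:
        step_queue.put(step_key)


def celery_worker(step_queue, launched_jobs, lock):
    """Toy Celery worker: pull steps and 'launch' one Job per step."""
    while True:
        step_key = step_queue.get()
        if step_key is None:  # sentinel: shut this worker down
            return
        with lock:
            # A real worker would create a step execution Kubernetes Job here.
            launched_jobs.append(f"k8s-job-for-{step_key}")


def simulate(execution_plan, num_workers=2):
    step_queue = queue.Queue()
    launched_jobs = []
    lock = threading.Lock()
    workers = [
        threading.Thread(target=celery_worker, args=(step_queue, launched_jobs, lock))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    run_worker(execution_plan, step_queue)
    for _ in workers:
        step_queue.put(None)  # one sentinel per worker, queued after all steps
    for w in workers:
        w.join()
    return sorted(launched_jobs)


print(simulate(["extract", "transform", "load"]))
```

Because the queue is FIFO and the sentinels are enqueued after every step, all steps are consumed before any worker shuts down, regardless of how many workers are running.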
You can configure a Dagster instance to use this RunLauncher by adding a section to your dagster.yaml like the following:

run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    instance_config_map: "dagster-k8s-instance-config-map"
    dagster_home: "/some/path"
    postgres_password_secret: "dagster-k8s-pg-password"
    broker: "some_celery_broker_url"
    backend: "some_celery_backend_url"
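The `module` and `class` keys name a Python import path, so the named class must be importable from the named module. A minimal stdlib sketch of that kind of lookup, assuming a plain `importlib` import followed by an attribute lookup (the helper `load_configurable_class` is hypothetical, not Dagster's API), is shown below. It is demonstrated on a standard-library class so it runs without Dagster installed.

```python
import importlib


def load_configurable_class(module_name: str, class_name: str) -> type:
    """Hypothetical helper: resolve a `module` + `class` pair from dagster.yaml."""
    # Raises ModuleNotFoundError if the module itself cannot be imported.
    module = importlib.import_module(module_name)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        # The module exists but does not define the requested class.
        raise ImportError(
            f"class {class_name!r} not found in module {module_name!r}"
        ) from exc


# Stand-in example; a real instance would pass the module/class values
# from its dagster.yaml run_launcher section instead.
cls = load_configurable_class("collections", "OrderedDict")
print(cls.__name__)
```

A mismatched pair (a module that exists but does not define the class) fails at this lookup step, before any launcher configuration is read.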
- dagster_celery_k8s.celery_k8s_job_executor ExecutorDefinition
Celery-based executor which launches tasks as Kubernetes Jobs.
The Celery executor exposes config settings for the underlying Celery app under the config_source key. This config corresponds to the “new lowercase settings” introduced in Celery version 4.0, and the object constructed from config will be passed to the celery.Celery constructor as its config_source argument. (See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.)

The executor also exposes the broker, backend, and include arguments to the celery.Celery constructor.

In the most common case, you may want to modify the broker and backend (e.g., to use Redis instead of RabbitMQ). We expect that config_source will be less frequently modified, but that when op executions are especially fast or slow, or when there are different requirements around idempotence or retry, it may make sense to execute Dagster jobs with variations on these settings.

To use the celery_k8s_job_executor, set it as the executor_def when defining a job:
from dagster import job
from dagster_celery_k8s.executor import celery_k8s_job_executor

@job(executor_def=celery_k8s_job_executor)
def celery_enabled_job():
    pass

Then you can configure the executor as follows:
execution:
  config:
    job_image: 'my_repo.com/image_name:latest'
    job_namespace: 'some-namespace'
    broker: 'pyamqp://guest@localhost//'  # Optional[str]: The URL of the Celery broker
    backend: 'rpc://'                     # Optional[str]: The URL of the Celery results backend
    include: ['my_module']                # Optional[List[str]]: Modules every worker should import
    config_source:                        # Dict[str, Any]: Any additional parameters to pass to the
      #...                                # Celery workers. This dict will be passed as the `config_source`
      #...                                # argument of celery.Celery().

Note that the YAML you provide here must match the configuration the target Celery workers were started with. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never pick up tasks for execution.
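An executor config like the one above can also be expressed as a Python run-config dict. The `config_source` entries below are real Celery 4+ lowercase setting names (`task_acks_late`, `worker_prefetch_multiplier`), picked here only as an example of settings commonly suggested for slow, long-running tasks; the Redis broker and backend URLs are placeholders. The hypothetical `broker_matches_workers` helper sketches the alignment check described in the note above by comparing the executor's broker URL with the URL the workers were started with.

```python
from urllib.parse import urlsplit

# Run config mirroring the executor YAML; all values are placeholders.
run_config = {
    "execution": {
        "config": {
            "job_image": "my_repo.com/image_name:latest",
            "job_namespace": "some-namespace",
            "broker": "redis://redis-host:6379/0",
            "backend": "redis://redis-host:6379/1",
            "include": ["my_module"],
            "config_source": {
                # Celery 4+ lowercase settings, passed through to celery.Celery().
                "task_acks_late": True,
                "worker_prefetch_multiplier": 1,
            },
        }
    }
}


def broker_matches_workers(run_config: dict, worker_broker_url: str) -> bool:
    """Hypothetical check: does the executor point at the workers' broker?"""
    executor_url = run_config["execution"]["config"].get("broker")
    if executor_url is None:
        return False  # assumption: treat an unset broker as "cannot verify"
    a, b = urlsplit(executor_url), urlsplit(worker_broker_url)
    # Compare scheme, host, port and path (e.g. the Redis DB number); ignore credentials.
    return (a.scheme, a.hostname, a.port, a.path) == (b.scheme, b.hostname, b.port, b.path)


print(broker_matches_workers(run_config, "redis://redis-host:6379/0"))
print(broker_matches_workers(run_config, "pyamqp://guest@localhost//"))
```

The second call reports a mismatch: an executor configured this way would submit tasks to Redis while workers started against RabbitMQ would never see them.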
In deployments where the celery_k8s_job_executor is used, all appropriate celery and dagster_celery commands must be invoked with the -A dagster_celery_k8s.app argument.
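As an illustration of the -A requirement, the hypothetical helper below assembles a Celery worker command line as an argument list. `celery -A <app> worker -Q <queues>` is standard Celery CLI syntax; the queue name `dagster` is an assumption here (use whatever queues your executor submits to), and the helper only builds the command without starting a worker.

```python
import shlex

# Required -A value for deployments using celery_k8s_job_executor.
APP = "dagster_celery_k8s.app"


def celery_worker_argv(queues: list[str], loglevel: str = "INFO") -> list[str]:
    """Hypothetical helper: build argv for a Celery worker bound to the Dagster app."""
    if not queues:
        raise ValueError("at least one queue is required")
    # -A must precede the `worker` subcommand; -Q takes a comma-separated list.
    return ["celery", "-A", APP, "worker", "-Q", ",".join(queues), "--loglevel", loglevel]


argv = celery_worker_argv(["dagster"])
print(shlex.join(argv))
# In a real deployment this list could be passed to subprocess.run(argv, check=True).
```

Building the command as a list rather than a single string keeps the app path and queue names as separate arguments, so no shell quoting is involved.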