Kubernetes (dagster-k8s)

See also the Kubernetes deployment guide.

This library contains utilities for running Dagster with Kubernetes. This includes a Python API allowing the webserver to launch runs as Kubernetes Jobs, as well as a Helm chart you can use as the basis for a Dagster deployment on a Kubernetes cluster.

APIs

dagster_k8s.K8sRunLauncher RunLauncher

RunLauncher that starts a Kubernetes Job for each Dagster job run.

Encapsulates each run in a separate, isolated invocation of dagster-graphql.

You can configure a Dagster instance to use this RunLauncher by adding a section to your dagster.yaml like the following:

run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    service_account_name: your_service_account
    job_image: my_project/dagster_image:latest
    instance_config_map: dagster-instance
    postgres_password_secret: dagster-postgresql-secret

dagster_k8s.k8s_job_executor ExecutorDefinition

Executor which launches steps as Kubernetes Jobs.

To use the k8s_job_executor, set it as the executor_def when defining a job:

from dagster_k8s import k8s_job_executor

from dagster import job

@job(executor_def=k8s_job_executor)
def k8s_job():
    pass

Then you can configure the executor with run config as follows:

execution:
  config:
    job_namespace: 'some-namespace'
    image_pull_policy: ...
    image_pull_secrets: ...
    service_account_name: ...
    env_config_maps: ...
    env_secrets: ...
    env_vars: ...
    job_image: ... # leave out if using userDeployments
    max_concurrent: ...

max_concurrent limits the number of pods that will execute concurrently for one run. By default there is no limit: steps run with as much parallelism as the DAG allows. Note that this is not a global limit; it applies to each run separately.
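
The run config above can also be assembled in Python. The following is a minimal sketch: build_k8s_executor_run_config is a hypothetical helper (it is not part of dagster-k8s), and the namespace and limit are placeholder values. It only sets keys that appear in the YAML above.

```python
from typing import Any, Dict, Optional


def build_k8s_executor_run_config(
    job_namespace: str,
    max_concurrent: Optional[int] = None,
) -> Dict[str, Any]:
    """Build run config for k8s_job_executor (hypothetical helper, for illustration)."""
    config: Dict[str, Any] = {"job_namespace": job_namespace}
    # Leaving max_concurrent unset keeps the default: no per-run limit on step pods.
    if max_concurrent is not None:
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be a positive integer")
        config["max_concurrent"] = max_concurrent
    return {"execution": {"config": config}}


# Placeholder values; adjust to your own cluster.
run_config = build_k8s_executor_run_config("some-namespace", max_concurrent=4)
```

Because the limit is per run, two simultaneous runs configured this way could still use up to eight step pods between them.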

Configuration set on the Kubernetes Jobs and Pods created by the K8sRunLauncher will also be set on Kubernetes Jobs and Pods created by the k8s_job_executor.

Configuration set using tags on a @job will only apply to the run level. For configuration to apply at each step it must be set using tags for each @op.
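
As a sketch of the difference between run-level and step-level tags, the snippet below builds tag values as plain dicts. It assumes the dagster-k8s/config tag key and a container_config entry holding raw Kubernetes container config; k8s_resource_tags is a hypothetical helper and the resource figures are placeholders.

```python
from typing import Any, Dict


def k8s_resource_tags(cpu: str, memory: str) -> Dict[str, Any]:
    """Return a tags dict requesting CPU and memory (hypothetical helper)."""
    return {
        # Assumed tag key read by dagster-k8s when it builds Jobs and Pods.
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": cpu, "memory": memory},
                    "limits": {"cpu": cpu, "memory": memory},
                }
            }
        }
    }


# Passed as @job(tags=...), this would apply at the run level only.
run_tags = k8s_resource_tags(cpu="250m", memory="64Mi")
# Passed as @op(tags=...), this would apply to the Kubernetes Job for that step.
step_tags = k8s_resource_tags(cpu="2", memory="4Gi")
```

Keeping the two dicts separate makes it explicit that a memory-hungry step needs its own tags, since the job-level tags do not propagate to steps.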

Ops

dagster_k8s.k8s_job_op OpDefinition
experimental

This API may break in future versions, even between dot releases.

An op that runs a Kubernetes job using the k8s API.

Contrast with the k8s_job_executor, which runs each Dagster op in a Dagster job in its own k8s job.

This op may be useful when:

  • You need to orchestrate a command that isn’t a Dagster op (or isn’t written in Python)
  • You want to run the rest of a Dagster job using a specific executor, and only a single op in k8s.

For example:

from dagster_k8s import k8s_job_op

from dagster import job

first_op = k8s_job_op.configured(
    {
        "image": "busybox",
        "command": ["/bin/sh", "-c"],
        "args": ["echo HELLO"],
    },
    name="first_op",
)
second_op = k8s_job_op.configured(
    {
        "image": "busybox",
        "command": ["/bin/sh", "-c"],
        "args": ["echo GOODBYE"],
    },
    name="second_op",
)

@job
def full_job():
    second_op(first_op())

You can create your own op with the same implementation by calling the execute_k8s_job function inside your own op.

The service account that is used to run this job should have the following RBAC permissions:

rules:
  - apiGroups: ["batch"]
    resources: ["jobs", "jobs/status"]
    verbs: ["*"]
  # The empty arg "" corresponds to the core API group
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["*"]

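
To sanity-check a Role definition against the rules above before applying it, something like the following sketch could be used. It works on rules already loaded into Python dicts, only recognizes wildcard verbs, and missing_permissions is a hypothetical helper rather than anything provided by dagster-k8s or Kubernetes.

```python
from typing import Dict, List, Set, Tuple

# (apiGroup, resource) pairs taken from the rules listed above.
REQUIRED: Set[Tuple[str, str]] = {
    ("batch", "jobs"),
    ("batch", "jobs/status"),
    ("", "pods"),
    ("", "pods/log"),
    ("", "pods/status"),
}


def missing_permissions(rules: List[Dict[str, List[str]]]) -> Set[Tuple[str, str]]:
    """Return required (apiGroup, resource) pairs not granted with verbs ["*"]."""
    granted: Set[Tuple[str, str]] = set()
    for rule in rules:
        if "*" not in rule.get("verbs", []):
            continue  # this sketch only recognizes wildcard verbs
        for group in rule.get("apiGroups", []):
            for resource in rule.get("resources", []):
                granted.add((group, resource))
    return REQUIRED - granted


# Example rules that forget pods/status in the core API group.
rules = [
    {"apiGroups": ["batch"], "resources": ["jobs", "jobs/status"], "verbs": ["*"]},
    {"apiGroups": [""], "resources": ["pods", "pods/log"], "verbs": ["*"]},
]
missing = missing_permissions(rules)
```

An empty result means every pair listed above is covered; here the result points at the missing pods/status resource.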
dagster_k8s.execute_k8s_job
experimental

This API may break in future versions, even between dot releases.

This function is a utility for executing a Kubernetes job from within a Dagster op.

Parameters:

  • image (str) – The image in which to launch the k8s job.
  • command (Optional[List[str]]) – The command to run in the container within the launched
  • args (Optional[List[str]]) – The args for the command for the container. Default: None.
  • namespace (Optional[str]) – Override the kubernetes namespace in which to run the k8s job.
  • image_pull_policy (Optional[str]) – Allows the image pull policy to be overridden, e.g. to
  • image_pull_secrets (Optional[List[Dict[str, str]]]) – Optionally, a list of dicts, each of
  • service_account_name (Optional[str]) – The name of the Kubernetes service account under which
  • env_secrets (Optional[List[str]]) – A list of custom Secret names from which to
  • env_vars (Optional[List[str]]) – A list of environment variables to inject into the Job.
  • volume_mounts (Optional[List[Permissive]]) – A list of volume mounts to include in the job’s
  • volumes (Optional[List[Permissive]]) – A list of volumes to include in the Job’s Pod. Default: []. See:
  • labels (Optional[Dict[str, str]]) – Additional labels that should be included in the Job’s Pod. See:
  • resources (Optional[Dict[str, Any]]) – https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  • scheduler_name (Optional[str]) – Use a custom Kubernetes scheduler for launched Pods. See:
  • load_incluster_config (bool) – Whether the op is running within a k8s cluster. If True,
  • kubeconfig_file (Optional[str]) – The kubeconfig file from which to load config. Defaults to
  • timeout (Optional[int]) – Raise an exception if the op takes longer than this timeout in
  • container_config (Optional[Dict[str, Any]]) – Raw k8s config for the k8s pod’s main container
  • pod_template_spec_metadata (Optional[Dict[str, Any]]) – Raw k8s config for the k8s pod’s
  • pod_spec_config (Optional[Dict[str, Any]]) – Raw k8s config for the k8s pod’s pod spec
  • job_metadata (Optional[Dict[str, Any]]) – Raw k8s config for the k8s job’s metadata
  • job_spec_config (Optional[Dict[str, Any]]) – Raw k8s config for the k8s job’s job spec
  • k8s_job_name (Optional[str]) – Overrides the name of the k8s job. If not set, will be set
  • merge_behavior (Optional[K8sConfigMergeBehavior]) – How raw k8s config set on this op should
  • delete_failed_k8s_jobs (bool) – Whether to immediately delete failed Kubernetes jobs. If False,

Python API

The K8sRunLauncher allows webserver instances to be configured to launch new runs by starting per-run Kubernetes Jobs. To configure the K8sRunLauncher, your dagster.yaml should include a section like:

run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    image_pull_secrets:
    service_account_name: dagster
    job_image: "my-company.com/image:latest"
    dagster_home: "/opt/dagster/dagster_home"
    postgres_password_secret: "dagster-postgresql-secret"
    image_pull_policy: "IfNotPresent"
    job_namespace: "dagster"
    instance_config_map: "dagster-instance"
    env_config_maps:
      - "dagster-k8s-job-runner-env"
    env_secrets:
      - "dagster-k8s-some-secret"

Helm chart

For local dev (e.g., on kind or minikube):

helm install \
  --set dagsterWebserver.image.repository="dagster.io/buildkite-test-image" \
  --set dagsterWebserver.image.tag="py310-latest" \
  --set job_runner.image.repository="dagster.io/buildkite-test-image" \
  --set job_runner.image.tag="py310-latest" \
  --set imagePullPolicy="IfNotPresent" \
  dagster \
  helm/dagster/

Upon installation, the Helm chart will provide instructions for port forwarding the Dagster webserver and Flower (if configured).

Running tests

To run the unit tests:

pytest -m "not integration"

To run the integration tests, you must have Docker, kind, and helm installed.

On macOS:

brew install kind
brew install helm

Docker must be running.

You may experience slow first test runs thanks to image pulls (run pytest -svv --fulltrace for visibility). Building images and loading them to the kind cluster is slow, and there is no visibility into the progress of the load.

NOTE: This process is quite slow, as it requires bootstrapping a local kind cluster with Docker images and the dagster-k8s Helm chart. For faster development, you can either:

  1. Keep a warm kind cluster
  2. Use a remote K8s cluster, e.g. via AWS EKS or GCP GKE. Instructions are below.

Faster local development (with kind)

You may find that the loop of kind cluster creation, image loading, and Helm chart installation is too slow for effective local dev.

You may bypass cluster creation and image loading in the following way. First add the --no-cleanup flag to your pytest invocation:

pytest --no-cleanup -s -vvv -m "not integration"

The tests will run as before, but the kind cluster will be left running after the tests are completed.

For subsequent test runs, you can run:

pytest --kind-cluster="cluster-d9971c84d44d47f382a2928c8c161faa" --existing-helm-namespace="dagster-test-95590a" -s -vvv -m "not integration"

This will bypass cluster creation, image loading, and Helm chart installation, for much faster tests.
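
If you switch between the first and subsequent invocations often, the two command lines can be generated from one place. The sketch below only uses the flags shown above; pytest_command is a hypothetical helper, and the cluster name and namespace are the example values from this section, which will differ on your machine.

```python
import shlex
from typing import List, Optional


def pytest_command(
    kind_cluster: Optional[str] = None,
    helm_namespace: Optional[str] = None,
    no_cleanup: bool = False,
) -> List[str]:
    """Assemble a pytest invocation from the flags shown above (hypothetical helper)."""
    cmd = ["pytest"]
    if no_cleanup:
        cmd.append("--no-cleanup")  # leave the kind cluster running afterwards
    if kind_cluster:
        cmd.append(f"--kind-cluster={kind_cluster}")  # reuse an existing cluster
    if helm_namespace:
        cmd.append(f"--existing-helm-namespace={helm_namespace}")  # reuse the chart
    cmd += ["-s", "-vvv", "-m", "not integration"]
    return cmd


first_run = pytest_command(no_cleanup=True)
later_run = pytest_command(
    kind_cluster="cluster-d9971c84d44d47f382a2928c8c161faa",
    helm_namespace="dagster-test-95590a",
)
# shlex.join quotes the marker expression so the line can be pasted into a shell.
command_line = shlex.join(later_run)
```

Returning a list rather than a string keeps the arguments safe to pass to subprocess.run without shell quoting concerns.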

The kind cluster name and Helm namespace for this command can be found in the logs, or retrieved via the respective CLIs, using kind get clusters and kubectl get namespaces. Note that for kubectl and helm to work correctly with a kind cluster, you should override your kubeconfig file location with:

kind get kubeconfig --name kind-test > /tmp/kubeconfig
export KUBECONFIG=/tmp/kubeconfig

Manual kind cluster setup

The test fixtures provided by dagster-k8s automate the process described below, but sometimes it’s useful to manually configure a kind cluster and load images onto it.

First, ensure you have a Docker image appropriate for your Python version. Run, from the root of the repo:

./python_modules/dagster-test/dagster_test/test_project/build.sh 3.7.6 \
  dagster.io.priv/buildkite-test-image:py310-latest

In the above invocation, the Python major.minor version should match the Python version you intend to test against.

Then run the following commands to create the cluster and load the image. Note that there is no feedback from the loading process.

kind create cluster --name kind-test
kind load docker-image --name kind-test dagster.io/dagster-docker-buildkite:py310-latest

If you are deploying the Helm chart with an in-cluster Postgres (rather than an external database), and/or with dagster-celery workers (and a RabbitMQ), you’ll also want to have images present for rabbitmq and postgresql:

docker pull docker.io/bitnami/rabbitmq
docker pull docker.io/bitnami/postgresql

kind load docker-image --name kind-test docker.io/bitnami/rabbitmq:latest
kind load docker-image --name kind-test docker.io/bitnami/postgresql:latest

Then you can run pytest as follows:

pytest --kind-cluster=kind-test

Faster local development (with an existing K8s cluster)

If you already have a development K8s cluster available, you can run tests on that cluster vs. running locally in kind.

For this to work, first build and deploy the test image to a registry available to your cluster. For example, with a private ECR repository:

./python_modules/dagster-test/dagster_test/test_project/build.sh 3.7.6
docker tag dagster-docker-buildkite:latest $AWS_ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com/dagster-k8s-tests:2020-04-21T21-04-06

aws ecr get-login --no-include-email --region us-west-2 | sh
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com/dagster-k8s-tests:2020-04-21T21-04-06

Then, you can run tests on EKS with:

export DAGSTER_DOCKER_IMAGE_TAG="2020-04-21T21-04-06"
export DAGSTER_DOCKER_REPOSITORY="$AWS_ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com"
export DAGSTER_DOCKER_IMAGE="dagster-k8s-tests"

# First run with --no-cleanup to leave Helm chart in place
pytest --cluster-provider="kubeconfig" --no-cleanup -s -vvv

# Subsequent runs against existing Helm chart
pytest --cluster-provider="kubeconfig" --existing-helm-namespace="dagster-test-<some id>" -s -vvv
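
The three environment variables presumably combine into the image reference that was pushed earlier. The sketch below shows that composition; the repository/image:tag layout is an assumption based on standard Docker naming rather than something stated on this page, image_reference_from_env is a hypothetical helper, and the account ID is a placeholder.

```python
from typing import Mapping


def image_reference_from_env(env: Mapping[str, str]) -> str:
    """Join repository, image name, and tag into one image reference."""
    repository = env["DAGSTER_DOCKER_REPOSITORY"]
    image = env["DAGSTER_DOCKER_IMAGE"]
    tag = env["DAGSTER_DOCKER_IMAGE_TAG"]
    return f"{repository}/{image}:{tag}"


# Placeholder account ID; in practice pass os.environ instead of this dict.
example_env = {
    "DAGSTER_DOCKER_IMAGE_TAG": "2020-04-21T21-04-06",
    "DAGSTER_DOCKER_REPOSITORY": "123456789012.dkr.ecr.us-west-2.amazonaws.com",
    "DAGSTER_DOCKER_IMAGE": "dagster-k8s-tests",
}
reference = image_reference_from_env(example_env)
```

Comparing the result with the name used in docker push is a quick way to catch a region or tag mismatch before starting a slow test run.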

Validating Helm charts

To test / validate Helm charts, you can run:

helm install dagster --dry-run --debug helm/dagster
helm lint

Enabling GCR access from Minikube

To enable GCR access from Minikube:

kubectl create secret docker-registry element-dev-key \
  --docker-server=https://gcr.io \
  --docker-username=oauth2accesstoken \
  --docker-password="$(gcloud auth print-access-token)" \
  --docker-email=my@email.com

A note about PVCs

Both the Postgres and the RabbitMQ Helm charts will store credentials using Persistent Volume Claims (PVCs), which will outlive test invocations and calls to helm uninstall. These must be deleted if you want to change credentials. To view your PVCs, run:

kubectl get pvc

Testing Redis

The Redis Helm chart installs with a randomly generated password by default; turn this off:

helm install dagredis stable/redis --set usePassword=false

Then, to connect to your database from outside the cluster, execute the following commands:

kubectl port-forward --namespace default svc/dagredis-master 6379:6379
redis-cli -h 127.0.0.1 -p 6379

Pipes

class dagster_k8s.PipesK8sClient

A pipes client for launching kubernetes pods.

By default context is injected via environment variables and messages are parsed out of the pod logs, with other logs forwarded to stdout of the orchestration process.

The first container within the containers list of the pod spec is expected (or set) to be the container prepared for pipes protocol communication.

Parameters:

  • env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • load_incluster_config (Optional[bool]) – Whether this client is expected to be running from inside
  • kubeconfig_file (Optional[str]) – The value to pass as the config_file argument to
  • kube_context (Optional[str]) – The value to pass as the context argument to
  • poll_interval (Optional[float]) – How many seconds to wait between requests when
run

Publish a kubernetes pod and wait for it to complete, enriched with the pipes protocol.

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The execution context.
  • image (Optional[str]) – The image to set the first container in the pod spec to use.
  • command (Optional[Union[str, Sequence[str]]]) – The command to set the first container in the pod spec to use.
  • namespace (Optional[str]) – Which kubernetes namespace to use, defaults to the current namespace if
  • env (Optional[Mapping[str,str]]) – A mapping of environment variable names to values to set on the first
  • base_pod_meta (Optional[Mapping[str, Any]]) – Raw k8s config for the k8s pod’s metadata
  • base_pod_spec (Optional[Mapping[str, Any]]) – Raw k8s config for the k8s pod’s pod spec
  • extras (Optional[PipesExtras]) – Extra values to pass along as part of the ext protocol.
  • context_injector (Optional[PipesContextInjector]) – Override the default ext protocol context injection.
  • message_reader (Optional[PipesMessageReader]) – Override the default ext protocol message reader.
  • ignore_containers (Optional[Set]) – Ignore certain containers from waiting for termination. Defaults to
  • enable_multi_container_logs (bool) – Whether or not to enable multi-container log consumption.

Returns: Wrapper containing results reported by the external process.

Return type: PipesClientCompletedInvocation

class dagster_k8s.PipesK8sPodLogsMessageReader

Message reader that reads messages from kubernetes pod logs.
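
To illustrate the idea of parsing messages out of pod logs while forwarding everything else, here is a simplified sketch. It is not the library's implementation: split_pod_logs is a hypothetical function, and the __dagster_pipes_version marker key and the message shape in the sample are assumptions about the Pipes message format.

```python
import json
from typing import Any, Dict, Iterable, List, Tuple

# Assumed marker key carried by Pipes messages.
PIPES_VERSION_KEY = "__dagster_pipes_version"


def split_pod_logs(lines: Iterable[str]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Separate JSON Pipes messages from ordinary log lines."""
    messages: List[Dict[str, Any]] = []
    other: List[str] = []
    for line in lines:
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            other.append(line)  # not JSON: an ordinary log line
            continue
        if isinstance(parsed, dict) and PIPES_VERSION_KEY in parsed:
            messages.append(parsed)
        else:
            other.append(line)  # JSON, but not a Pipes message
    return messages, other


# Sample log lines; the message content is made up for illustration.
logs = [
    "starting work",
    '{"__dagster_pipes_version": "0.1", "method": "log", "params": {"message": "hi"}}',
    '{"unrelated": true}',
]
messages, other = split_pod_logs(logs)
```

In this sketch the lines collected in other would be the ones forwarded to stdout of the orchestration process, while messages would be handed to the Pipes machinery.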