Databricks (dagster-databricks)

The dagster_databricks package provides these main pieces of functionality:

  • A resource, databricks_pyspark_step_launcher, which will execute an op within a Databricks
  • An op factory, create_databricks_run_now_op, which creates an op that launches an existing
  • An op factory, create_databricks_submit_run_op, which creates an op that submits a one-time run

Note that databricks_pyspark_step_launcher requires either S3 or Azure Data Lake Storage (ADLS) config for ops to succeed. The credentials for this storage must be stored as a Databricks Secret and referenced in the resource config so that the Databricks cluster can access the storage.

APIs

Resources

dagster_databricks.DatabricksClientResource ResourceDefinition

Resource which provides a Python client for interacting with Databricks within an op or asset.

class dagster_databricks.DatabricksClient

A thin wrapper over the Databricks REST API.

property workspace_client

Retrieve a reference to the underlying Databricks Workspace client. For more information, see the Databricks SDK for Python.

Examples:
from dagster import op
from databricks.sdk import WorkspaceClient

@op(required_resource_keys={"databricks_client"})
def op1(context):
    # Get the authenticated Databricks SDK Workspace client
    client: WorkspaceClient = context.resources.databricks_client.workspace_client

    # Example 1: Run a Databricks job with some parameters.
    client.jobs.run_now(...)

    # Example 2: Trigger a one-time run of a Databricks workload.
    client.jobs.submit(...)

    # Example 3: Get an existing run.
    client.jobs.get_run(...)

    # Example 4: Cancel a run.
    client.jobs.cancel_run(...)

Returns: The authenticated Databricks SDK Workspace Client.

Return type: WorkspaceClient

Ops

dagster_databricks.create_databricks_run_now_op

Creates an op that launches an existing Databricks job.

As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api/workspace/jobs/runnow. The only required field is job_id, which is the ID of the job to be executed. Additional fields can be used to specify override parameters for the Databricks Job.
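To make the config shape concrete, the sketch below assembles a run-now style payload from a required job ID plus optional override fields. It uses only the standard library; build_run_now_payload is a hypothetical helper written for illustration, not part of dagster_databricks, and it does not claim to mirror how the op combines its arguments internally.

```python
from typing import Any, Mapping, Optional


def build_run_now_payload(
    job_id: int, overrides: Optional[Mapping[str, Any]] = None
) -> dict:
    """Hypothetical helper: combine the required job_id with optional overrides."""
    payload: dict = {"job_id": job_id}
    for key, value in (overrides or {}).items():
        if key == "job_id":
            # The job ID is supplied separately, so reject a conflicting override.
            raise ValueError("job_id is set separately and cannot be overridden")
        payload[key] = value
    return payload


# python_params is the override field used in the example on this page.
payload = build_run_now_payload(
    1234,
    {"python_params": ["--input", "schema.db.input_table"]},
)
print(payload)
```

With no overrides, the payload contains only job_id, which matches the statement that job_id is the only required field.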

Parameters:

  • databricks_job_id (int) – The ID of the Databricks Job to be executed.
  • databricks_job_configuration (dict) – Configuration for triggering a new job run of a
  • poll_interval_seconds (float) – How often to poll the Databricks API to check whether the
  • max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running
  • name (Optional[str]) – The name of the op. If not provided, the name will be
  • databricks_resource_key (str) – The name of the resource key used by this op. If not

Returns: An op definition to run the Databricks Job.

Return type: OpDefinition

Example:

from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

DATABRICKS_JOB_ID = 1234


run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def do_stuff():
    run_now_op()

dagster_databricks.create_databricks_submit_run_op

Creates an op that submits a one-time run of a set of tasks on Databricks.

As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api/workspace/jobs/submit.

Parameters:

  • databricks_job_configuration (dict) – Configuration for submitting a one-time run of a set
  • poll_interval_seconds (float) – How often to poll the Databricks API to check whether the
  • max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running
  • name (Optional[str]) – The name of the op. If not provided, the name will be
  • databricks_resource_key (str) – The name of the resource key used by this op. If not

Returns: An op definition to submit a one-time run of a set of tasks on Databricks.

Return type: OpDefinition

Example:

from dagster import EnvVar, job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource


submit_run_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "2.1.0-db3-scala2.11",
            "num_workers": 2
        },
        "notebook_task": {
            "notebook_path": "/Users/dagster@example.com/PrepareData",
        },
    }
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def do_stuff():
    submit_run_op()
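
Both op factories above take poll_interval_seconds and max_wait_time_seconds. The following is a minimal standard-library sketch of how a poll-until-done loop with a time budget can behave. It is not the library's implementation: wait_for_run and get_state are hypothetical names, and the set of terminal states is an assumption based on the Databricks run life cycle states.

```python
import time
from typing import Callable

# Assumed terminal life cycle states; adjust to match the API you are polling.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(
    get_state: Callable[[], str],
    poll_interval_seconds: float,
    max_wait_time_seconds: float,
) -> str:
    """Poll get_state until it reports a terminal state or the time budget runs out.

    get_state is a hypothetical stand-in for a call that fetches the run status.
    """
    deadline = time.monotonic() + max_wait_time_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"run still in state {state!r} after {max_wait_time_seconds} seconds"
            )
        time.sleep(poll_interval_seconds)


# Simulate a run that moves through a few states before finishing.
states = iter(["PENDING", "RUNNING", "RUNNING", "TERMINATED"])
final_state = wait_for_run(
    lambda: next(states),
    poll_interval_seconds=0.01,
    max_wait_time_seconds=1.0,
)
print(final_state)
```

A shorter poll interval detects completion sooner at the cost of more API calls, while the maximum wait time bounds how long the op can block on a run that never finishes.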

Step Launcher

dagster_databricks.databricks_pyspark_step_launcher ResourceDefinition
superseded

This API has been superseded and its usage is discouraged. Consider using Dagster Pipes instead. Learn more here: https://docs.dagster.io/concepts/dagster-pipes.

Resource for running ops as a Databricks Job.

When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.

Use the ‘run_config’ configuration to specify the details of the Databricks cluster used, and the ‘storage’ key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context, as described in the Databricks documentation for S3 and for ADLS.

Pipes

class dagster_databricks.PipesDatabricksClient

Pipes client for Databricks.

Parameters:

  • client (WorkspaceClient) – A Databricks WorkspaceClient object.
  • env (Optional[Mapping[str, str]]) – An optional dict of environment
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • poll_interval_seconds (float) – How long to sleep between checking the status of the job run.
  • forward_termination (bool) – Whether to cancel the Databricks job if the orchestration process

class dagster_databricks.PipesDbfsContextInjector

A context injector that injects context into a Databricks job by writing a JSON file to DBFS.

Parameters: client (WorkspaceClient) – A Databricks WorkspaceClient object.
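
The idea of injecting context through a JSON file can be sketched with the standard library alone. In the sketch below a local temporary directory stands in for DBFS, and inject_context_via_file, load_context, the "path" parameter name, and the context fields are all hypothetical names chosen for illustration rather than the library's actual interface.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Iterator, Mapping


@contextmanager
def inject_context_via_file(context_data: Mapping[str, Any]) -> Iterator[dict]:
    """Write context_data to a temporary JSON file and yield where to find it.

    A local temp directory stands in for DBFS; it is removed when the block exits.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "context.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(context_data, f)
        # The yielded params tell the external process where the context lives.
        yield {"path": path}


def load_context(params: Mapping[str, str]) -> dict:
    """What the external process would do: read the context back from the file."""
    with open(params["path"], encoding="utf-8") as f:
        return json.load(f)


# Illustrative context fields only.
with inject_context_via_file({"asset_keys": ["prepared_data"], "run_id": "abc123"}) as params:
    injected_path = params["path"]
    loaded = load_context(params)

print(loaded)
```

Scoping the file to a context manager means the injected context is cleaned up once the external job has finished, whether or not it succeeded.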

class dagster_databricks.PipesDbfsMessageReader

Message reader that reads messages by periodically reading message chunks from an automatically-generated temporary directory on DBFS.

If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process.

Parameters:

  • interval (float) – Interval in seconds between attempts to download a chunk.
  • client (WorkspaceClient) – A Databricks WorkspaceClient object.
  • cluster_log_root (Optional[str]) – The root path on DBFS where the cluster logs are written.
  • include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
  • log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on DBFS.
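
Reading messages in chunks from a directory can be illustrated with a small standard-library sketch. The file naming (1.json, 2.json, ...), the one-JSON-message-per-line layout, the message fields, and the helper names are assumptions made for this example, and a local temporary directory stands in for DBFS. The real reader's on-disk format may differ.

```python
import json
import os
import tempfile
from typing import List, Tuple


def read_new_chunks(directory: str, next_index: int) -> Tuple[List[dict], int]:
    """Read consecutively numbered chunk files starting at next_index.

    Returns the messages found and the index of the next chunk to look for.
    """
    messages: List[dict] = []
    while True:
        path = os.path.join(directory, f"{next_index}.json")
        if not os.path.exists(path):
            return messages, next_index
        with open(path, encoding="utf-8") as f:
            messages.extend(json.loads(line) for line in f if line.strip())
        next_index += 1


def write_chunk(directory: str, index: int, messages: List[dict]) -> None:
    """Stand-in for the external process writing one chunk of messages."""
    with open(os.path.join(directory, f"{index}.json"), "w", encoding="utf-8") as f:
        for message in messages:
            f.write(json.dumps(message) + "\n")


with tempfile.TemporaryDirectory() as chunk_dir:
    write_chunk(chunk_dir, 1, [{"method": "log", "text": "starting"}])
    first, cursor = read_new_chunks(chunk_dir, 1)

    # Nothing new yet: a real reader would sleep for `interval` seconds and retry.
    empty, cursor = read_new_chunks(chunk_dir, cursor)

    write_chunk(chunk_dir, 2, [{"method": "log", "text": "halfway"},
                               {"method": "log", "text": "done"}])
    second, cursor = read_new_chunks(chunk_dir, cursor)

print(first, empty, second, cursor)
```

Tracking the next chunk index means each poll only reads files it has not seen before, so messages are delivered once and in order.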

class dagster_databricks.PipesDbfsLogReader

Reader that reads a log file from DBFS.

Parameters:

  • interval (float) – Interval in seconds between attempts to download a log chunk.
  • remote_log_name (Literal["stdout", "stderr"]) – The name of the log file to read.
  • target_stream (TextIO) – The stream to which to forward log chunks that have been read.
  • client (WorkspaceClient) – A Databricks WorkspaceClient object.
  • debug_info (Optional[str]) – An optional message containing debug information about the log reader.
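
Forwarding a growing log file to a target stream in chunks can be sketched as follows. This is a standard-library illustration under stated assumptions, not the library's code: forward_new_log_text is a hypothetical helper, a local file stands in for the log on DBFS, and the sketch assumes each chunk ends on a UTF-8 character boundary.

```python
import io
import os
import tempfile
from typing import TextIO


def forward_new_log_text(path: str, offset: int, target_stream: TextIO) -> int:
    """Copy bytes written to path since offset into target_stream.

    Returns the new offset so the next call only reads fresh content.
    """
    if not os.path.exists(path):
        # The log may not exist yet; keep the offset and try again later.
        return offset
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    target_stream.write(chunk.decode("utf-8"))
    return offset + len(chunk)


with tempfile.TemporaryDirectory() as log_dir:
    log_path = os.path.join(log_dir, "stdout")
    sink = io.StringIO()  # sys.stdout would be a typical target stream

    offset = forward_new_log_text(log_path, 0, sink)  # log not written yet
    with open(log_path, "ab") as f:
        f.write(b"line 1\n")
    offset = forward_new_log_text(log_path, offset, sink)
    with open(log_path, "ab") as f:
        f.write(b"line 2\n")
    offset = forward_new_log_text(log_path, offset, sink)

forwarded = sink.getvalue()
print(forwarded, end="")
```

Calling the helper once per interval gives the periodic behavior described by the interval parameter, with target_stream receiving only the text added since the previous attempt.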