Databricks (dagster-databricks)
The dagster_databricks package provides these main pieces of functionality:
- A resource, databricks_pyspark_step_launcher, which executes an op within Databricks.
- An op factory, create_databricks_run_now_op, which creates an op that launches an existing Databricks job.
- An op factory, create_databricks_submit_run_op, which creates an op that submits a one-time run of a set of tasks on Databricks.
Note that, for the databricks_pyspark_step_launcher, either S3 or Azure Data Lake Storage config must be specified for ops to succeed. The credentials for this storage must also be stored as a Databricks Secret and referenced in the resource config so that the Databricks cluster can access storage.
APIs
Resources
- dagster_databricks.DatabricksClientResource ResourceDefinition
Resource which provides a Python client for interacting with Databricks within an op or asset.
- class dagster_databricks.DatabricksClient
A thin wrapper over the Databricks REST API.
- property workspace_client
Retrieve a reference to the underlying Databricks Workspace client. For more information, see the Databricks SDK for Python.
Examples:
from dagster import op
from databricks.sdk import WorkspaceClient

@op(required_resource_keys={"databricks_client"})
def op1(context):
    # Get the underlying Databricks SDK workspace client from the resource
    client = context.resources.databricks_client.workspace_client
    # Example 1: Run a Databricks job with some parameters.
    client.jobs.run_now(...)
    # Example 2: Trigger a one-time run of a Databricks workload.
    client.jobs.submit(...)
    # Example 3: Get an existing run.
    client.jobs.get_run(...)
    # Example 4: Cancel a run.
    client.jobs.cancel_run(...)

Returns: The authenticated Databricks SDK Workspace Client.
Return type: WorkspaceClient
Ops
- dagster_databricks.create_databricks_run_now_op
Creates an op that launches an existing Databricks job.
As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api/workspace/jobs/runnow. The only required field is job_id, which is the ID of the job to be executed. Additional fields can be used to specify override parameters for the Databricks Job.
Parameters:
- databricks_job_id (int) – The ID of the Databricks Job to be executed.
- databricks_job_configuration (dict) – Configuration for triggering a new job run of a
- poll_interval_seconds (float) – How often to poll the Databricks API to check whether the
- max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running
- name (Optional[str]) – The name of the op. If not provided, the name will be
- databricks_resource_key (str) – The name of the resource key used by this op. If not
Returns: An op definition to run the Databricks Job.
Return type: OpDefinition
Example:
from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

DATABRICKS_JOB_ID = 1234

# Build an op that triggers the existing job with overridden Python parameters.
run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    run_now_op()
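To make the config shape described above concrete, the following is a minimal sketch of how a Run Now payload could be assembled from the required job_id plus optional override fields. build_run_now_payload is a hypothetical helper written for illustration; it is not part of dagster_databricks, and it says nothing about how the op factory builds its request internally. Only job_id and python_params are taken from the text above.

```python
from typing import Any, Dict, Mapping, Optional


def build_run_now_payload(
    job_id: int, overrides: Optional[Mapping[str, Any]] = None
) -> Dict[str, Any]:
    """Hypothetical helper: combine the required job_id with override fields."""
    payload = dict(overrides or {})
    # job_id is the only required field, so it is always set last and
    # cannot be clobbered by a stray "job_id" key in the overrides.
    payload["job_id"] = job_id
    return payload


payload = build_run_now_payload(
    1234,
    {"python_params": ["--input", "schema.db.input_table"]},
)
print(payload)
```

With no overrides the payload contains only job_id, which matches the statement that job_id is the only required field.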
- dagster_databricks.create_databricks_submit_run_op
Creates an op that submits a one-time run of a set of tasks on Databricks.
As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api/workspace/jobs/submit.
Parameters:
- databricks_job_configuration (dict) – Configuration for submitting a one-time run of a set
- poll_interval_seconds (float) – How often to poll the Databricks API to check whether the
- max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running
- name (Optional[str]) – The name of the op. If not provided, the name will be
- databricks_resource_key (str) – The name of the resource key used by this op. If not
Returns: An op definition to submit a one-time run of a set of tasks on Databricks.
Return type: OpDefinition
Example:
from dagster import EnvVar, job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource

# Build an op that submits a one-time notebook run on a new cluster.
submit_run_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "2.1.0-db3-scala2.11",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/dagster@example.com/PrepareData",
        },
    }
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    submit_run_op()
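Both op factories take poll_interval_seconds and max_wait_time_seconds. The sketch below shows one way such a pair of settings can drive a wait loop. It is an illustration under stated assumptions, not the library's implementation: wait_for_run and get_state are hypothetical names, the terminal state names are illustrative, and raising TimeoutError on expiry is an assumption about what a caller might want.

```python
import time
from typing import Callable

# Illustrative set of run states treated as "finished" by this sketch.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(
    get_state: Callable[[], str],
    poll_interval_seconds: float = 5.0,
    max_wait_time_seconds: float = 60.0,
) -> str:
    """Hypothetical helper: poll get_state() until the run finishes or time runs out."""
    deadline = time.monotonic() + max_wait_time_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after the wait limit")
        # Sleep between checks so the API is not polled in a tight loop.
        time.sleep(poll_interval_seconds)


# Simulate a run that moves through three states on successive polls.
states = iter(["PENDING", "RUNNING", "TERMINATED"])
final_state = wait_for_run(
    lambda: next(states), poll_interval_seconds=0.0, max_wait_time_seconds=1.0
)
print(final_state)
```

A shorter poll interval detects completion sooner at the cost of more API calls; the wait limit bounds how long the op can block on a run that never finishes.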
Step Launcher
- dagster_databricks.databricks_pyspark_step_launcher ResourceDefinition
- superseded
This API has been superseded and its usage is discouraged. Consider using Dagster Pipes instead. Learn more here: https://docs.dagster.io/concepts/dagster-pipes.
Resource for running ops as a Databricks Job.
When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.
Use the ‘run_config’ configuration to specify the details of the Databricks cluster used, and the ‘storage’ key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context; the Databricks documentation describes this for both S3 and ADLS.
Pipes
- class dagster_databricks.PipesDatabricksClient
Pipes client for databricks.
Parameters:
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- env (Optional[Mapping[str, str]]) – An optional dict of environment
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
- poll_interval_seconds (float) – How long to sleep between checking the status of the job run.
- forward_termination (bool) – Whether to cancel the Databricks job if the orchestration process
- class dagster_databricks.PipesDbfsContextInjector
A context injector that injects context into a Databricks job by writing a JSON file to DBFS.
Parameters: client (WorkspaceClient) – A databricks WorkspaceClient object.
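The idea of injecting context by writing a JSON file can be sketched with the standard library alone. In the sketch below a local temporary directory stands in for DBFS, and inject_context, the context.json file name, and the "path" parameter key are all hypothetical choices made for illustration; the real class talks to DBFS through the WorkspaceClient and may lay out files differently.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Mapping


@contextmanager
def inject_context(context: Mapping[str, Any], root: str) -> Iterator[Dict[str, str]]:
    """Hypothetical helper: write context as JSON under root, yield its location."""
    tmp_dir = tempfile.mkdtemp(dir=root)
    path = os.path.join(tmp_dir, "context.json")
    with open(path, "w") as f:
        json.dump(context, f)
    try:
        # The yielded params are what the external process needs to find the file.
        yield {"path": path}
    finally:
        # Clean up once the external process is done with the context.
        os.remove(path)
        os.rmdir(tmp_dir)


with tempfile.TemporaryDirectory() as fake_dbfs:  # local stand-in for DBFS
    with inject_context({"run_id": "abc", "extras": {"n": 1}}, fake_dbfs) as params:
        # This read plays the role of the external job loading its context.
        with open(params["path"]) as f:
            loaded = json.load(f)
    file_removed = not os.path.exists(params["path"])

print(loaded, file_removed)
```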
- class dagster_databricks.PipesDbfsMessageReader
Message reader that reads messages by periodically reading message chunks from an automatically-generated temporary directory on DBFS.
If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process.
Parameters:
- interval (float) – interval in seconds between attempts to download a chunk
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- cluster_log_root (Optional[str]) – The root path on DBFS where the cluster logs are written.
- include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
- log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on DBFS.
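Reading messages as numbered chunks from a directory can be sketched as follows. This is a simplified illustration, not the class's actual code: a local temporary directory stands in for DBFS, and the helper names, the N.json chunk naming, and the one-JSON-object-per-line layout are assumptions. A real reader would call something like drain once every interval seconds and carry next_index between calls.

```python
import json
import os
import tempfile
from typing import List, Optional, Tuple


def read_chunk(directory: str, index: int) -> Optional[List[dict]]:
    """Return the messages in chunk `index`, or None if it is not written yet."""
    path = os.path.join(directory, f"{index}.json")
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]


def drain(directory: str, next_index: int = 1) -> Tuple[List[dict], int]:
    """Read every consecutive chunk currently available, starting at next_index."""
    messages: List[dict] = []
    while True:
        chunk = read_chunk(directory, next_index)
        if chunk is None:
            # Stop at the first missing chunk; the caller retries from here later.
            return messages, next_index
        messages.extend(chunk)
        next_index += 1


with tempfile.TemporaryDirectory() as messages_dir:  # local stand-in for DBFS
    chunks = [[{"method": "opened"}], [{"method": "log"}, {"method": "closed"}]]
    for i, msgs in enumerate(chunks, start=1):
        with open(os.path.join(messages_dir, f"{i}.json"), "w") as f:
            f.write("\n".join(json.dumps(m) for m in msgs))
    received, next_index = drain(messages_dir)

print(received, next_index)
```

Tracking the next chunk index means each poll only fetches chunks that have not been seen, so messages are delivered once and in order.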
- class dagster_databricks.PipesDbfsLogReader
Reader that reads a log file from DBFS.
Parameters:
- interval (float) – interval in seconds between attempts to download a log chunk
- remote_log_name (Literal["stdout", "stderr"]) – The name of the log file to read.
- target_stream (TextIO) – The stream to which to forward log chunks that have been read.
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- debug_info (Optional[str]) – An optional message containing debug information about the log reader.
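The relationship between remote_log_name, target_stream, and interval can be illustrated with a small offset-tracking sketch. It assumes a local file in place of a DBFS log and uses a hypothetical helper, forward_new_log_bytes; the real reader fetches data through the WorkspaceClient and its chunking strategy may differ. The sketch also assumes chunk boundaries never split a multi-byte character.

```python
import io
import os
import tempfile
from typing import TextIO


def forward_new_log_bytes(log_path: str, offset: int, target_stream: TextIO) -> int:
    """Copy log content past `offset` to target_stream and return the new offset."""
    if not os.path.exists(log_path):
        # The log may not exist until the remote process has produced output.
        return offset
    with open(log_path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    if chunk:
        target_stream.write(chunk.decode("utf-8"))
    return offset + len(chunk)


with tempfile.TemporaryDirectory() as log_dir:  # local stand-in for DBFS
    log_path = os.path.join(log_dir, "stdout")
    target = io.StringIO()
    offset = forward_new_log_bytes(log_path, 0, target)  # nothing written yet
    with open(log_path, "wb") as f:
        f.write(b"line 1\n")
    offset = forward_new_log_bytes(log_path, offset, target)
    with open(log_path, "ab") as f:
        f.write(b"line 2\n")
    offset = forward_new_log_bytes(log_path, offset, target)

forwarded = target.getvalue()
print(repr(forwarded), offset)
```

In practice target_stream would typically be sys.stdout or sys.stderr, and the helper would be called once per interval.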