Pipes (dagster-pipes)
The dagster-pipes library is intended for inclusion in an external process that integrates with Dagster using the Pipes protocol. This could be in an environment like Databricks, Kubernetes, or Docker. Using this library, you can write code in the external process that streams metadata back to Dagster.
For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.
Looking to set up a Pipes client in Dagster? Refer to the Dagster Pipes API reference.
Note: This library isn’t included with dagster and must be installed separately.
Context
- dagster_pipes.open_dagster_pipes
Initialize the Dagster Pipes context.
This function should be called near the entry point of a pipes process. It will load injected context information from Dagster and spin up the machinery for streaming messages back to Dagster.
If the process was not launched by Dagster, this function will emit a warning and return a MagicMock object. This should make all operations on the context no-ops and prevent your code from crashing.
Parameters:
- context_loader (Optional[PipesContextLoader]) – The context loader to use. Defaults to
- message_writer (Optional[PipesMessageWriter]) – The message writer to use. Defaults to
- params_loader (Optional[PipesParamsLoader]) – The params loader to use. Defaults to
Returns: The initialized context.
Return type: PipesContext
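The fallback behavior described above can be illustrated without Dagster installed. The sketch below is not the library's implementation: `open_context_sketch` and its `launched_by_dagster` flag are hypothetical names, used only to show why returning a MagicMock turns every context operation into a no-op.

```python
import warnings
from unittest.mock import MagicMock


def open_context_sketch(launched_by_dagster: bool):
    """Hypothetical stand-in for the fallback path; not the dagster-pipes code."""
    if not launched_by_dagster:
        warnings.warn("Process was not launched by Dagster; returning a mock context.")
        # Any attribute access or method call on a MagicMock succeeds and
        # returns another MagicMock, so context operations become no-ops.
        return MagicMock()
    raise NotImplementedError("Real context setup is out of scope for this sketch.")


with warnings.catch_warnings():
    warnings.simplefilter("ignore")  # silence the warning for this demo only
    context = open_context_sketch(launched_by_dagster=False)

# None of these calls raise, even though nothing is sent anywhere.
context.log.info("running outside Dagster")
context.report_asset_materialization(metadata={"row_count": 100})
```

This is why the same script can usually be run locally for debugging and under Dagster in production without branching on the environment.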
- class dagster_pipes.PipesContext
The context for a Dagster Pipes process.
This class is analogous to OpExecutionContext on the Dagster side of the Pipes connection. It provides access to information such as the asset key(s) and partition key(s) in scope for the current step. It also provides methods for logging and emitting results that will be streamed back to Dagster.
This class should not be directly instantiated by the user. Instead it should be initialized by calling open_dagster_pipes(), which will return the singleton instance of this class. After open_dagster_pipes() has been called, the singleton instance can also be retrieved by calling PipesContext.get().
- close
Close the pipes connection. This will flush all buffered messages to the orchestration process and cause any further attempt to write a message to raise an error. This method is idempotent: subsequent calls after the first have no effect.
- classmethod get
Get the singleton instance of the context. Raises an error if the context has not been initialized.
- get_extra
Get the value of an extra provided by the user. Raises an error if the extra is not defined.
Parameters: key (str) – The key of the extra.
Returns: The value of the extra.
Return type: Any
- classmethod is_initialized
bool: Whether the context has been initialized.
- report_asset_check
Report to Dagster that an asset check has been performed. Streams a payload containing check result information back to Dagster. If no assets or associated checks are in scope, raises an error.
Parameters:
- check_name (str) – The name of the check.
- passed (bool) – Whether the check passed.
- severity (PipesAssetCheckSeverity) – The severity of the check. Defaults to “ERROR”.
- metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the check. Defaults to None.
- asset_key (Optional[str]) – The asset key for the check. If only a single asset is in
- report_asset_materialization
Report to Dagster that an asset has been materialized. Streams a payload containing materialization information back to Dagster. If no assets are in scope, raises an error.
Parameters:
- metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the materialized asset. Defaults to None.
- data_version (Optional[str]) – The data version for the materialized asset.
- asset_key (Optional[str]) – The asset key for the materialized asset. If only a
- report_custom_message
Send a JSON serializable payload back to the orchestration process. Can be retrieved there using get_custom_messages.
Parameters: payload (Any) – JSON serializable data.
- classmethod set
Set the singleton instance of the context.
- property asset_key
The AssetKey for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.
Type: str
- property asset_keys
The AssetKeys for the currently scoped assets. Raises an error if no assets are in scope.
Type: Sequence[str]
- property code_version
The code version for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.
Type: Optional[str]
- property code_version_by_asset_key
Mapping of asset key to code version for the currently scoped assets. Raises an error if no assets are in scope.
Type: Mapping[str, Optional[str]]
- property extras
Key-value map for all extras provided by the user.
Type: Mapping[str, Any]
- property is_asset_step
Whether the current step targets assets.
Type: bool
- property is_closed
Whether the context has been closed.
Type: bool
- property is_partition_step
Whether the current step is scoped to one or more partitions.
Type: bool
- property job_name
The job name for the currently executing run. Returns None if the run is not derived from a job.
Type: Optional[str]
- property log
A logger that streams log messages back to Dagster.
Type: logging.Logger
- property partition_key
The partition key for the currently scoped partition. Raises an error if 0 or multiple partitions are in scope.
Type: str
- property partition_key_range
The partition key range for the currently scoped partition or partitions. Raises an error if no partitions are in scope.
Type: PipesPartitionKeyRange
- property partition_time_window
The partition time window for the currently scoped partition or partitions. Returns None if partitions in scope are not temporal. Raises an error if no partitions are in scope.
Type: Optional[PipesTimeWindow]
- property provenance
The provenance for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.
Type: Optional[PipesDataProvenance]
- property provenance_by_asset_key
Mapping of asset key to provenance for the currently scoped assets. Raises an error if no assets are in scope.
Type: Mapping[str, Optional[PipesDataProvenance]]
- property retry_number
The retry number for the currently executing run.
Type: int
- property run_id
The run ID for the currently executing pipeline run.
Type: str
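The singleton accessors (set, get, is_initialized) and the idempotent close described for PipesContext follow a common pattern. The following is a minimal sketch of that pattern under stated assumptions: `SketchContext` is a hypothetical class, the error types are arbitrary, and nothing here is taken from the library's source.

```python
from typing import Any, ClassVar, Dict, List, Optional


class SketchContext:
    """Hypothetical miniature of a singleton context; not the real PipesContext."""

    _instance: ClassVar[Optional["SketchContext"]] = None

    def __init__(self) -> None:
        self._closed = False
        self.sent: List[Dict[str, Any]] = []

    @classmethod
    def set(cls, context: "SketchContext") -> None:
        cls._instance = context

    @classmethod
    def is_initialized(cls) -> bool:
        return cls._instance is not None

    @classmethod
    def get(cls) -> "SketchContext":
        if cls._instance is None:
            raise RuntimeError("Context has not been initialized.")
        return cls._instance

    @property
    def is_closed(self) -> bool:
        return self._closed

    def report_custom_message(self, payload: Dict[str, Any]) -> None:
        if self._closed:
            raise RuntimeError("Cannot write a message after the context is closed.")
        self.sent.append(payload)

    def close(self) -> None:
        # Idempotent: only the first call changes state.
        self._closed = True


SketchContext.set(SketchContext())
ctx = SketchContext.get()
ctx.report_custom_message({"status": "ok"})
ctx.close()
ctx.close()  # second call has no effect
```

Helper functions deep in the call stack can fetch the context through the class-level accessor rather than having it threaded through every signature.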
Advanced
Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.
Refer to the Dagster Pipes details and customization guide for more information.
Context loaders
Context loaders load the context payload from the location specified in the bootstrap payload.
- class dagster_pipes.PipesContextLoader
- abstract load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
- class dagster_pipes.PipesDefaultContextLoader
Context loader that loads context data from either a file or directly from the provided params.
The location of the context data is configured by the params received by the loader. If the params include a key path, then the context data will be loaded from a file at the specified path. If the params instead include a key data, then the corresponding value should be a dict representing the context data.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
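The path-versus-data dispatch described for PipesDefaultContextLoader can be sketched with the standard library alone. This is an illustration, not the library's code: `load_context_sketch` is a hypothetical function, the file is assumed to contain JSON, and the `run_id` field in the sample payload is made up.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Mapping


@contextmanager
def load_context_sketch(params: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Hypothetical re-creation of the path/data dispatch described above."""
    if "path" in params:
        # Context data was written to a file (assumed here to be JSON).
        with open(params["path"], "r") as f:
            yield json.load(f)
    elif "data" in params:
        # Context data was passed inline as a dict.
        yield dict(params["data"])
    else:
        raise ValueError("Expected params to contain either 'path' or 'data'.")


# Inline data.
with load_context_sketch({"data": {"run_id": "abc"}}) as inline_ctx:
    inline_run_id = inline_ctx["run_id"]

# File-backed data.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "context.json")
    with open(path, "w") as f:
        json.dump({"run_id": "xyz"}, f)
    with load_context_sketch({"path": path}) as file_ctx:
        file_run_id = file_ctx["run_id"]
```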
- class dagster_pipes.PipesDbfsContextLoader
Context loader that reads context from a JSON file on DBFS.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
Params loaders
Params loaders load the bootstrap payload from some globally accessible key-value store.
- class dagster_pipes.PipesParamsLoader
Object that loads params passed from the orchestration process by the context injector and message reader. These params are used to bootstrap the PipesContextLoader and PipesMessageWriter, respectively.
- abstract is_dagster_pipes_process
Whether or not this process has been provided with information to create a PipesContext or should instead return a mock.
- abstract load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- abstract load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
- class dagster_pipes.PipesEnvVarParamsLoader
Params loader that extracts params from environment variables.
- class dagster_pipes.PipesCliArgsParamsLoader
Params loader that extracts params from known CLI arguments.
- is_dagster_pipes_process
Whether or not this process has been provided with information to create a PipesContext or should instead return a mock.
- load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
- class dagster_pipes.PipesMappingParamsLoader
Params loader that extracts params from a Mapping provided at init time.
- is_dagster_pipes_process
Whether or not this process has been provided with information to create a PipesContext or should instead return a mock.
- load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
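The three methods shared by the params loaders can be pictured with a small mapping-backed class. This is a sketch under stated assumptions: `SketchMappingParamsLoader` and both key names are hypothetical, and the sketch skips any encoding or decoding that a real loader may apply to the stored values.

```python
from typing import Any, Mapping


class SketchMappingParamsLoader:
    """Hypothetical params loader over a plain mapping; key names are made up."""

    CONTEXT_KEY = "example_context_params"
    MESSAGES_KEY = "example_messages_params"

    def __init__(self, mapping: Mapping[str, Any]) -> None:
        self._mapping = mapping

    def is_dagster_pipes_process(self) -> bool:
        # Treat the process as orchestrated only if context params are present.
        return self.CONTEXT_KEY in self._mapping

    def load_context_params(self) -> Mapping[str, Any]:
        return self._mapping[self.CONTEXT_KEY]

    def load_messages_params(self) -> Mapping[str, Any]:
        return self._mapping[self.MESSAGES_KEY]


loader = SketchMappingParamsLoader(
    {
        "example_context_params": {"path": "/tmp/context.json"},
        "example_messages_params": {"stdio": "stderr"},
    }
)
empty_loader = SketchMappingParamsLoader({})
```

An environment-variable or CLI-argument loader would expose the same three methods and differ only in where the mapping comes from.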
Message writers
Message writers write messages to the location specified in the bootstrap payload.
- class dagster_pipes.PipesMessageWriter
- get_opened_extras
Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.
Returns: A dict of arbitrary data to be passed back to the orchestration process.
Return type: PipesExtras
- final get_opened_payload
Return a payload containing information about the external process to be passed back to the orchestration process. This should contain information that cannot be known before the external process is launched.
This method should not be overridden by users. Instead, users should override get_opened_extras to inject custom data.
- abstract open
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
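The split between get_opened_payload (final) and get_opened_extras (overridable) is a template-method arrangement. Here is a minimal sketch of that arrangement: both classes are hypothetical, the `host` value is made up, and the real payload may carry more than the extras key shown here.

```python
from typing import Any, Dict


class SketchMessageWriter:
    """Hypothetical base class showing the hook/envelope split described above."""

    def get_opened_extras(self) -> Dict[str, Any]:
        # Subclasses override this hook to add their own data.
        return {}

    def get_opened_payload(self) -> Dict[str, Any]:
        # Not meant to be overridden: wraps the extras in a fixed envelope.
        return {"extras": self.get_opened_extras()}


class SketchClusterWriter(SketchMessageWriter):
    def get_opened_extras(self) -> Dict[str, Any]:
        # Example of information only known once the external process runs.
        return {"host": "worker-1"}


payload = SketchClusterWriter().get_opened_payload()
default_payload = SketchMessageWriter().get_opened_payload()
```

Keeping the envelope in the base class means the orchestration side can rely on one payload shape regardless of which writer produced it.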
- class dagster_pipes.PipesDefaultMessageWriter
Message writer that writes messages to either a file or the stdout or stderr stream.
The write location is configured by the params received by the writer. If the params include a key path, then messages will be written to a file at the specified path. If the params instead include a key stdio, then the corresponding value must specify either stderr or stdout, and messages will be written to the selected stream.
- open
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
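The path-versus-stdio rule for PipesDefaultMessageWriter can be approximated as follows. This is a sketch, not the library's implementation: `open_writer_sketch` is a hypothetical function that yields a plain write function instead of a channel object, one JSON document per line is an assumption, and the message contents are invented.

```python
import json
import os
import sys
import tempfile
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, Mapping


@contextmanager
def open_writer_sketch(
    params: Mapping[str, Any],
) -> Iterator[Callable[[Dict[str, Any]], None]]:
    """Hypothetical dispatch on 'path' vs 'stdio'; yields a write function."""
    if "path" in params:
        stream = open(params["path"], "a")
        owns_stream = True
    elif "stdio" in params:
        if params["stdio"] not in ("stderr", "stdout"):
            raise ValueError("'stdio' must be either 'stderr' or 'stdout'.")
        stream = sys.stderr if params["stdio"] == "stderr" else sys.stdout
        owns_stream = False
    else:
        raise ValueError("Expected params to contain either 'path' or 'stdio'.")

    def write_message(message: Dict[str, Any]) -> None:
        # One JSON document per line (an assumption of this sketch).
        stream.write(json.dumps(message) + "\n")
        stream.flush()

    try:
        yield write_message
    finally:
        if owns_stream:
            stream.close()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "messages")
    with open_writer_sketch({"path": path}) as write:
        write({"event": "started"})
        write({"event": "finished"})
    with open(path) as f:
        lines = [json.loads(line) for line in f]
```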
- class dagster_pipes.PipesBlobStoreMessageWriter
- open
Construct and yield a PipesBlobStoreMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesBlobStoreMessageWriterChannel – Channel that periodically uploads message chunks to a blob store.
- INCLUDE_STDIO_IN_MESSAGES_KEY = 'include_stdio_in_messages'
- class dagster_pipes.PipesS3MessageWriter
Message writer that writes messages by periodically writing message chunks to an S3 bucket.
Parameters:
- client (Any) – A boto3.client("s3") object.
- interval (float) – Interval in seconds between chunk uploads.
- class dagster_pipes.PipesDbfsMessageWriter
Message writer that writes messages by periodically writing message chunks to a directory on DBFS.
- get_opened_extras
Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.
Returns: A dict of arbitrary data to be passed back to the orchestration process.
Return type: PipesExtras
Message writer channels
Message writer channels are objects that write messages back to the Dagster orchestration process.
- class dagster_pipes.PipesMessageWriterChannel
Object that writes messages back to the Dagster orchestration process.
- abstract write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
- class dagster_pipes.PipesBlobStoreMessageWriterChannel
Message writer channel that periodically uploads message chunks to some blob store endpoint.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
- class dagster_pipes.PipesBufferedFilesystemMessageWriterChannel
Message writer channel that periodically writes message chunks to an endpoint mounted on the filesystem.
Parameters: interval (float) – Interval in seconds between chunk uploads.
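The idea of buffering messages and writing them out as chunks can be sketched as below. Everything here is an assumption made for illustration: `SketchBufferedChannel`, the `flush_chunk` method, and the numbered `N.json` file naming are hypothetical, and the flush is triggered by hand, where a periodic channel would presumably trigger it from a timer every interval seconds.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


class SketchBufferedChannel:
    """Hypothetical buffered channel: collects messages, writes numbered chunks."""

    def __init__(self, directory: str) -> None:
        self._directory = directory
        self._buffer: List[Dict[str, Any]] = []
        self._chunk_index = 0

    def write_message(self, message: Dict[str, Any]) -> None:
        # Writing is cheap: the message only lands in memory.
        self._buffer.append(message)

    def flush_chunk(self) -> None:
        # Called by hand here to keep the example deterministic.
        if not self._buffer:
            return
        chunk_path = os.path.join(self._directory, f"{self._chunk_index}.json")
        with open(chunk_path, "w") as f:
            f.write("\n".join(json.dumps(m) for m in self._buffer))
        self._buffer.clear()
        self._chunk_index += 1


with tempfile.TemporaryDirectory() as tmp:
    channel = SketchBufferedChannel(tmp)
    channel.write_message({"n": 1})
    channel.write_message({"n": 2})
    channel.flush_chunk()
    channel.flush_chunk()  # empty buffer: no new chunk is written
    channel.write_message({"n": 3})
    channel.flush_chunk()
    chunk_names = sorted(os.listdir(tmp))
```

Batching trades latency for fewer writes, which matters when each write is a network call to a blob store.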
- class dagster_pipes.PipesFileMessageWriterChannel
Message writer channel that writes one message per line to a file.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
- class dagster_pipes.PipesStreamMessageWriterChannel
Message writer channel that writes one message per line to a TextIO stream.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
- class dagster_pipes.PipesS3MessageWriterChannel
Message writer channel for writing messages by periodically writing message chunks to an S3 bucket.
Parameters:
- client (Any) – A boto3.client("s3") object.
- bucket (str) – The name of the S3 bucket to write to.
- key_prefix (Optional[str]) – An optional prefix to use for the keys of written blobs.
- interval (float) – Interval in seconds between chunk uploads.
Utilities
- dagster_pipes.encode_env_var
Encode a value by serializing to JSON, compressing with zlib, and finally encoding with base64. In function notation: base64_encode(compress(to_json(value))).
Parameters: value (Any) – The value to encode. Must be JSON-serializable.
Returns: The encoded value.
Return type: str
- dagster_pipes.decode_env_var
Decode a value by decoding from base64, decompressing with zlib, and finally deserializing from JSON. In function notation: from_json(decompress(base64_decode(value))).
Parameters: value (Any) – The value to decode.
Returns: The decoded value.
Return type: Any
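The two pipelines above can be reproduced with the standard library. The functions below are a sketch of the described steps, not the library's own code: `encode_sketch` and `decode_sketch` are hypothetical names, UTF-8 is an assumed text encoding, and the output is not guaranteed to be byte-identical to what dagster_pipes produces.

```python
import base64
import json
import zlib
from typing import Any


def encode_sketch(value: Any) -> str:
    """JSON -> zlib -> base64, mirroring the order described above."""
    serialized = json.dumps(value)
    compressed = zlib.compress(serialized.encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def decode_sketch(value: str) -> Any:
    """base64 -> zlib -> JSON, the exact inverse of encode_sketch."""
    compressed = base64.b64decode(value)
    serialized = zlib.decompress(compressed).decode("utf-8")
    return json.loads(serialized)


original = {"path": "/tmp/context.json", "retries": 3}
encoded = encode_sketch(original)
round_tripped = decode_sketch(encoded)
```

Base64 keeps the compressed bytes safe to place in an environment variable, which can only hold text.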