External pipelines
Dagster Pipes provides a powerful mechanism for invoking code outside of Dagster, while providing all the benefits of scheduling, reporting, and observability of native Dagster pipelines. While Dagster is written in Python, you can run code in other languages and send information back to Dagster.
In this guide, we'll walk you through how to invoke non-Dagster code through Pipes.
This documentation assumes familiarity with Dagster assets.
Setting up an asset that invokes your external code
To set up invoking code outside of Dagster, you first need to set up an asset. We can invoke the external code within the asset function by using a Dagster Pipes client resource.
This external code is not required to know anything about Dagster. It can even be a process running a different language on a remote machine - the only requirement is that it can be triggered from Python.
In the following example, our external code is in a Python script that we invoke within a Dagster asset.
def main():
# do some work
...
import shutil
import dagster as dg
@dg.asset
def wrapper_asset(
context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> None:
external_python_code_path = "/usr/bin/external_code.py"
return pipes_subprocess_client.run(
command=[shutil.which("python"), external_python_code_path],
context=context,
).get_results()
defs = dg.Definitions(
assets=[wrapper_asset],
resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
Materializing this asset in Dagster from the UI or from a sensor/schedule will kick off the execution of that external code.
Sending logs and metadata back to Dagster from external code
Dagster Pipes also establishes a protocol for external code to optionally send back log and metadata back to Dagster. A Python client for this protocol is available as part of the dagster-pipes
package. To send back log and metadata back to Dagster, we can create a PipesContext
object within our external code:
from dagster_pipes import PipesContext, open_dagster_pipes
def main():
with open_dagster_pipes():
# get the context
context = PipesContext.get()
# do some work
...
# send some logs to Dagster
context.log.info("doing some stuff")
# report asset materialization metadata to Dagster
context.report_asset_materialization(metadata={"total_orders": 2})
The logs sent back using the PipesContext
will be visible in the structured logs of that asset materialization's run, and the materialization metadata will be reflected in the asset history.