Build pipelines in JavaScript
This guide covers how to run JavaScript with Dagster using Pipes. The same principles apply to other languages.
Prerequisites
Step 1: Create a script using TensorFlow in JavaScript
First, you'll create a JavaScript script that reads a CSV file and uses TensorFlow to train a sequential model.
Create a file named tensorflow/main.js with the following contents:
// tfjs-node registers the file:// handlers that model.save() needs in Node.js
import * as tf from '@tensorflow/tfjs-node';
const CONFIG = { /* ... */ };
async function train_model() {
const { path_to_data, data_config, path_to_model } = CONFIG;
// Flatten each CSV row into plain arrays; batch() stacks them into [batchSize, 1] tensors
const dataset = tf.data.csv(path_to_data, data_config)
.map(({ xs, ys }) => ({ xs: Object.values(xs), ys: Object.values(ys) }))
.batch(100);
// A single dense unit: linear regression on one input feature
const model = tf.sequential();
model.add(tf.layers.dense({units: 1, inputShape: [1]}));
model.compile({loss: 'meanSquaredError', optimizer: 'sgd'});
await model.fitDataset(dataset, {epochs: 250});
await model.save(path_to_model);
model.summary();
}
// Run training when the script is executed
train_model();
Step 2: Create a Dagster asset that runs the script
In Dagster, create an asset that:
- Uses the PipesSubprocessClient resource to run the script with node
- Sets the compute_kind to javascript. This labels the asset so it's easy to see that it is materialized with a different compute (JavaScript instead of Python).
import dagster as dg
@dg.asset(
# Set compute type metadata
compute_kind="javascript",
)
def tensorflow_model(
context: dg.AssetExecutionContext,
# Attaches the PipesSubprocessClient as a resource
pipes_subprocess_client: dg.PipesSubprocessClient,
):
# Run the script using Node
return pipes_subprocess_client.run(
command=["node", "tensorflow/main.js"],
context=context,
extras={},
).get_materialize_result()
# Define the Dagster Definitions object
defs = dg.Definitions(
assets=[tensorflow_model],
resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
When the asset is materialized, the stdout and stderr will be captured automatically and shown in the asset logs. If the command passed to Pipes returns a successful exit code, Dagster will produce an asset materialization result.
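Because Dagster decides success from the script's exit code, training errors should make Node exit with a non-zero code. The following is a minimal sketch for an async entry point such as train_model. `runMain` is a hypothetical helper and is not part of Dagster or Node.

```javascript
// Hypothetical helper: run an async entry point and turn any failure into a
// non-zero exit code, so Dagster does not record a successful materialization.
function runMain(main) {
  return Promise.resolve()
    .then(() => main())
    .then(
      () => 0,
      (err) => {
        // stderr is captured by Pipes and shown in the asset logs
        console.error(err);
        // Setting exitCode (rather than calling process.exit) lets pending writes finish
        process.exitCode = 1;
        return 1;
      }
    );
}

// Usage at the end of tensorflow/main.js:
// runMain(train_model);
```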
Step 3: Send and receive data from the script
To send context to your script or emit events back to Dagster, you can use environment variables provided by the PipesSubprocessClient:
- DAGSTER_PIPES_CONTEXT: input context
- DAGSTER_PIPES_MESSAGES: output context
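Both variables are set by PipesSubprocessClient when it launches the script. A script can therefore check for them to decide whether it is running under Pipes. For example, it could fall back to the local CONFIG object from Step 1 when you run it directly with node. This is a minimal sketch; `isRunningUnderPipes` is a hypothetical name.

```javascript
// Hypothetical helper: true only when both Pipes environment variables are set,
// which is the case when PipesSubprocessClient launched this script.
function isRunningUnderPipes(env = process.env) {
  return Boolean(env.DAGSTER_PIPES_CONTEXT) && Boolean(env.DAGSTER_PIPES_MESSAGES);
}
```

Passing the environment as a parameter keeps the check easy to exercise without modifying process.env.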
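Create a new file with the following helper functions, which read the environment variables, decode the data, and write messages back to Dagster. The script in Step 4 calls these helpers, so either add them to tensorflow/main.js directly or export them from the new file and import them into the script: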
import * as util from 'util';
import { promises as fs } from "fs";
import { inflate } from 'zlib';
const inflateAsync = util.promisify(inflate);
/**
* Decodes, decompresses, and parses the data sent by Dagster
*/
async function _decodeParam(value) {
if (!value) {
return null;
}
const decoded = Buffer.from(value, "base64");
const decompressed = await inflateAsync(decoded);
return JSON.parse(decompressed.toString("utf-8"));
}
/**
* Extracts the context from the file defined by `DAGSTER_PIPES_CONTEXT`
*/
export async function getPipesContext() {
const encodedPipesContextParam = process.env.DAGSTER_PIPES_CONTEXT;
const decodedPipesContextParam = await _decodeParam(encodedPipesContextParam);
if (!decodedPipesContextParam) {
return null;
}
return await fs.readFile(decodedPipesContextParam.path, "utf-8")
.then((data) => JSON.parse(data));
}
/**
* Appends a JSON message, one per line, to the file defined by `DAGSTER_PIPES_MESSAGES`
*/
export async function setPipesMessages(message) {
const encodedPipesMessagesParam = process.env.DAGSTER_PIPES_MESSAGES;
const decodedPipesMessagesParam = await _decodeParam(encodedPipesMessagesParam);
if (!decodedPipesMessagesParam) {
return null;
}
const path = decodedPipesMessagesParam.path;
await fs.appendFile(path, JSON.stringify(message) + "\n");
}
Both environment variables are base64-encoded, zlib-compressed JSON objects. Each decoded object contains a path that tells the script where to read the context from or where to write messages to.
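To make the encoding concrete, the following round-trip sketch follows the description above: JSON text is compressed with zlib and then base64-encoded. It uses Node's synchronous zlib calls for brevity and is written as a standalone CommonJS snippet. `encodeParam` and `decodeParam` are hypothetical names, and the sample path is made up.

```javascript
const zlib = require("zlib");

// Encode an object as described above: JSON -> zlib (deflate) -> base64 text
function encodeParam(obj) {
  return zlib.deflateSync(Buffer.from(JSON.stringify(obj), "utf-8")).toString("base64");
}

// Reverse the steps: base64 -> zlib inflate -> JSON
function decodeParam(value) {
  if (!value) return null;
  return JSON.parse(zlib.inflateSync(Buffer.from(value, "base64")).toString("utf-8"));
}

// Example: a parameter pointing at a made-up context file
const encoded = encodeParam({ path: "/tmp/example-context.json" });
console.log(decodeParam(encoded)); // { path: '/tmp/example-context.json' }
```

Node's deflate output uses the zlib format, and inflate reads the same format. That is why the async helper above can decode the variables with `inflate`.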
Step 4: Emit events and report materializations from your external process
With the helper functions that decode the Dagster Pipes environment variables, you can pass additional parameters into the JavaScript process and attach more information, such as metadata, to the asset materialization.
Update the tensorflow/main.js script to:
- Retrieve the model configuration from the Dagster context, and
- Report an asset materialization back to Dagster with model metadata
// tfjs-node registers the file:// handlers that model.save() needs in Node.js
import * as tf from '@tensorflow/tfjs-node';
// getPipesContext and setPipesMessages are the helpers from Step 3: add them to this file or import them
async function train_model() {
// Get configuration from Dagster instead of the `CONFIG` object; the asset nests it under extras.config
const { asset_keys, extras: { config: { path_to_data, data_config, path_to_model } } } = await getPipesContext();
// Send a log message that appears in the Dagster asset logs
await setPipesMessages({
__dagster_pipes_version: "0.1",
method: "log",
params: { message: `Materializing ${asset_keys}`, level: "INFO" },
});
const dataset = tf.data.csv(path_to_data, data_config)
.map(({ xs, ys }) => ({ xs: Object.values(xs), ys: Object.values(ys) }))
.batch(100);
const model = tf.sequential();
model.add(tf.layers.dense({units: 1, inputShape: [1]}));
model.compile({loss: 'meanSquaredError', optimizer: 'sgd'});
await model.fitDataset(dataset, {epochs: 250});
await model.save(path_to_model);
model.summary();
// Report materialization to Dagster
await setPipesMessages({
__dagster_pipes_version: "0.1",
method: "report_asset_materialization",
params: {
asset_key: asset_keys[0],
data_version: null,
metadata: {
metrics: model.metrics ? { raw_value: model.metrics, type: "text" } : undefined,
loss: { raw_value: model.loss, type: "text" },
},
},
});
}
// Run training when the script is executed
train_model();
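Each line appended to the messages file is a single JSON object with a `method` name and its `params`. The Python dagster-pipes package also stamps every message with a `__dagster_pipes_version` field ("0.1" at the time of writing), so it is reasonable to do the same from JavaScript. The following is a minimal sketch of hypothetical builder functions for a log line and an asset materialization. It assumes the field names used in the materialization message above.

```javascript
// Protocol version stamped on every message (assumed to match the Python dagster-pipes package)
const PIPES_PROTOCOL_VERSION = "0.1";

// Hypothetical helper: wrap a method name and its params in the Pipes message envelope
function pipesMessage(method, params) {
  return { __dagster_pipes_version: PIPES_PROTOCOL_VERSION, method, params };
}

// A log line shown in the Dagster asset logs
function logMessage(message, level = "INFO") {
  return pipesMessage("log", { message, level });
}

// A materialization for one asset key; metadata entries have the shape { raw_value, type }
function materializationMessage(assetKey, metadata = {}, dataVersion = null) {
  return pipesMessage("report_asset_materialization", {
    asset_key: assetKey,
    data_version: dataVersion,
    metadata,
  });
}

// Usage with the setPipesMessages helper from Step 3:
// await setPipesMessages(logMessage("Training started"));
// await setPipesMessages(materializationMessage(asset_keys[0], { loss: { raw_value: "meanSquaredError", type: "text" } }));
```

Building messages in one place keeps the envelope consistent, so a protocol change only has to be made once.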
Step 5: Update the asset to provide extra parameters
Finally, update your Dagster asset to pass in the model information that's used by the script:
import dagster as dg
@dg.asset(
compute_kind="javascript",
)
def my_asset(
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient,
):
"""Runs JavaScript to generate an asset."""
return pipes_subprocess_client.run(
command=["node", "tensorflow/main.js"],
context=context,
extras={
"operation_name": "train_model",
"config": {
"path_to_data": "file://../tensorflow/data/data.csv",
"data_config": {"hasHeaders": True},
"path_to_model": "file://../tensorflow/model",
},
},
).get_materialize_result()
defs = dg.Definitions(
assets=[my_asset],
resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
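On the JavaScript side, `extras` arrives as the JSON serialization of the Python dict. `True` becomes `true`, and the nesting under `config` is preserved. The following minimal sketch shows a hypothetical helper that pulls the training settings out of a Pipes context object and fails fast when one is missing. A thrown error makes the script exit non-zero, so Dagster reports the failure instead of a materialization.

```javascript
// Hypothetical helper: extract the training settings that the asset passes
// under extras.config, and fail with a clear message if any are missing.
function readTrainingConfig(pipesContext) {
  const config = pipesContext?.extras?.config ?? {};
  const required = ["path_to_data", "data_config", "path_to_model"];
  const missing = required.filter((key) => config[key] === undefined);
  if (missing.length > 0) {
    throw new Error(`Missing extras.config keys: ${missing.join(", ")}`);
  }
  return config;
}
```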
What's next?
- Schedule your pipeline to run periodically with Automating Pipelines
- Explore adding asset checks to validate your script with Understanding Asset Checks