Build pipelines in JavaScript

This guide covers how to run JavaScript with Dagster using Pipes. The same principles apply to other languages.

Prerequisites

To follow this guide, you'll need:

  • Familiarity with Assets
  • A basic understanding of JavaScript and Node.js

To run the examples, you'll need to install:

  • Node.js

  • The following Python packages:

    pip install dagster dagster-webserver

  • The following Node packages:

    npm install @tensorflow/tfjs

Step 1: Create a script using TensorFlow in JavaScript

First, you'll create a JavaScript script that reads a CSV file and uses TensorFlow to train a sequential model.

Create a file named tensorflow/main.js with the following contents:

import * as tf from '@tensorflow/tfjs';

const CONFIG = { /* ... */ };

async function train_model() {
  const { path_to_data, data_config, path_to_model } = CONFIG;

  // Load the CSV and convert each row into column tensors, in batches of 100
  const dataset = await tf.data.csv(path_to_data, data_config).map(({ xs, ys }) => {
    return {
      xs: tf.tensor2d(Object.values(xs), [Object.values(xs).length, 1]),
      ys: tf.tensor2d(Object.values(ys), [Object.values(ys).length, 1])
    };
  })
  .batch(100);

  // Single dense layer trained with mean squared error
  const model = tf.sequential();
  model.add(tf.layers.dense({units: 1, inputShape: [1]}));
  model.compile({loss: 'meanSquaredError', optimizer: 'sgd'});

  await model.fitDataset(dataset, {epochs: 250});
  await model.save(path_to_model);
  model.summary();
}

Step 2: Create a Dagster asset that runs the script

In Dagster, create an asset that:

  • Uses the PipesSubprocessClient resource to run the script with node
  • Sets the compute_kind to javascript. This makes it easy to identify that an alternate compute will be used for materialization.

import dagster as dg


@dg.asset(
    # Set compute type metadata
    compute_kind="javascript",
)
def tensorflow_model(
    context: dg.AssetExecutionContext,
    # Attaches the PipesSubprocessClient as a resource
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    # Run the script using Node
    return pipes_subprocess_client.run(
        command=["node", "tensorflow/main.js"],
        context=context,
        extras={},
    ).get_materialize_result()


# Define the Dagster Definitions object
defs = dg.Definitions(
    assets=[tensorflow_model],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

When the asset is materialized, the stdout and stderr will be captured automatically and shown in the asset logs. If the command passed to Pipes returns a successful exit code, Dagster will produce an asset materialization result.
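
As a rough illustration of the behavior described above (not Dagster's actual implementation), the following sketch runs a child process with Python's standard `subprocess` module, captures its stdout and stderr, and treats a zero exit code as success. The `run_and_check` helper is hypothetical and exists only for this example.

```python
import subprocess
import sys


def run_and_check(command: list[str]) -> tuple[bool, str, str]:
    """Run a command, capture its output, and report whether it exited with code 0."""
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode == 0, completed.stdout, completed.stderr


# The current Python interpreter stands in for `node` so the sketch runs anywhere;
# in this guide the command would be ["node", "tensorflow/main.js"].
ok, out, err = run_and_check([sys.executable, "-c", "print('training done')"])
print(ok, out.strip())
```

A non-zero exit code from the child process would make `ok` false, which corresponds to the case where no materialization result is produced.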

Step 3: Send and receive data from the script

To send context to your script or emit events back to Dagster, you can use environment variables provided by the PipesSubprocessClient.

  • DAGSTER_PIPES_CONTEXT - Input context
  • DAGSTER_PIPES_MESSAGES - Output context

Create a new file with the following helper functions that read the environment variables, decode the data, and write messages back to Dagster:

import * as util from 'util';
import { promises as fs } from "fs";
import { inflate } from 'zlib';

const inflateAsync = util.promisify(inflate);


/**
 * Decodes, decompresses, and parses the data sent by Dagster
 */
async function _decodeParam(value) {
  if (!value) {
    return null;
  }
  const decoded = Buffer.from(value, "base64");
  const decompressed = await inflateAsync(decoded);
  return JSON.parse(decompressed.toString("utf-8"));
}

/**
 * Extracts the context from the file defined by `DAGSTER_PIPES_CONTEXT`
 */
async function getPipesContext() {
  const encodedPipesContextParam = process.env.DAGSTER_PIPES_CONTEXT;
  const decodedPipesContextParam = await _decodeParam(encodedPipesContextParam);
  if (!decodedPipesContextParam) {
    return null;
  }
  return await fs.readFile(decodedPipesContextParam.path, "utf-8")
    .then((data) => JSON.parse(data));
}

/**
 * Writes JSON messages to the file defined by `DAGSTER_PIPES_MESSAGES`
 */
async function setPipesMessages(message) {
  const encodedPipesMessagesParam = process.env.DAGSTER_PIPES_MESSAGES;
  const decodedPipesMessagesParam = await _decodeParam(encodedPipesMessagesParam);
  if (!decodedPipesMessagesParam) {
    return null;
  }
  const path = decodedPipesMessagesParam.path;
  await fs.appendFile(path, JSON.stringify(message) + "\n");
}

Both environment variables are base64-encoded, zlib-compressed JSON objects. Each JSON object contains a path that indicates where to read or write data.
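
To make the encoding concrete, here is a minimal Python sketch of the round trip that the JavaScript `_decodeParam` helper reverses: JSON-serialize, compress with zlib, then base64-encode. The `encode_param` and `decode_param` names and the example path are assumptions for illustration, not Dagster's own API.

```python
import base64
import json
import zlib


def encode_param(value: dict) -> str:
    """JSON-serialize, zlib-compress, then base64-encode a parameter."""
    raw = json.dumps(value).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("utf-8")


def decode_param(encoded: str) -> dict:
    """Reverse of encode_param: base64-decode, decompress, then parse the JSON."""
    return json.loads(zlib.decompress(base64.b64decode(encoded)).decode("utf-8"))


# The path below is a made-up placeholder, not a value Dagster would produce.
encoded = encode_param({"path": "/tmp/example/context.json"})
print(decode_param(encoded)["path"])
```

The decoded object only tells the script where to look; the actual context and messages live in the files at those paths.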

Step 4: Emit events and report materializations from your external process

Using the utility functions to decode the Dagster Pipes environment variables, you can send additional parameters into the JavaScript process. You can also output more information into the asset materializations.

Update the tensorflow/main.js script to:

  • Retrieve the model configuration from the Dagster context, and
  • Report an asset materialization back to Dagster with model metadata

import * as tf from '@tensorflow/tfjs';

// getPipesContext and setPipesMessages are the helper functions from Step 3;
// import them or define them in this file.

async function train_model() {
  // Get configuration from Dagster instead of `CONFIG` object.
  // The asset passes these values under the `config` key of `extras`.
  const {
    asset_keys,
    extras: { config: { path_to_data, data_config, path_to_model } },
  } = await getPipesContext();

  await setPipesMessages({ info: `Materializing ${asset_keys}` });

  const dataset = await tf.data.csv(path_to_data, data_config).map(({ xs, ys }) => {
    return {
      xs: tf.tensor2d(Object.values(xs), [Object.values(xs).length, 1]),
      ys: tf.tensor2d(Object.values(ys), [Object.values(ys).length, 1])
    };
  })
  .batch(100);

  const model = tf.sequential();
  model.add(tf.layers.dense({units: 1, inputShape: [1]}));
  model.compile({loss: 'meanSquaredError', optimizer: 'sgd'});

  await model.fitDataset(dataset, {epochs: 250});
  await model.save(path_to_model);
  model.summary();

  // Report materialization to Dagster
  await setPipesMessages({
    method: "report_asset_materialization",
    params: {
      asset_key: asset_keys[0],
      data_version: null,
      metadata: {
        metrics: model.metrics ? { raw_value: model.metrics, type: "text" } : undefined,
        loss: { raw_value: model.loss, type: "text" },
      },
    },
  });
}
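
The messages file is written one JSON object per line. The sketch below shows that pattern in Python using a temporary file: append a message, then read the lines back. The helper names and the metadata values are hypothetical, and the message shape is copied from the script above; real Pipes messages may require additional fields.

```python
import json
import tempfile
from pathlib import Path


def append_message(messages_path: Path, message: dict) -> None:
    """Append one JSON-encoded message as a single line."""
    with messages_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(message) + "\n")


def read_messages(messages_path: Path) -> list[dict]:
    """Parse every non-empty line of the file as a JSON object."""
    with messages_path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "messages"
    append_message(path, {
        "method": "report_asset_materialization",
        "params": {
            "asset_key": "tensorflow_model",  # placeholder asset key
            "data_version": None,
            "metadata": {"loss": {"raw_value": "meanSquaredError", "type": "text"}},
        },
    })
    messages = read_messages(path)

print(messages[0]["method"])
```

Appending rather than overwriting keeps earlier messages intact, which is why the JavaScript helper uses `fs.appendFile`.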

Step 5: Update the asset to provide extra parameters

Finally, update your Dagster asset to pass in the model information that's used by the script:

import dagster as dg


@dg.asset(
    compute_kind="javascript",
)
def my_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    """Runs JavaScript to generate an asset."""
    return pipes_subprocess_client.run(
        command=["node", "tensorflow/main.js"],
        context=context,
        extras={
            "operation_name": "train_model",
            "config": {
                "path_to_data": "file://../tensorflow/data/data.csv",
                "data_config": {"hasHeaders": True},
                "path_to_model": "file://../tensorflow/model",
            },
        },
    ).get_materialize_result()


defs = dg.Definitions(
    assets=[my_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
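
Because the script reads its context by parsing a JSON file, the `extras` dictionary has to survive a JSON round trip. The following sketch simulates that round trip for the dictionary above; it assumes a simplified payload containing only an `extras` key, whereas the real context carries more fields (such as `asset_keys`).

```python
import json

extras = {
    "operation_name": "train_model",
    "config": {
        "path_to_data": "file://../tensorflow/data/data.csv",
        "data_config": {"hasHeaders": True},
        "path_to_model": "file://../tensorflow/model",
    },
}

# Simulate serializing on the Python side and parsing on the script side.
payload = json.loads(json.dumps({"extras": extras}))
config = payload["extras"]["config"]

# Python's True becomes JSON's true, which JavaScript reads as a boolean.
print(json.dumps(config["data_config"]))  # {"hasHeaders": true}
```

Note that the model settings sit under `extras.config`, not directly under `extras`, so the script has to read them from that nested key.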

What's next?