Build pipelines with AWS ECS
This article covers how to use Dagster Pipes with AWS ECS.
The dagster-aws integration library provides the PipesECSClient resource, which can be used to launch AWS ECS tasks from Dagster assets and ops. Dagster can receive events such as logs, asset checks, and asset materializations from tasks launched with this client. Using it requires minimal code changes on the task side.
Prerequisites
- In the Dagster environment, you'll need to:
  - Install the following packages:
    pip install dagster dagster-webserver dagster-aws
    Refer to the Dagster installation guide for more info.
  - Configure AWS authentication credentials. If you don't have this set up already, refer to the boto3 quickstart.
- In AWS, you'll need:
  - An existing AWS account
  - An AWS ECS task. To receive logs and events from a task container, it must have "logDriver" set to "awslogs" in its "logConfiguration" (see the sketch after this list).
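For reference, here's a minimal sketch of registering a task definition whose container logs to CloudWatch via the awslogs driver, using boto3. The log group, region, container name, and image are placeholders, not values required by this guide:
import boto3

ecs = boto3.client("ecs")

ecs.register_task_definition(
    family="my-task",  # matches the taskDefinition used in Step 3
    containerDefinitions=[
        {
            "name": "my-container",      # placeholder container name
            "image": "my-image:latest",  # the image built in Step 1
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    # placeholder log group and region; substitute your own
                    "awslogs-group": "/ecs/my-task",
                    "awslogs-region": "us-east-1",
                    "awslogs-stream-prefix": "ecs",
                },
            },
        }
    ],
)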
Step 1: Install the dagster-pipes module in your ECS environment
Install the dagster-pipes module in the image used for your ECS task. For example, you can install the dependency with pip in your image Dockerfile:
FROM python:3.11-slim
RUN python -m pip install dagster-pipes
# copy the task script
COPY . .
Step 2: Add dagster-pipes to the ECS task script
Call open_dagster_pipes in the ECS task script to create a context that can be used to send messages to Dagster:
from dagster_pipes import open_dagster_pipes


def main():
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS ECS task!")
        pipes.report_asset_materialization(
            metadata={"some_metric": {"raw_value": 0, "type": "int"}},
            data_version="alpha",
        )


if __name__ == "__main__":
    main()
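The same pipes context can report the other event types mentioned earlier, such as asset check results. Below is a minimal sketch, assuming a check named num_rows_positive (a hypothetical name) is defined on the corresponding Dagster asset:
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    row_count = 100  # placeholder for a value computed by the task
    # Report an asset check result back to Dagster. The check name below is a
    # hypothetical example and must match a check defined on the asset.
    pipes.report_asset_check(
        check_name="num_rows_positive",
        passed=row_count > 0,
        metadata={"row_count": {"raw_value": row_count, "type": "int"}},
    )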
Step 3: Create an asset using the PipesECSClient to launch the task
In the Dagster asset/op code, use the PipesECSClient resource to launch the ECS task:
# dagster_ecs_pipes.py
from dagster_aws.pipes import PipesECSClient

from dagster import AssetExecutionContext, asset


@asset
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
    return pipes_ecs_client.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-task",
            "count": 1,
        },
    ).get_materialize_result()
This will launch the AWS ECS task and wait until it reaches the "STOPPED" status. If any of the task's containers fail, the Dagster process will raise an exception. If the Dagster process is interrupted while the task is still running, the task will be terminated.
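The run_task_params dictionary is forwarded to boto3's ECS run_task call, so other run_task parameters can be supplied as well. Here's a sketch of a Fargate launch with a container environment override; the subnet, security group, and container name are placeholder assumptions, not values from this guide:
from dagster_aws.pipes import PipesECSClient

from dagster import AssetExecutionContext, asset


@asset
def ecs_pipes_fargate_asset(
    context: AssetExecutionContext, pipes_ecs_client: PipesECSClient
):
    return pipes_ecs_client.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-task",
            "count": 1,
            "launchType": "FARGATE",
            # Placeholder networking values; substitute your own subnets
            # and security groups.
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-0123456789abcdef0"],
                    "securityGroups": ["sg-0123456789abcdef0"],
                    "assignPublicIp": "ENABLED",
                }
            },
            # Container-level overrides, e.g. to pass configuration to the task.
            "overrides": {
                "containerOverrides": [
                    {
                        "name": "my-container",  # must match the container name
                        "environment": [{"name": "ENV", "value": "prod"}],
                    }
                ]
            },
        },
    ).get_materialize_result()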
Step 4: Create Dagster definitions
Next, add the PipesECSClient resource to your project's Definitions object:
from dagster import Definitions  # noqa
from dagster_aws.pipes import PipesECSClient

defs = Definitions(
    assets=[ecs_pipes_asset],
    resources={"pipes_ecs_client": PipesECSClient()},
)
Dagster will now be able to launch the AWS ECS task from the ecs_pipes_asset asset and receive logs and events from it. With the default message_reader, PipesCloudwatchLogReader, logs will be read from the CloudWatch log group specified in the container's "logConfiguration" field. Logs from all containers in the task will be read.
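If you need to control how the resource talks to AWS, for example to pin a region, one option is to construct it with a preconfigured boto3 ECS client. This is a sketch under the assumption that PipesECSClient accepts a client argument; the region is a placeholder:
import boto3
from dagster_aws.pipes import PipesECSClient

from dagster import Definitions  # noqa

defs = Definitions(
    assets=[ecs_pipes_asset],
    resources={
        # Assumes PipesECSClient accepts a preconfigured boto3 ECS client;
        # "us-east-1" is a placeholder region.
        "pipes_ecs_client": PipesECSClient(
            client=boto3.client("ecs", region_name="us-east-1")
        ),
    },
)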