Weights & Biases (dagster-wandb)

This library provides a Dagster integration with Weights & Biases.

Use Dagster and Weights & Biases (W&B) to orchestrate your MLOps pipelines and maintain ML assets.

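The integration with W&B makes it easy within Dagster to use the wandb client in ops and assets, create and consume W&B Artifacts, and run W&B Launch agents and jobs.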

Resource

dagster_wandb.wandb_resource ResourceDefinition

Dagster resource used to communicate with the W&B API. It’s useful when you want to use the wandb client within your ops and assets. It’s a required resource if you are using the W&B IO Manager.

It automatically authenticates using the provided API key.

For a complete set of documentation, see Dagster integration.

To configure this resource, we recommend using the configured method.

Example:
from dagster import job
from dagster_wandb import wandb_resource

my_wandb_resource = wandb_resource.configured({"api_key": {"env": "WANDB_API_KEY"}})

@job(resource_defs={"wandb_resource": my_wandb_resource})
def my_wandb_job():
    ...
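
The `{"env": "WANDB_API_KEY"}` value in the example tells Dagster to read the API key from an environment variable at run time instead of hard-coding it. The sketch below mimics that lookup in plain Python to show the idea. `resolve_config_value` is a hypothetical helper written for illustration; it is not part of Dagster or dagster-wandb, and Dagster performs its own resolution internally.

```python
import os


def resolve_config_value(value, env=os.environ):
    """Return a literal value as-is, or look up {"env": NAME} in `env`.

    Hypothetical helper for illustration only; not a Dagster API.
    """
    if isinstance(value, dict) and set(value) == {"env"}:
        name = value["env"]
        if name not in env:
            raise KeyError(f"environment variable {name!r} is not set")
        return env[name]
    return value


# A fake environment keeps the sketch runnable without a real API key.
fake_env = {"WANDB_API_KEY": "placeholder-key"}
api_key = resolve_config_value({"env": "WANDB_API_KEY"}, env=fake_env)
```

Keeping the key in the environment means the job definition can be committed to version control without exposing the secret.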

I/O Manager

dagster_wandb.wandb_artifacts_io_manager IOManager

Dagster IO Manager to create and consume W&B Artifacts.

It allows any Dagster @op or @asset to create and consume W&B Artifacts natively.

For a complete set of documentation, see Dagster integration.

Example:
from dagster import (
    asset,
    load_assets_from_current_module,
    make_values_resource,
    repository,
    with_resources,
)
from dagster_wandb import wandb_artifacts_io_manager, wandb_resource


@repository
def my_repository():
    return [
        *with_resources(
            load_assets_from_current_module(),
            resource_defs={
                "wandb_config": make_values_resource(
                    entity=str,
                    project=str,
                ),
                "wandb_resource": wandb_resource.configured(
                    {"api_key": {"env": "WANDB_API_KEY"}}
                ),
                "wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
                    {"cache_duration_in_minutes": 60}  # only cache files for one hour
                ),
            },
            resource_config_by_key={
                "wandb_config": {
                    "config": {
                        "entity": "my_entity",
                        "project": "my_project",
                    }
                }
            },
        ),
    ]


@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]

Config

class dagster_wandb.WandbArtifactConfiguration

W&B Artifacts IO Manager configuration. Useful for type checking.

class dagster_wandb.SerializationModule

W&B Artifacts IO Manager configuration of the serialization module. Useful for type checking.

Errors

exception dagster_wandb.WandbArtifactsIOManagerError

Represents an execution error of the W&B Artifacts IO Manager.

Ops

dagster_wandb.run_launch_agent

Starts a Launch Agent and runs it as a long-running process until it is stopped manually.

Agents are processes that poll launch queues and execute the jobs (or dispatch them to external services to be executed) in order.
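
As a rough mental model of the polling behaviour described above, the sketch below drains in-memory queues in first-in, first-out order. Everything in it (`poll_once`, `run_agent`, the deque-backed queues) is an assumption made for illustration. It is not how the W&B Launch agent is implemented, and it exits once the queues are empty, whereas a real agent keeps running until stopped.

```python
from collections import deque


def poll_once(queues, execute):
    """Pop at most one job from each queue, in queue order, and execute it.

    Returns the number of jobs executed during this polling pass.
    """
    executed = 0
    for name, jobs in queues.items():
        if jobs:
            execute(name, jobs.popleft())  # FIFO: oldest job first
            executed += 1
    return executed


def run_agent(queues, execute):
    """Keep polling until a pass finds no work (toy stop condition)."""
    while poll_once(queues, execute):
        pass


# Hypothetical queue contents; the queue name mirrors the example config.
queues = {"my_dagster_queue": deque(["job-1", "job-2"])}
log = []
run_agent(queues, lambda queue, job: log.append((queue, job)))
# log == [("my_dagster_queue", "job-1"), ("my_dagster_queue", "job-2")]
```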

Example:
# config.yaml

resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  run_launch_agent:
    config:
      max_jobs: -1
      queues:
        - my_dagster_queue

# job definition (Python)

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_agent_example():
    run_launch_agent()

dagster_wandb.run_launch_job

Executes a Launch job.

A Launch job is assigned to a queue in order to be executed. You can create a queue or use the default one. Make sure an active agent is listening to that queue. You can run an agent inside your Dagster instance, or use a deployable agent in Kubernetes.

Example:
# config.yaml

resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  my_launched_job:
    config:
      entry_point:
        - python
        - train.py
      queue: my_dagster_queue
      uri: https://github.com/wandb/example-dagster-integration-with-launch

# job definition (Python)

from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_job_example():
    # The alias renames the op, so its config lives under ops.my_launched_job.
    run_launch_job.alias("my_launched_job")()
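
The config.yaml shown above for `run_launch_job` can also be written as a plain Python dictionary, which can be convenient in scripts or tests. The sketch below only builds that dictionary. Passing it to the job is left as a comment because running it needs Dagster, dagster-wandb, a W&B account, and an active agent.

```python
# Python equivalent of the config.yaml shown above for run_launch_job.
run_config = {
    "resources": {
        "wandb_config": {
            "config": {"entity": "my_entity", "project": "my_project"},
        },
    },
    "ops": {
        # The key matches the alias given to the op in the job definition.
        "my_launched_job": {
            "config": {
                "entry_point": ["python", "train.py"],
                "queue": "my_dagster_queue",
                "uri": "https://github.com/wandb/example-dagster-integration-with-launch",
            },
        },
    },
}

# With the libraries installed, the dictionary could be passed as:
# run_launch_job_example.execute_in_process(run_config=run_config)
```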