Weights & Biases (dagster-wandb)
This library provides a Dagster integration with Weights & Biases.
Use Dagster and Weights & Biases (W&B) to orchestrate your MLOps pipelines and maintain ML assets.
The integration with W&B makes it easy within Dagster to:
- use and create W&B Artifacts.
- use and create Registered Models in the W&B Model Registry.
- run training jobs on dedicated compute using W&B Launch.
- use the wandb client in ops and assets.
Useful links
For a complete set of documentation, see Dagster integration on the W&B website.
For complete code examples, see examples/with_wandb in Dagster's GitHub repo.
Resource
- dagster_wandb.wandb_resource ResourceDefinition
Dagster resource used to communicate with the W&B API. It is useful when you want to use the wandb client within your ops and assets. It is a required resource if you are using the W&B IO Manager.
It automatically authenticates using the provided API key.
For a complete set of documentation, see the Dagster integration guide on the W&B website.
To configure this resource, we recommend using the configured method.
Example:

from dagster import job
from dagster_wandb import wandb_resource

my_wandb_resource = wandb_resource.configured({"api_key": {"env": "WANDB_API_KEY"}})

@job(resource_defs={"wandb_resource": my_wandb_resource})
def my_wandb_job():
    ...
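The `{"env": "WANDB_API_KEY"}` value in the example asks for the API key to be read from an environment variable instead of being hard-coded. The following standalone sketch illustrates that idea only; it is not Dagster's implementation, and the helper names `resolve_config_value` and `resolve_config` are hypothetical.

```python
import os


def resolve_config_value(value):
    """Return a plain value unchanged, or look it up in the environment.

    Hypothetical helper for illustration only: a dict of the form
    {"env": "NAME"} is replaced by the value of the NAME environment variable.
    """
    if isinstance(value, dict) and set(value) == {"env"}:
        name = value["env"]
        if name not in os.environ:
            raise KeyError(f"environment variable {name!r} is not set")
        return os.environ[name]
    return value


def resolve_config(config):
    """Resolve every top-level entry of a config dict."""
    return {key: resolve_config_value(val) for key, val in config.items()}


if __name__ == "__main__":
    # Dummy value so the sketch runs without a real W&B account.
    os.environ.setdefault("WANDB_API_KEY", "dummy-key-for-illustration")
    resolved = resolve_config({"api_key": {"env": "WANDB_API_KEY"}})
    print(sorted(resolved))  # prints the resolved keys, not the secret itself
```

Keeping the key in the environment means the job definition can be committed to version control without exposing the secret.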
I/O Manager
- dagster_wandb.wandb_artifacts_io_manager IOManager
Dagster IO Manager to create and consume W&B Artifacts.
It allows any Dagster @op or @asset to create and consume W&B Artifacts natively.
For a complete set of documentation, see the Dagster integration guide on the W&B website.
Example:

from dagster import (
    asset,
    load_assets_from_current_module,
    make_values_resource,
    repository,
    with_resources,
)
from dagster_wandb import wandb_artifacts_io_manager, wandb_resource

# Define assets above the repository so they exist when
# load_assets_from_current_module() runs.
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]

@repository
def my_repository():
    return [
        *with_resources(
            load_assets_from_current_module(),
            resource_defs={
                "wandb_config": make_values_resource(
                    entity=str,
                    project=str,
                ),
                "wandb_resource": wandb_resource.configured(
                    {"api_key": {"env": "WANDB_API_KEY"}}
                ),
                "wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
                    {"cache_duration_in_minutes": 60}  # only cache files for one hour
                ),
            },
            resource_config_by_key={
                "wandb_config": {
                    "config": {
                        "entity": "my_entity",
                        "project": "my_project",
                    }
                }
            },
        ),
    ]
Config
- class dagster_wandb.WandbArtifactConfiguration
W&B Artifacts IO Manager configuration. Useful for type checking.
- class dagster_wandb.SerializationModule
W&B Artifacts IO Manager configuration of the serialization module. Useful for type checking.
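To illustrate what "useful for type checking" can look like in practice, here is a standalone sketch built on a stand-in `TypedDict`. `ArtifactConfigSketch` and `build_asset_metadata` are hypothetical, and the sketch assumes the real class behaves like a `TypedDict`. It models only the `type` key that appears in the I/O Manager example; in real code you would import `WandbArtifactConfiguration` from `dagster_wandb` and consult the library for its actual fields.

```python
from typing import TypedDict


class ArtifactConfigSketch(TypedDict, total=False):
    """Hypothetical stand-in for dagster_wandb.WandbArtifactConfiguration."""

    type: str  # artifact type, e.g. "dataset" (the only key taken from the docs)


def build_asset_metadata(config: ArtifactConfigSketch) -> dict:
    """Place the configuration under the metadata key used in the asset example."""
    return {"wandb_artifact_configuration": config}


# A static type checker such as mypy accepts this annotated dict...
dataset_config: ArtifactConfigSketch = {"type": "dataset"}
metadata = build_asset_metadata(dataset_config)

# ...and would flag a misspelled key such as {"typ": "dataset"} before the code runs.
```

Note that a `TypedDict` is checked statically only; nothing is validated at run time.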
Errors
- exception dagster_wandb.WandbArtifactsIOManagerError
Represents an execution error of the W&B Artifacts IO Manager.
Ops
- dagster_wandb.run_launch_agent
Starts a Launch Agent and runs it as a long-running process until it is stopped manually.
Agents are processes that poll launch queues and execute the jobs (or dispatch them to external services to be executed) in order.
Example:

# config.yaml
resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  run_launch_agent:
    config:
      max_jobs: -1
      queues:
        - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_agent_example():
    run_launch_agent()
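The description of agents as processes that poll queues and execute jobs in order can be pictured with a small standalone sketch. This is a toy model using only the Python standard library, not the W&B Launch implementation; `run_agent`, its parameters, and the in-memory queues are all hypothetical.

```python
from collections import deque


def run_agent(queues, execute, max_polls=10):
    """Poll each queue in turn and execute pending jobs in FIFO order.

    Illustrative only: `queues` maps queue names to deques of job payloads,
    `execute` is any callable that runs one job, and `max_polls` bounds the
    loop so the sketch terminates (a real agent runs until it is stopped).
    """
    completed = []
    for _ in range(max_polls):
        ran_something = False
        for name, pending in queues.items():
            if pending:
                job = pending.popleft()  # oldest job first
                completed.append((name, execute(job)))
                ran_something = True
        if not ran_something:
            break  # all queues are empty; stop polling in this sketch
    return completed


if __name__ == "__main__":
    queues = {"my_dagster_queue": deque(["train.py", "evaluate.py"])}
    for queue_name, result in run_agent(queues, execute=lambda job: f"ran {job}"):
        print(queue_name, result)
```

The sketch runs jobs inline; as the description notes, a real agent may instead dispatch them to external services.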
- dagster_wandb.run_launch_job
Executes a Launch job.
A Launch job is assigned to a queue in order to be executed. You can create a queue or use the default one. Make sure you have an active agent listening to that queue. You can run an agent inside your Dagster instance, or consider using a deployable agent in Kubernetes.
Example:

# config.yaml
resources:
  wandb_config:
    config:
      entity: my_entity
      project: my_project
ops:
  my_launched_job:
    config:
      entry_point:
        - python
        - train.py
      queue: my_dagster_queue
      uri: https://github.com/wandb/example-dagster-integration-with-launch

from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
    resource_defs={
        "wandb_config": make_values_resource(
            entity=str,
            project=str,
        ),
        "wandb_resource": wandb_resource.configured(
            {"api_key": {"env": "WANDB_API_KEY"}}
        ),
    },
)
def run_launch_job_example():
    # alias the op so its name matches the "my_launched_job" key in config.yaml
    run_launch_job.alias("my_launched_job")()
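If you prefer to keep the configuration in Python rather than in a YAML file, the same structure can be written as a plain dictionary. The sketch below simply mirrors the config.yaml from the example; the variable name `run_config` is an arbitrary choice.

```python
# Python equivalent of the config.yaml shown in the example.
run_config = {
    "resources": {
        "wandb_config": {
            "config": {"entity": "my_entity", "project": "my_project"},
        },
    },
    "ops": {
        # This key must match the alias given to run_launch_job in the job body.
        "my_launched_job": {
            "config": {
                "entry_point": ["python", "train.py"],
                "queue": "my_dagster_queue",
                "uri": "https://github.com/wandb/example-dagster-integration-with-launch",
            },
        },
    },
}
```

With Dagster installed, a dictionary like this can be passed as the `run_config` argument, for example `run_launch_job_example.execute_in_process(run_config=run_config)`. Running it for real assumes `WANDB_API_KEY` is set and an agent is listening on the queue.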