Dagster & dbt

Dagster orchestrates dbt alongside other technologies, so you can schedule dbt with Spark, Python, etc. in a single data pipeline.

Dagster assets understand dbt at the level of individual dbt models. This means that you can:

  • Use Dagster's UI or APIs to run subsets of your dbt models, seeds, and snapshots.
  • Track failures, logs, and run history for individual dbt models, seeds, and snapshots.
  • Define dependencies between individual dbt models and other data assets. For example, put dbt models downstream of the Fivetran-ingested tables they read from, or put a machine learning model downstream of the dbt models it's trained on (see the sketch after the example below).

Installation

pip install dagster-dbt

Example

from pathlib import Path

from dagster_dbt import (
    DbtCliResource,
    DbtProject,
    build_schedule_from_dbt_selection,
    dbt_assets,
)

from dagster import AssetExecutionContext, Definitions

RELATIVE_PATH_TO_MY_DBT_PROJECT = "./my_dbt_project"

my_project = DbtProject(
    project_dir=Path(__file__)
    .joinpath("..", RELATIVE_PATH_TO_MY_DBT_PROJECT)
    .resolve(),
)
my_project.prepare_if_dev()


@dbt_assets(manifest=my_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


my_schedule = build_schedule_from_dbt_selection(
    [my_dbt_assets],
    job_name="materialize_dbt_models",
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",
)

defs = Definitions(
    assets=[my_dbt_assets],
    schedules=[my_schedule],
    resources={
        "dbt": DbtCliResource(project_dir=my_project),
    },
)
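
In this example, DbtProject points Dagster at the dbt project directory and prepare_if_dev() compiles the manifest during local development. The @dbt_assets decorator then creates one Dagster asset per dbt model in that manifest, and the schedule materializes everything selected by dbt_select="fqn:*" (all models) every day at midnight.

To wire an individual dbt model into a broader asset graph, as mentioned in the dependency bullet above, you can look up the Dagster asset key for a model and declare it as an upstream dependency. The following is a minimal sketch that assumes the my_dbt_assets definition above and a hypothetical dbt model named "customers":

from dagster import asset

from dagster_dbt import get_asset_key_for_model


# "customers" is a hypothetical dbt model name; get_asset_key_for_model looks up
# the asset key that my_dbt_assets registered for that model.
@asset(deps=[get_asset_key_for_model([my_dbt_assets], "customers")])
def customer_metrics() -> None:
    # Runs only after the "customers" dbt model has been materialized.
    ...

A downstream asset like this would also need to be added to the assets list passed to Definitions.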

About dbt

dbt is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, CI/CD, and documentation.