dbt (dagster-dbt)
Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single workflow. Dagster's software-defined asset abstractions make it simple to define data assets that depend on specific dbt models, or to define the computations that produce the sources your dbt models depend on.
Related documentation pages: dbt and dbt Cloud.
dagster-dbt
dagster-dbt project
Commands for using a dbt project in Dagster.
dagster-dbt project [OPTIONS] COMMAND [ARGS]...
prepare-and-package
This command will invoke prepare_and_package on the DbtProject found in the target module or file.
Note that this command runs dbt deps and dbt parse.
dagster-dbt project prepare-and-package [OPTIONS]
Options:
- --file <file>
Required The file containing DbtProject definitions to prepare.
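For example, assuming a DbtProject is defined in a file at a hypothetical path such as my_project/project.py, the project can be prepared with:
dagster-dbt project prepare-and-package --file my_project/project.py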
scaffold
This command will initialize a new Dagster project and create directories and files that load assets from an existing dbt project.
dagster-dbt project scaffold [OPTIONS]
Options:
- --project-name <project_name>
Required The name of the Dagster project to initialize for your dbt project.
- --dbt-project-dir <dbt_project_dir>
The path of your dbt project directory. This path must contain a dbt_project.yml file. By default, this command will assume that the current working directory contains a dbt project, but you can set a different directory by setting this option.
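For example, using placeholder names for the Dagster project and the dbt project directory, the scaffold command can be invoked as:
dagster-dbt project scaffold --project-name my_dagster_project --dbt-project-dir ./my_dbt_project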
dbt Core
Here, we provide interfaces to manage dbt projects invoked by the local dbt command line interface (dbt CLI).
Assets (dbt Core)
- @dagster_dbt.dbt_assets
Create a definition for how to compute a set of dbt resources, described by a manifest.json. When invoking dbt commands using DbtCliResource's cli() method, Dagster events are emitted by calling yield from on the event stream returned by stream().
Parameters:
- manifest (Union[Mapping[str, Any], str, Path]) – The contents of a manifest.json file or the path to a manifest.json file.
- select (str) – A dbt selection string for the models in a project that you want to include. Defaults to fqn:*.
- exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude.
- name (Optional[str]) – The name of the op.
- io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IO manager specified here determines how the inputs to those ops are loaded.
- partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.
- dagster_dbt_translator (Optional[DagsterDbtTranslator]) – Allows customizing how to map dbt models, seeds, etc. to asset keys and asset metadata.
- backfill_policy (Optional[BackfillPolicy]) – If a partitions_def is defined, this determines how to execute backfills that target multiple partitions of the asset.
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the assets.
- required_resource_keys (Optional[Set[str]]) – Set of required resource handles.
- project (Optional[DbtProject]) – A DbtProject instance which provides a pointer to the dbt project location and manifest.
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.
Examples:
Running dbt build for a dbt project:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Running dbt commands with flags:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "--full-refresh"], context=context).stream()

Running dbt commands with --vars:

import json
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_vars = {"key": "value"}
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()

Retrieving dbt artifacts after running a dbt command:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    run_results_json = dbt_build_invocation.get_artifact("run_results.json")

Running multiple dbt commands for a dbt project:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context).stream()

Accessing the dbt event stream alongside the Dagster event stream:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_cli_invocation = dbt.cli(["build"], context=context)

    # Each dbt event is structured: https://docs.getdbt.com/reference/events-logging
    for dbt_event in dbt_cli_invocation.stream_raw_events():
        for dagster_event in dbt_event.to_default_asset_events(
            manifest=dbt_cli_invocation.manifest,
            dagster_dbt_translator=dbt_cli_invocation.dagster_dbt_translator,
            context=dbt_cli_invocation.context,
            target_path=dbt_cli_invocation.target_path,
        ):
            # Manipulate `dbt_event`
            ...

            # Then yield the Dagster event
            yield dagster_event

Customizing the Dagster asset definition metadata inferred from a dbt project using DagsterDbtTranslator:

from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    ...


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Using a custom resource key for dbt:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, my_custom_dbt_resource_key: DbtCliResource):
    yield from my_custom_dbt_resource_key.cli(["build"], context=context).stream()

Using a dynamically generated resource key for dbt using required_resource_keys:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

dbt_resource_key = "my_custom_dbt_resource_key"


@dbt_assets(manifest=Path("target", "manifest.json"), required_resource_keys={dbt_resource_key})
def my_dbt_assets(context: AssetExecutionContext):
    dbt = getattr(context.resources, dbt_resource_key)

    yield from dbt.cli(["build"], context=context).stream()

Invoking another Dagster ResourceDefinition alongside dbt:

from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_slack import SlackResource


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, slack: SlackResource):
    yield from dbt.cli(["build"], context=context).stream()

    slack_client = slack.get_client()
    slack_client.chat_postMessage(channel="#my-channel", text="dbt build succeeded!")

Defining and accessing Dagster Config alongside dbt:

from pathlib import Path
from dagster import AssetExecutionContext, Config
from dagster_dbt import DbtCliResource, dbt_assets


class MyDbtConfig(Config):
    full_refresh: bool


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig):
    dbt_build_args = ["build"]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]

    yield from dbt.cli(dbt_build_args, context=context).stream()

Defining a Dagster PartitionsDefinition alongside dbt:

import json
from pathlib import Path

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
)
def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    time_window = context.partition_time_window

    dbt_vars = {
        "min_date": time_window.start.isoformat(),
        "max_date": time_window.end.isoformat(),
    }
    dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]

    yield from dbt.cli(dbt_build_args, context=context).stream()
- class dagster_dbt.DagsterDbtTranslator
Holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc).
This class is exposed so that methods can be overridden to customize how Dagster asset metadata is derived.
- get_asset_key
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster asset key that represents that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom asset key for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: The Dagster asset key for the dbt resource.
Return type: AssetKey
Examples:
Adding a prefix to the default asset key generated for each dbt resource:

from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("prefix")

Adding a prefix to the default asset key generated for each dbt resource, but only for dbt sources:

from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)
        if dbt_resource_props["resource_type"] == "source":
            asset_key = asset_key.with_prefix("my_prefix")

        return asset_key
- get_auto_materialize_policy
- experimental
This API may break in future versions, even between dot releases.
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.AutoMaterializePolicy for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom auto-materialize policy for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster auto-materialize policy.
Return type: Optional[AutoMaterializePolicy]
Examples:
Set a custom auto-materialize policy for all dbt resources:

from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        return AutoMaterializePolicy.eager()

Set a custom auto-materialize policy for dbt resources with a specific tag:

from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        auto_materialize_policy = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            auto_materialize_policy = AutoMaterializePolicy.eager()

        return auto_materialize_policy
- get_automation_condition
- experimental
This API may break in future versions, even between dot releases.
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.AutomationCondition for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom AutomationCondition for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster automation condition.
Return type: Optional[AutomationCondition]
Examples:
Set a custom AutomationCondition for all dbt resources:

from typing import Any, Mapping, Optional

from dagster import AutomationCondition
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
        return AutomationCondition.eager()

Set a custom AutomationCondition for dbt resources with a specific tag:

from typing import Any, Mapping, Optional

from dagster import AutomationCondition
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
        automation_condition = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            automation_condition = AutomationCondition.eager()

        return automation_condition
- get_description
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster description for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom description for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: The description for the dbt resource.
Return type: str
Examples:

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        return "custom description"
- get_freshness_policy
- experimental
This API may break in future versions, even between dot releases.
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.FreshnessPolicy for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom freshness policy for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster freshness policy.
Return type: Optional[FreshnessPolicy]
Examples:
Set a custom freshness policy for all dbt resources:

from typing import Any, Mapping, Optional

from dagster import FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_freshness_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
        return FreshnessPolicy(maximum_lag_minutes=60)

Set a custom freshness policy for dbt resources with a specific tag:

from typing import Any, Mapping, Optional

from dagster import FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_freshness_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
        freshness_policy = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            freshness_policy = FreshnessPolicy(maximum_lag_minutes=60)

        return freshness_policy
- get_group_name
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster group name for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom group name for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster group name.
Return type: Optional[str]
Examples:

from typing import Any, Mapping, Optional

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        return "custom_group_prefix" + dbt_resource_props.get("config", {}).get("group")
- get_metadata
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster metadata for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide custom metadata for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A dictionary representing the Dagster metadata for the dbt resource.
Return type: Mapping[str, Any]
Examples:

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"custom": "metadata"}
- get_owners
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster owners for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide custom owners for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A set of Dagster owners.
Return type: Optional[Sequence[str]]
Examples:

from typing import Any, Mapping, Optional, Sequence

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]:
        return ["user@owner.com", "team:team@owner.com"]
- get_partition_mapping
- experimental
This API may break in future versions, even between dot releases.
A function that takes two dictionaries: the first, representing properties of a dbt resource; and the second, representing the properties of a parent dependency to the first dbt resource. The function returns the Dagster partition mapping for the dbt dependency.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom partition mapping for a dbt dependency.
Parameters:
- dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt child resource.
- dbt_parent_resource_props (Mapping[str, Any]) – A dictionary representing the dbt parent resource, in relationship to the child.
Returns: The Dagster partition mapping for the dbt resource. If None is returned, the default partition mapping will be used.
Return type: Optional[PartitionMapping]
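As a minimal sketch (not taken from the library docs), a translator could override this method to offset the time window between a child model and its parents; the "daily" tag used here is hypothetical:

from typing import Any, Mapping, Optional

from dagster import PartitionMapping, TimeWindowPartitionMapping
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_partition_mapping(
        self,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        # Hypothetical rule: models tagged "daily" read from the previous day's
        # partition of their parents; all other dependencies use the default mapping.
        if "daily" in dbt_resource_props.get("tags", []):
            return TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        return None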
- get_tags
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster tags for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
dbt tags are strings, but Dagster tags are key-value pairs. To bridge this divide, the dbt tag string is used as the Dagster tag key, and the Dagster tag value is set to the empty string, “”.
Any dbt tags that don’t match Dagster’s supported tag key format (e.g. they contain unsupported characters) will be ignored.
This method can be overridden to provide custom tags for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A dictionary representing the Dagster tags for the dbt resource.
Return type: Mapping[str, str]
Examples:

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        return {"custom": "tag"}
- class dagster_dbt.DagsterDbtTranslatorSettings
Settings to enable Dagster features for your dbt project.
Parameters:
- enable_asset_checks (bool) – Whether to load dbt tests as Dagster asset checks.
- enable_duplicate_source_asset_keys (bool) – Whether to allow dbt sources with duplicate asset keys.
- enable_code_references (bool) – Whether to enable Dagster code references for dbt resources.
- enable_dbt_selection_by_name (bool) – Whether to enable selecting dbt resources by name, rather than by their fully qualified names.
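As a brief sketch, these settings are typically passed to a DagsterDbtTranslator via its settings argument and then supplied to @dbt_assets; enabling asset checks is shown here as an example:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import (
    DagsterDbtTranslator,
    DagsterDbtTranslatorSettings,
    DbtCliResource,
    dbt_assets,
)

# Load dbt tests as Dagster asset checks.
dagster_dbt_translator = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
)


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    dagster_dbt_translator=dagster_dbt_translator,
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()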
- class dagster_dbt.DbtManifestAssetSelection
Defines a selection of assets from a dbt manifest wrapper and a dbt selection string.
Parameters:
- manifest (Mapping[str, Any]) – The dbt manifest blob.
- select (str) – A dbt selection string to specify a set of dbt resources.
- exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
Examples:
import json
from pathlib import Path
from dagster_dbt import DbtManifestAssetSelection
manifest = json.loads(Path("path/to/manifest.json").read_text())
# select the dbt assets that have the tag "foo".
my_selection = DbtManifestAssetSelection(manifest=manifest, select="tag:foo")
- dagster_dbt.build_dbt_asset_selection
Build an asset selection for a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.
Parameters:
- dbt_select (str) – A dbt selection string to specify a set of dbt resources.
- dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
Returns: An asset selection for the selected dbt nodes.
Return type: AssetSelection
Examples:

from dagster_dbt import dbt_assets, build_dbt_asset_selection


@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

# Select the dbt assets that have the tag "foo".
foo_selection = build_dbt_asset_selection([all_dbt_assets], dbt_select="tag:foo")

# Select the dbt assets that have the tag "foo" and all Dagster assets downstream
# of them (dbt-related or otherwise)
foo_and_downstream_selection = foo_selection.downstream()

Building an asset selection on a dbt assets definition with an existing selection:

from dagster_dbt import dbt_assets, build_dbt_asset_selection


@dbt_assets(
    manifest=...,
    select="bar+",
)
def bar_plus_dbt_assets():
    ...

# Select the dbt assets that are in the intersection of having the tag "foo" and being
# in the existing selection "bar+".
bar_plus_and_foo_selection = build_dbt_asset_selection(
    [bar_plus_dbt_assets],
    dbt_select="tag:foo",
)
# Furthermore, select all assets downstream (dbt-related or otherwise).
bar_plus_and_foo_and_downstream_selection = bar_plus_and_foo_selection.downstream()
- dagster_dbt.build_schedule_from_dbt_selection
Build a schedule to materialize a specified set of dbt resources from a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.
Parameters:
- job_name (str) – The name of the job to materialize the dbt resources.
- cron_schedule (str) – The cron schedule to define the schedule.
- dbt_select (str) – A dbt selection string to specify a set of dbt resources.
- dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
- schedule_name (Optional[str]) – The name of the dbt schedule to create.
- tags (Optional[Mapping[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
- config (Optional[RunConfig]) – The config that parameterizes the execution of this schedule.
- execution_timezone (Optional[str]) – Timezone in which the schedule should run.
Returns: A definition to materialize the selected dbt resources on a cron schedule.
Return type: ScheduleDefinition
Examples:

from dagster_dbt import dbt_assets, build_schedule_from_dbt_selection


@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
    [all_dbt_assets],
    job_name="all_dbt_assets",
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",
)
- dagster_dbt.get_asset_key_for_model
Return the corresponding Dagster asset key for a dbt model, seed, or snapshot.
Parameters:
- dbt_assets (AssetsDefinition) – An AssetsDefinition object produced by @dbt_assets.
- model_name (str) – The name of the dbt model, seed, or snapshot.
Returns: The corresponding Dagster asset key.
Return type: AssetKey
Examples:

from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_model


@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

@asset(deps={get_asset_key_for_model([all_dbt_assets], "customers")})
def cleaned_customers():
    ...
- dagster_dbt.get_asset_key_for_source
Returns the corresponding Dagster asset key for a dbt source with a singular table.
Parameters: source_name (str) – The name of the dbt source.
Raises: DagsterInvalidInvocationError – If the source has more than one table.
Returns: The corresponding Dagster asset key.
Return type: AssetKey
Examples:

from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_source


@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

@asset(key=get_asset_key_for_source([all_dbt_assets], "my_source"))
def upstream_python_asset():
    ...
- dagster_dbt.get_asset_keys_by_output_name_for_source
Returns the corresponding Dagster asset keys for all tables in a dbt source.
This is a convenience method that makes it easy to define a multi-asset that generates all the tables for a given dbt source.
Parameters: source_name (str) – The name of the dbt source.
Returns: A mapping of the table name to corresponding Dagster asset key for all tables in the given dbt source.
Return type: Mapping[str, AssetKey]
Examples:

from dagster import AssetOut, multi_asset
from dagster_dbt import dbt_assets, get_asset_keys_by_output_name_for_source


@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

@multi_asset(
    outs={
        name: AssetOut(key=asset_key)
        for name, asset_key in get_asset_keys_by_output_name_for_source(
            [all_dbt_assets], "raw_data"
        ).items()
    },
)
def upstream_python_asset():
    ...
- class dagster_dbt.DbtProject
Representation of a dbt project and related settings that assist with managing the project preparation.
Using this helps achieve a setup where the dbt manifest file and dbt dependencies are available and up-to-date:
- during development, pull the dependencies and reload the manifest at run time to pick up any changes.
- when deployed, expect a manifest that was created at build time to reduce start-up time.
The CLI command dagster-dbt project prepare-and-package can be used as part of the deployment process to handle the project preparation.
This object can be passed directly to DbtCliResource.
Parameters:
- project_dir (Union[str, Path]) – The directory of the dbt project.
- target_path (Union[str, Path]) – The path, relative to the project directory, to output artifacts.
- target (Optional[str]) – The target from your dbt profiles.yml to use for execution, if it should be explicitly set.
- packaged_project_dir (Optional[Union[str, Path]]) – A directory that will contain a copy of the dbt project and the manifest.json when the project has been packaged.
- state_path (Optional[Union[str, Path]]) – The path, relative to the project directory, to reference artifacts from another run.
Examples:
Creating a DbtProject by referencing the dbt project directory:

from pathlib import Path

from dagster_dbt import DbtProject

my_project = DbtProject(project_dir=Path("path/to/dbt_project"))

Creating a DbtProject that changes target based on environment variables and uses managed state artifacts:
import os
from pathlib import Path

from dagster_dbt import DbtProject


def get_env():
    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
        return "BRANCH"
    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "prod":
        return "PROD"
    return "LOCAL"

dbt_project = DbtProject(
    project_dir=Path("path/to/dbt_project"),
    state_path="target/managed_state",
    target=get_env(),
)
- prepare_if_dev
Prepare a dbt project at run time during development, i.e. when dagster dev is used. This method has no effect outside this development context.
The preparation process ensures that the dbt manifest file and dbt dependencies are available and up-to-date. During development, it pulls the dependencies and reloads the manifest at run time to pick up any changes.
If this method returns successfully, self.manifest_path will point to a loadable manifest file. This method raises an error if the manifest file has not been correctly created by the preparation process.
Examples:
Preparing a DbtProject during development:
from pathlib import Path

from dagster import Definitions
from dagster_dbt import DbtCliResource, DbtProject

my_project = DbtProject(project_dir=Path("path/to/dbt_project"))
my_project.prepare_if_dev()

defs = Definitions(
    resources={
        "dbt": DbtCliResource(project_dir=my_project),
    },
    ...
)
Asset Checks (dbt Core)
- dagster_dbt.build_freshness_checks_from_dbt_assets
- experimental
This API may break in future versions, even between dot releases.
Returns a sequence of freshness checks constructed from the provided dbt assets.
Freshness checks can be configured on a per-model basis in the model schema configuration.
For assets which are not partitioned based on time, the freshness check configuration mirrors that of the build_last_update_freshness_checks() function. lower_bound_delta is provided in terms of seconds, and deadline_cron is optional.
For time-partitioned assets, the freshness check configuration mirrors that of the build_time_partition_freshness_checks() function.
Below is an example of configuring a non-time-partitioned dbt asset with a freshness check. This code would be placed in the schema.yml file for the dbt model.
models:
  - name: customers
    ...
    meta:
      dagster:
        freshness_check:
          lower_bound_delta_seconds: 86400 # 1 day
          deadline_cron: "0 0 * * *" # Optional
          severity: "WARN" # Optional, defaults to "WARN"

Below is an example of configuring a time-partitioned dbt asset with a freshness check. This code would be placed in the schema.yml file for the dbt model.
models:
  - name: customers
    ...
    meta:
      dagster:
        freshness_check:
          deadline_cron: "0 0 * * *"
          severity: "WARN" # Optional, defaults to "WARN"

Parameters: dbt_assets (Sequence[AssetsDefinition]) – A sequence of dbt assets to construct freshness checks from.
Returns: A sequence of asset checks definitions representing the freshness checks for the provided dbt assets.
Return type: Sequence[AssetChecksDefinition]
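As a minimal sketch of loading these checks into a code location, the checks built from a @dbt_assets definition can be passed to Definitions; my_dbt_assets and the dbt resource wiring are assumed from the earlier examples:

from dagster import Definitions
from dagster_dbt import build_freshness_checks_from_dbt_assets

# Build freshness checks from the dbt assets definition declared elsewhere.
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=[my_dbt_assets])

defs = Definitions(
    assets=[my_dbt_assets],
    asset_checks=freshness_checks,
)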
Resources (dbt Core)
CLI Resource
- class dagster_dbt.DbtCliResource
A resource used to execute dbt CLI commands.
- project_dir
The path to the dbt project directory. This directory should contain a dbt_project.yml. See https://docs.getdbt.com/reference/dbt_project.yml for more information.
Type: str
- global_config_flags
A list of global flags to pass to the dbt CLI invocation. Invoke dbt --help to see a full list of global flags.
Type: List[str]
- profiles_dir
The path to the directory containing your dbt profiles.yml. By default, the current working directory is used, which is the dbt project directory. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
Type: Optional[str]
- profile
The profile from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
Type: Optional[str]
- target
The target from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
Type: Optional[str]
- dbt_executable
The path to the dbt executable. By default, this is dbt.
Type: str
- state_path
The path, relative to the project directory, to a directory of dbt artifacts to be used with --state / --defer-state.
Type: Optional[str]
Examples:
Creating a dbt resource with only a reference to project_dir:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

Creating a dbt resource with a custom profiles_dir:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/dbt/project/profiles",
)

Creating a dbt resource with a custom profile and target:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/dbt/project/profiles",
    profile="jaffle_shop",
    target="dev",
)

Creating a dbt resource with global configs, e.g. disabling colored logs with --no-use-color:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    global_config_flags=["--no-use-color"],
)

Creating a dbt resource with custom dbt executable path:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    dbt_executable="/path/to/dbt/executable",
)
- cli
Create a subprocess to execute a dbt CLI command.
Parameters:
- args (Sequence[str]) – The dbt CLI command to execute.
- raise_on_error (bool) – Whether to raise an exception if the dbt CLI command fails.
- manifest (Optional[Union[Mapping[str, Any], str, Path]]) – The dbt manifest blob. If an execution context from within @dbt_assets is provided to the context argument, the manifest provided to @dbt_assets will be used.
- dagster_dbt_translator (Optional[DagsterDbtTranslator]) – The translator to link dbt nodes to Dagster assets.
- context (Optional[Union[OpExecutionContext, AssetExecutionContext]]) – The execution context from within @dbt_assets.
- target_path (Optional[Path]) – An explicit path to a target folder to use to store and retrieve dbt artifacts when running.
Returns: An invocation instance that can be used to retrieve the output of the dbt CLI command.
Return type: DbtCliInvocation
Examples:
Streaming Dagster events for dbt asset materializations and observations:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()

Retrieving a dbt artifact after streaming the Dagster events:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_invocation = dbt.cli(["run"], context=context)

    yield from dbt_run_invocation.stream()

    # Retrieve the `run_results.json` dbt artifact as a dictionary:
    run_results_json = dbt_run_invocation.get_artifact("run_results.json")

    # Retrieve the `run_results.json` dbt artifact as a file path:
    run_results_path = dbt_run_invocation.target_path.joinpath("run_results.json")

Customizing the asset materialization metadata when streaming the Dagster events:
from pathlib import Path

from dagster import AssetExecutionContext, Output
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_cli_invocation = dbt.cli(["run"], context=context)

    for dagster_event in dbt_cli_invocation.stream():
        if isinstance(dagster_event, Output):
            context.add_output_metadata(
                metadata={
                    "my_custom_metadata": "my_custom_metadata_value",
                },
                output_name=dagster_event.output_name,
            )

        yield dagster_event

Suppressing exceptions from a dbt CLI command when a non-zero exit code is returned:
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_invocation = dbt.cli(["run"], context=context, raise_on_error=False)

    if dbt_run_invocation.is_successful():
        yield from dbt_run_invocation.stream()
    else:
        ...

Invoking a dbt CLI command in a custom asset or op:
import json

from dagster import Nothing, Out, asset, op
from dagster_dbt import DbtCliResource


@asset
def my_dbt_asset(dbt: DbtCliResource):
    dbt_macro_args = {"key": "value"}
    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()


@op(out=Out(Nothing))
def my_dbt_op(dbt: DbtCliResource):
    dbt_macro_args = {"key": "value"}
    yield from dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).stream()
- get_defer_args
Build the defer arguments for the dbt CLI command, using the supplied state directory. If no state directory is supplied, or the state directory does not have a manifest for comparison, an empty list of arguments is returned.
Returns: The defer arguments for the dbt CLI command.
Return type: Sequence[str]
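A hedged sketch of how this is commonly wired up: spread the returned arguments into a dbt CLI invocation, assuming the DbtCliResource was configured with a state_path (see the attribute above):

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Appends defer flags only when a comparison manifest exists in the state directory.
    yield from dbt.cli(["build", *dbt.get_defer_args()], context=context).stream()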
- get_state_args
Build the state arguments for the dbt CLI command, using the supplied state directory. If no state directory is supplied, or the state directory does not have a manifest for comparison, an empty list of arguments is returned.
Returns: The state arguments for the dbt CLI command.
Return type: Sequence[str]
- class dagster_dbt.DbtCliInvocation
The representation of an invoked dbt command.
Parameters:
- process (subprocess.Popen) – The process running the dbt command.
- manifest (Mapping[str, Any]) – The dbt manifest blob.
- project_dir (Path) – The path to the dbt project.
- target_path (Path) – The path to the dbt target folder.
- raise_on_error (bool) – Whether to raise an exception if the dbt command fails.
- get_artifact
Retrieve a dbt artifact from the target path.
See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information.
Parameters: artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]) – The name of the artifact to retrieve.
Returns: The artifact as a dictionary.
Return type: Dict[str, Any]
Examples:
from dagster_dbt import DbtCliResource
dbt = DbtCliResource(project_dir="/path/to/dbt/project")
dbt_cli_invocation = dbt.cli(["run"]).wait()
# Retrieve the run_results.json artifact.
run_results = dbt_cli_invocation.get_artifact("run_results.json")
- get_error
Return an exception if the dbt CLI process failed.
Returns: An exception if the dbt CLI process failed, and None otherwise.
Return type: Optional[Exception]
Examples:
import logging

from dagster_dbt import DbtCliResource

logger = logging.getLogger(__name__)

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

dbt_cli_invocation = dbt.cli(["run"], raise_on_error=False)

error = dbt_cli_invocation.get_error()
if error:
    logger.error(error)
- is_successful
Return whether the dbt CLI process completed successfully.
Returns: True, if the dbt CLI process returns with a zero exit code, and False otherwise.
Return type: bool
Examples:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

dbt_cli_invocation = dbt.cli(["run"], raise_on_error=False)

if dbt_cli_invocation.is_successful():
    ...
- stream
Stream the events from the dbt CLI process and convert them to Dagster events.
Returns: A set of corresponding Dagster events.
In a Dagster asset definition, the following are yielded:
- Output for refables (e.g. models, seeds, snapshots.)
- AssetCheckResult for dbt test results that are enabled as asset checks.
- AssetObservation for dbt test results that are not enabled as asset checks.
In a Dagster op definition, the following are yielded:
- AssetMaterialization for refables (e.g. models, seeds, snapshots.)
- AssetObservation for dbt test results.
Return type: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
Examples:

from pathlib import Path

from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
- stream_raw_events
Stream the events from the dbt CLI process.
Returns: An iterator of events from the dbt CLI process.
Return type: Iterator[DbtCliEventMessage]
- wait
Wait for the dbt CLI process to complete.
Returns: The current representation of the dbt CLI invocation.
Return type: DbtCliInvocation
Examples:
from dagster_dbt import DbtCliResource
dbt = DbtCliResource(project_dir="/path/to/dbt/project")
dbt_cli_invocation = dbt.cli(["run"]).wait()
- class dagster_dbt.core.dbt_cli_invocation.DbtEventIterator
A wrapper around an iterator of dbt events which contains additional methods for post-processing the events, such as fetching row counts for materialized tables.
- fetch_column_metadata
- experimental
This API may break in future versions, even between dot releases.
Experimental functionality which will fetch column schema metadata for dbt models in a run once they’re built. It will also fetch schema information for upstream models and generate column lineage metadata using sqlglot, if enabled.
Parameters: generate_column_lineage (bool) – Whether to generate column lineage metadata using sqlglot.
Returns: A set of corresponding Dagster events for dbt models, with column metadata attached, yielded in the order they are emitted by dbt.
Return type: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
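As a short sketch, this method is chained onto the event stream inside a @dbt_assets body (names reused from the earlier examples):

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Attach column schema (and optionally column lineage) metadata to each event.
    yield from dbt.cli(["build"], context=context).stream().fetch_column_metadata()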
- fetch_row_counts
- experimental
This API may break in future versions, even between dot releases.
Experimental functionality which will fetch row counts for materialized dbt models in a dbt run once they are built. Note that row counts will not be fetched for views, since this requires running the view’s SQL query which may be costly.
Returns: A set of corresponding Dagster events for dbt models, with row counts attached, yielded in the order they are emitted by dbt.
Return type: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
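A similar sketch for row counts, chained onto the stream in the same way:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Attach row counts to materialized (non-view) dbt models.
    yield from dbt.cli(["build"], context=context).stream().fetch_row_counts()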
- with_insights
- experimental
This API may break in future versions, even between dot releases.
Associate each warehouse query with the produced asset materializations for use in Dagster Plus Insights. Currently supports Snowflake and BigQuery.
For more information, see the documentation for dagster_cloud.dagster_insights.dbt_with_snowflake_insights and dagster_cloud.dagster_insights.dbt_with_bigquery_insights.
Parameters:
- skip_config_check (bool) – If true, skips the check that the dbt project config is set up correctly.
- record_observation_usage (bool) – If True, associates the usage associated with asset observations with those assets.
Example:

@dbt_assets(manifest=DBT_MANIFEST_PATH)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(["build"], context=context).stream().with_insights()
- class dagster_dbt.DbtCliEventMessage
The representation of a dbt CLI event.
Parameters:
- raw_event (Dict[str, Any]) – The raw event dictionary.
- event_history_metadata (Dict[str, Any]) – A dictionary of metadata about the event, gathered from previous historical events.
- to_default_asset_events
Convert a dbt CLI event to a set of corresponding Dagster events.
Parameters:
- manifest (Union[Mapping[str, Any], str, Path]) – The dbt manifest blob.
- dagster_dbt_translator (DagsterDbtTranslator) – Optionally, a custom translator for linking dbt nodes to Dagster assets.
- context (Optional[Union[OpExecutionContext, AssetExecutionContext]]) – The execution context.
- target_path (Optional[Path]) – An explicit path to a target folder used to retrieve dbt artifacts while generating events.
Returns: A set of corresponding Dagster events.
In a Dagster asset definition, the following are yielded:
- Output for refables (e.g. models, seeds, snapshots.)
- AssetCheckResult for dbt test results that are enabled as asset checks.
- AssetObservation for dbt test results that are not enabled as asset checks.
In a Dagster op definition, the following are yielded:
- AssetMaterialization for refables (e.g. models, seeds, snapshots.)
- AssetObservation for dbt test results.
Return type: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
dbt Cloud
Here, we provide interfaces to manage dbt projects invoked by the hosted dbt Cloud service.
Assets (dbt Cloud)
- dagster_dbt.load_assets_from_dbt_cloud_job
- experimental
This API may break in future versions, even between dot releases.
Loads a set of dbt models, managed by a dbt Cloud job, into Dagster assets. In order to determine the set of dbt models, the project is compiled to generate the necessary artifacts that define the dbt models and their dependencies.
One Dagster asset is created for each dbt model.
Parameters:
- dbt_cloud (ResourceDefinition) – The dbt Cloud resource to use to connect to the dbt Cloud API.
- job_id (int) – The ID of the dbt Cloud job to load assets from.
- node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt node info and returns the AssetKey that represents that node.
- node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to.
- node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node.
- node_info_to_auto_materialize_policy_fn (Dict[str, Any] -> Optional[AutoMaterializePolicy]) – A function that takes a dictionary of dbt node info and optionally returns an AutoMaterializePolicy that should be applied to this node.
- node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, RawMetadataMapping]]) – A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition.
- partitions_def (Optional[PartitionsDefinition]) – experimental. Defines the set of partition keys that compose the dbt assets.
- partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – experimental. A function to translate a given partition key into a dictionary of vars to pass to the dbt invocation.
Returns: A definition for the loaded assets.
Return type: CacheableAssetsDefinition
Examples:
from dagster import repository
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job
DBT_CLOUD_JOB_ID = 1234
dbt_cloud = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud, job_id=DBT_CLOUD_JOB_ID
)


@repository
def dbt_cloud_sandbox():
    return [dbt_cloud_assets]
Ops (dbt Cloud)
- dagster_dbt.dbt_cloud_run_op
Initiates a run for a dbt Cloud job, then polls until the run completes. If the job fails or is otherwise stopped before succeeding, a dagster.Failure exception will be raised, and this op will fail.
It requires the use of a ‘dbt_cloud’ resource, which is used to connect to the dbt Cloud API.
Config Options:
job_id (int) The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.:
https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/
poll_interval (float) The time (in seconds) that will be waited between successive polls. Defaults to 10.
poll_timeout (float) The maximum time (in seconds) that will be waited before this operation is timed out. By default, this will never time out.
yield_materializations (bool) If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Defaults to True.
asset_key_prefix (List[str]) If provided and yield_materializations is True, these components will be used to prefix the generated asset keys. Defaults to ["dbt"].
Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op
my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 77777}
)

run_dbt_nightly_sync = dbt_cloud_run_op.configured(
    {"job_id": 54321}, name="run_dbt_nightly_sync"
)


@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def dbt_cloud():
    run_dbt_nightly_sync()
Resources (dbt Cloud)
- class dagster_dbt.DbtCloudClientResource
This resource helps interact with dbt Cloud connectors.
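A minimal configuration sketch; the auth_token and account_id field names are assumptions that mirror the dbt_cloud_resource configuration shown below, so check the class signature for the authoritative set:

from dagster import Definitions, EnvVar
from dagster_dbt import DbtCloudClientResource

defs = Definitions(
    resources={
        # Field names are assumptions mirroring the legacy dbt_cloud_resource config.
        "dbt_cloud": DbtCloudClientResource(
            auth_token=EnvVar("DBT_CLOUD_API_TOKEN"),
            account_id=EnvVar.int("DBT_CLOUD_ACCOUNT_ID"),
        ),
    },
)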
Deprecated (dbt Cloud)
- dagster_dbt.dbt_cloud_resource ResourceDefinition
This resource allows users to programmatically interface with the dbt Cloud Administrative REST API (v2) to launch jobs and monitor their progress. This currently implements only a subset of the functionality exposed by the API.
For a complete set of documentation on the dbt Cloud Administrative REST API, including expected response JSON schemas, see the dbt Cloud API Docs.
To configure this resource, we recommend using the configured method.
Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)


@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def my_dbt_cloud_job():
    ...
Errors
- exception dagster_dbt.DagsterDbtError
The base exception of the dagster-dbt library.
- exception dagster_dbt.DagsterDbtCliRuntimeError
Represents an error while executing a dbt CLI command.
Utils
- dagster_dbt.default_group_from_dbt_resource_props
Get the group name for a dbt node.
If a Dagster group is configured in the metadata for the node, use that.
Otherwise, if a dbt group is configured for the node, use that.
- dagster_dbt.group_from_dbt_resource_props_fallback_to_directory
Get the group name for a dbt node.
Has the same behavior as default_group_from_dbt_resource_props, except that, if no group can be determined from config or metadata, it falls back to using the subdirectory of the models directory that the source file is in.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
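As a brief sketch, these helpers can back a custom translator's get_group_name; the class name here is illustrative:

from typing import Any, Mapping, Optional

from dagster_dbt import (
    DagsterDbtTranslator,
    group_from_dbt_resource_props_fallback_to_directory,
)


class DirectoryGroupDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        # Use the configured Dagster/dbt group when present, otherwise fall back to
        # the subdirectory of the models directory containing the source file.
        return group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props)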