Defining dependencies with asset factories

In data engineering, it's often helpful to reuse code to define similar assets. For example, you may want to represent every file in a directory as an asset.

Additionally, you may be serving stakeholders who aren't familiar with Python or Dagster. They may prefer interacting with assets using a domain-specific language (DSL) built on top of a configuration language such as YAML.

Using an asset factory reduces complexity and creates a pluggable entry point to define additional assets.

Note: This guide assumes familiarity with asset factories.


Building an asset factory in Python

Imagine a data analytics team that maintains a large number of tables. To support analytics needs, the team runs queries and constructs new tables from the results.

Each table can be represented in YAML by a name, upstream asset dependencies, and a query:

YAML Definition for ETL tables
etl_tables:
  - name: cleaned_transactions
    deps:
      - transactions
    query: |
      create or replace table cleaned_transactions as (
        SELECT * FROM transactions WHERE amount IS NOT NULL
      )

  - name: risky_customers
    deps:
      - customers
    query: |
      create or replace table risky_customers as (
        SELECT * FROM customers WHERE risk_score > 0.8
      )

  - name: risky_transactions
    deps:
      - cleaned_transactions
      - risky_customers
    query: |
      create or replace table risky_transactions as (
        SELECT *
        FROM cleaned_transactions JOIN risky_customers
        ON cleaned_transactions.customer_id = risky_customers.customer_id
      )
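
Because each entry names its upstream tables, the entries form a dependency graph. Dagster resolves this ordering itself once the assets are defined, but it can help to see the idea in isolation. The following is a minimal sketch using only the standard library, assuming the YAML above has already been parsed into plain Python data. The `ETL_TABLES` list is a hand-written mirror of that file with shortened queries, and `validate_tables` and `build_order` are hypothetical helpers, not part of Dagster.

```python
from graphlib import TopologicalSorter

# Hand-written mirror of the parsed YAML; queries are shortened placeholders.
ETL_TABLES = [
    {"name": "cleaned_transactions", "deps": ["transactions"], "query": "..."},
    {"name": "risky_customers", "deps": ["customers"], "query": "..."},
    {
        "name": "risky_transactions",
        "deps": ["cleaned_transactions", "risky_customers"],
        "query": "...",
    },
]

REQUIRED_KEYS = {"name", "deps", "query"}


def validate_tables(tables):
    """Raise ValueError if an entry lacks a required key or a name repeats."""
    seen = set()
    for table in tables:
        missing = REQUIRED_KEYS - table.keys()
        if missing:
            label = table.get("name", "<unnamed>")
            raise ValueError(f"{label}: missing keys {sorted(missing)}")
        if table["name"] in seen:
            raise ValueError(f"duplicate table name: {table['name']}")
        seen.add(table["name"])


def build_order(tables):
    """Return table names so each one appears after the tables it depends on.

    Dependencies that are not defined in the list (for example `transactions`)
    are treated as external and left out of the ordering.
    """
    names = {table["name"] for table in tables}
    graph = {
        table["name"]: [dep for dep in table["deps"] if dep in names]
        for table in tables
    }
    return list(TopologicalSorter(graph).static_order())


validate_tables(ETL_TABLES)
print(build_order(ETL_TABLES))  # risky_transactions is always last
```

Running a check like this before building assets surfaces a malformed entry as a clear error rather than a `KeyError` deep inside the factory.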

Here's how you might add Python logic to define these assets in Dagster.

Programmatically defining asset dependencies
from collections.abc import Sequence

import yaml
from dagster_snowflake import SnowflakeResource

import dagster as dg


def build_etl_table(name: str, deps: Sequence[str], query: str) -> dg.AssetsDefinition:
    # Each call defines one asset; `name`, `deps`, and `query` are captured per call.
    @dg.asset(name=name, deps=deps)
    def etl_table(context, snowflake: SnowflakeResource):
        with snowflake.get_connection() as conn:
            conn.cursor().execute(query)

    return etl_table


def load_etl_tables_from_yaml(yaml_path: str) -> Sequence[dg.AssetsDefinition]:
    # Use a context manager so the file handle is closed after parsing.
    with open(yaml_path) as f:
        config = yaml.safe_load(f)
    factory_assets = [
        build_etl_table(
            name=table_config["name"],
            deps=table_config["deps"],
            query=table_config["query"],
        )
        for table_config in config["etl_tables"]
    ]
    return factory_assets


defs = dg.Definitions(
    assets=load_etl_tables_from_yaml(
        dg.file_relative_path(__file__, "table_definitions.yaml")
    ),
    resources={
        "snowflake": SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
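
One reason to build each asset inside a helper such as `build_etl_table`, rather than defining the decorated function directly in a loop, is that Python closures bind late: a function defined in a loop looks up the loop variable when it runs, not when it is defined. The sketch below illustrates the difference with plain functions instead of Dagster assets. `make_runner` and the sample queries are hypothetical stand-ins for illustration only.

```python
QUERIES = {"a": "select 1", "b": "select 2"}

# Pitfall: each `run` reads `query` when it is called, after the loop has
# finished, so every function sees the last value.
loop_runners = []
for name, query in QUERIES.items():
    def run():
        return query

    loop_runners.append(run)


def make_runner(query: str):
    """Factory: each call creates a new scope that keeps its own `query`."""

    def run():
        return query

    return run


factory_runners = [make_runner(q) for q in QUERIES.values()]

print([run() for run in loop_runners])     # ['select 2', 'select 2']
print([run() for run in factory_runners])  # ['select 1', 'select 2']
```

The factory function gives every generated asset its own copy of the arguments, which is why the query passed to `build_etl_table` stays attached to the asset it was built for.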

Defining dependencies between factory assets and regular assets

Here's how you might add Python logic to define a Dagster asset downstream of factory assets:

Defining dependencies between factory assets and regular assets
from collections.abc import Sequence

import yaml
from dagster_snowflake import SnowflakeResource

import dagster as dg


def build_etl_table(name: str, deps: Sequence[str], query: str) -> dg.AssetsDefinition:
    # Each call defines one asset; `name`, `deps`, and `query` are captured per call.
    @dg.asset(name=name, deps=deps)
    def etl_table(context, snowflake: SnowflakeResource):
        with snowflake.get_connection() as conn:
            conn.cursor().execute(query)

    return etl_table


def load_etl_tables_from_yaml(yaml_path: str) -> Sequence[dg.AssetsDefinition]:
    # Use a context manager so the file handle is closed after parsing.
    with open(yaml_path) as f:
        config = yaml.safe_load(f)
    factory_assets = [
        build_etl_table(
            name=table_config["name"],
            deps=table_config["deps"],
            query=table_config["query"],
        )
        for table_config in config["etl_tables"]
    ]
    return factory_assets


etl_tables = load_etl_tables_from_yaml(
    dg.file_relative_path(__file__, "table_definitions.yaml")
)


# Depend on every factory asset; an empty list of tables yields no dependencies.
@dg.asset(deps=[etl_table.key for etl_table in etl_tables])
def aggregated_metrics(): ...


defs = dg.Definitions(assets=[*etl_tables, aggregated_metrics])