Defining dependencies with asset factories
In data engineering, it's often helpful to reuse code to define similar assets. For example, you may want to represent every file in a directory as an asset.
Additionally, you may be serving stakeholders who aren't familiar with Python or Dagster. They may prefer interacting with assets using a domain-specific language (DSL) built on top of a configuration language such as YAML.
Using an asset factory reduces complexity and creates a pluggable entry point to define additional assets.
This guide assumes familiarity with asset factories.
Building an asset factory in Python
Imagine a data analytics team that maintains a large number of tables. To support analytics needs, the team runs queries and constructs new tables from the results.
Each table can be represented in YAML by a name, upstream asset dependencies, and a query:
etl_tables:
  - name: cleaned_transactions
    deps:
      - transactions
    query: |
      create or replace table cleaned_transactions as (
        SELECT * FROM transactions WHERE amount IS NOT NULL
      )
  - name: risky_customers
    deps:
      - customers
    query: |
      create or replace table risky_customers as (
        SELECT * FROM customers WHERE risk_score > 0.8
      )
  - name: risky_transactions
    deps:
      - cleaned_transactions
      - risky_customers
    query: |
      create or replace table risky_transactions as (
        SELECT *
        FROM cleaned_transactions JOIN risky_customers
        ON cleaned_transactions.customer_id = risky_customers.customer_id
      )
Here's how you might add Python logic that loads these definitions from a `table_definitions.yaml` file and builds one Dagster asset per table:
from collections.abc import Sequence
import yaml
from dagster_snowflake import SnowflakeResource
import dagster as dg
def build_etl_table(name: str, deps: Sequence[str], query: str) -> dg.AssetsDefinition:
    """Build an asset that materializes a table by running a SQL query in Snowflake.

    Args:
        name: Name of the asset.
        deps: Names of the upstream assets this asset depends on.
        query: SQL statement executed when the asset is materialized.

    Returns:
        The asset definition produced by ``@dg.asset``.
    """

    @dg.asset(name=name, deps=deps)
    def etl_table(context: dg.AssetExecutionContext, snowflake: SnowflakeResource):
        with snowflake.get_connection() as conn:
            # ``cursor`` is a method on the connection; call it to obtain a
            # cursor before executing the statement.
            conn.cursor().execute(query)

    return etl_table
def load_etl_tables_from_yaml(yaml_path: str) -> Sequence[dg.AssetsDefinition]:
    """Load table specs from a YAML file and build one asset per table.

    The file is expected to contain a top-level ``etl_tables`` list whose
    entries have ``name`` and ``query`` keys and, optionally, a ``deps`` list.

    Args:
        yaml_path: Path to the YAML file with the table definitions.

    Returns:
        One asset definition per entry, in the order they appear in the file.

    Raises:
        KeyError: If ``etl_tables``, or a table's ``name`` or ``query``, is missing.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(yaml_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    return [
        build_etl_table(
            name=table_config["name"],
            # A table without upstream dependencies may omit ``deps`` or leave it null.
            deps=table_config.get("deps") or [],
            query=table_config["query"],
        )
        for table_config in config["etl_tables"]
    ]
# Register every table asset built from the YAML file, together with the
# Snowflake resource those assets take as their ``snowflake`` parameter.
# Credentials are read from environment variables via ``dg.EnvVar``.
defs = dg.Definitions(
    assets=load_etl_tables_from_yaml(
        dg.file_relative_path(__file__, "table_definitions.yaml")
    ),
    resources=dict(
        snowflake=SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    ),
)
Defining dependencies between factory assets and regular assets
Here's how you might define a regular Dagster asset that depends on all of the factory-built assets:
from collections.abc import Sequence
import yaml
from dagster_snowflake import SnowflakeResource
import dagster as dg
def build_etl_table(name: str, deps: Sequence[str], query: str) -> dg.AssetsDefinition:
    """Build an asset that materializes a table by running a SQL query in Snowflake.

    Args:
        name: Name of the asset.
        deps: Names of the upstream assets this asset depends on.
        query: SQL statement executed when the asset is materialized.

    Returns:
        The asset definition produced by ``@dg.asset``.
    """

    @dg.asset(name=name, deps=deps)
    def etl_table(context: dg.AssetExecutionContext, snowflake: SnowflakeResource):
        with snowflake.get_connection() as conn:
            # ``cursor`` is a method on the connection; call it to obtain a
            # cursor before executing the statement.
            conn.cursor().execute(query)

    return etl_table
def load_etl_tables_from_yaml(yaml_path: str) -> Sequence[dg.AssetsDefinition]:
    """Load table specs from a YAML file and build one asset per table.

    The file is expected to contain a top-level ``etl_tables`` list whose
    entries have ``name`` and ``query`` keys and, optionally, a ``deps`` list.

    Args:
        yaml_path: Path to the YAML file with the table definitions.

    Returns:
        One asset definition per entry, in the order they appear in the file.

    Raises:
        KeyError: If ``etl_tables``, or a table's ``name`` or ``query``, is missing.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(yaml_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    return [
        build_etl_table(
            name=table_config["name"],
            # A table without upstream dependencies may omit ``deps`` or leave it null.
            deps=table_config.get("deps") or [],
            query=table_config["query"],
        )
        for table_config in config["etl_tables"]
    ]
etl_tables = load_etl_tables_from_yaml(
    dg.file_relative_path(__file__, "table_definitions.yaml")
)


# Depend on every factory-built table. An empty ``etl_tables`` already yields
# an empty list here, so no special-casing is needed.
@dg.asset(deps=[etl_table.key for etl_table in etl_tables])
def aggregated_metrics(): ...


# The factory assets declare a ``snowflake`` resource parameter, so the
# resource must be supplied here; otherwise the definitions fail to load
# with a missing-resource error.
defs = dg.Definitions(
    assets=[*etl_tables, aggregated_metrics],
    resources={
        "snowflake": SnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"),
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)