Create and materialize assets
In the first step of the tutorial, you created your Dagster project with the raw data files. In this step, you will:
- Create your initial Definitions object
- Add a DuckDB resource
- Build software-defined assets
- Materialize your assets
1. Create a definitions object
In Dagster, the Definitions object is where you define and organize the components of your project, such as assets and resources.
Open the definitions.py file in the etl_tutorial directory and copy the following code into it:
import json
import os

from dagster_duckdb import DuckDBResource

import dagster as dg

defs = dg.Definitions(
    assets=[],
    resources={},
)
2. Define the DuckDB resource
In Dagster, resources are the external services, tools, and storage backends you need to do your job. For the storage backend in this project, we'll use DuckDB, a fast, in-process SQL database that runs inside your application. We'll define it once in the Definitions object, making it available to all assets and objects that need it.
defs = dg.Definitions(
    assets=[],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
3. Create assets
Software-defined assets are the main building blocks in Dagster. An asset is composed of three components:
- An asset key, or unique identifier.
- An op, which is the function invoked to produce the asset.
- Upstream dependencies that the asset depends on.
You can read more about our philosophy behind the asset-centric approach.
Products asset
First, we will create an asset that creates a DuckDB table to hold data from the products CSV. This asset takes the duckdb resource defined earlier and returns a MaterializeResult object.
Additionally, this asset contains metadata in the @dg.asset decorator parameters to help categorize the asset, and in the return block to give us a preview of the asset in the Dagster UI.
To create this asset, open the definitions.py file and copy the following code into it:
@dg.asset(
    compute_kind="duckdb",
    group_name="ingestion",
)
def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            create or replace table products as (
                select * from read_csv_auto('data/products.csv')
            )
            """
        )

        preview_query = "select * from products limit 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute("select count(*) from products").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
Sales reps asset
The code for the sales reps asset is similar to the products asset code. In the definitions.py file, copy the following code below the products asset code:
@dg.asset(
    compute_kind="duckdb",
    group_name="ingestion",
)
def sales_reps(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            create or replace table sales_reps as (
                select * from read_csv_auto('data/sales_reps.csv')
            )
            """
        )

        preview_query = "select * from sales_reps limit 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute("select count(*) from sales_reps").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
Sales data asset
To add the sales data asset, copy the following code into your definitions.py file below the sales reps asset:
@dg.asset(
    compute_kind="duckdb",
    group_name="ingestion",
)
def sales_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            drop table if exists sales_data;
            create table sales_data as select * from read_csv_auto('data/sales_data.csv')
            """
        )

        preview_query = "SELECT * FROM sales_data LIMIT 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute("select count(*) from sales_data").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
4. Add assets to the definitions object
Next, pull these assets into the Definitions object, which makes them available to the Dagster project. Add them to the empty list in the assets parameter:
defs = dg.Definitions(
    assets=[
        products,
        sales_reps,
        sales_data,
    ],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
5. Materialize assets
To materialize your assets:
- In a browser, navigate to the URL of the Dagster server that you started earlier.
- Navigate to Deployment.
- Click Reload definitions.
- Click Assets, then click "View global asset lineage" to see all of your assets.
- Click Materialize all.
- Navigate to the Runs tab and select the most recent run. Here you can see the logs from the run.
Next steps
- Continue this tutorial with your asset dependencies