Track Snowflake usage with Dagster+ Insights
Dagster allows you to track external metrics, such as Snowflake usage, in the Insights UI. Out-of-the-box integrations capture query runtime and billed usage and associate them with the relevant assets or jobs.
Requirements
To use these features, you will need:
- A Dagster+ account on the Pro plan
- Access to the Dagster+ Insights feature
- Snowflake credentials that have access to the snowflake.account_usage.query_history view. For more information, see the Snowflake documentation.
- The following packages installed:
pip install dagster dagster-cloud dagster-snowflake
Limitations
- Up to two million individual data points may be added to Insights per month
- External metrics data will be retained for 120 days
- Insights data may take up to 24 hours to appear in the UI
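The limits above lend themselves to back-of-the-envelope checks. The sketch below is illustrative only: it assumes, hypothetically, that each metric recorded for each run counts as one data point, and it treats the 120-day retention window as a simple calendar-day subtraction. The helper names are made up for this example and are not part of any Dagster API.

```python
from datetime import date, timedelta

# Limits taken from the list above.
MONTHLY_DATA_POINT_LIMIT = 2_000_000
RETENTION_DAYS = 120


def earliest_retained_day(today: date) -> date:
    """Approximate oldest day whose external metrics should still be retained."""
    return today - timedelta(days=RETENTION_DAYS)


def estimated_monthly_points(metrics_per_run: int, runs_per_day: int, days: int = 30) -> int:
    """Hypothetical model: one data point per metric per run."""
    return metrics_per_run * runs_per_day * days


def fits_monthly_limit(metrics_per_run: int, runs_per_day: int) -> bool:
    """True if the estimate stays within the monthly data point limit."""
    return estimated_monthly_points(metrics_per_run, runs_per_day) <= MONTHLY_DATA_POINT_LIMIT


print(earliest_retained_day(date(2024, 6, 1)))  # 2024-02-02
print(fits_monthly_limit(metrics_per_run=4, runs_per_day=1_200))  # True (144,000 points)
```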
Tracking usage with the SnowflakeResource
The dagster-cloud package provides an InsightsSnowflakeResource, which is a drop-in replacement for the SnowflakeResource provided by dagster-snowflake.
This resource will emit Snowflake usage metrics to the Dagster+ Insights API whenever it makes a query.
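Because the resource is a drop-in replacement, existing calls such as get_connection() and cursor().execute() keep working unchanged. The following is a minimal, hypothetical sketch of that general wrapper pattern in plain Python, not the dagster-cloud implementation: RecordingCursor and FakeCursor are invented names, and the on_query hook merely stands in for whatever reporting the real resource performs.

```python
from typing import Any, Callable, List


class RecordingCursor:
    """Hypothetical drop-in wrapper: same execute() call, plus a reporting hook."""

    def __init__(self, cursor: Any, on_query: Callable[[str], None]) -> None:
        self._cursor = cursor
        self._on_query = on_query

    def execute(self, sql: str, *args: Any, **kwargs: Any) -> Any:
        self._on_query(sql)  # report the query, then delegate to the real cursor
        return self._cursor.execute(sql, *args, **kwargs)

    def __getattr__(self, name: str) -> Any:
        # Any other attribute (fetchall, close, ...) passes straight through.
        return getattr(self._cursor, name)


class FakeCursor:
    """Stand-in for a Snowflake cursor so the sketch runs without a database."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def execute(self, sql: str) -> "FakeCursor":
        self.executed.append(sql)
        return self

    def fetchall(self) -> list:
        return [(1,)]


reported: List[str] = []
raw = FakeCursor()
cursor = RecordingCursor(raw, reported.append)
cursor.execute("select 1")
print(reported, cursor.fetchall())  # ['select 1'] [(1,)]
```

Callers see the same interface as before; only the construction site changes, which is the property that makes a replacement "drop-in".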
To enable this behavior, replace usage of SnowflakeResource with InsightsSnowflakeResource, and add Snowflake-specific insights definitions to your code using create_snowflake_insights_asset_and_schedule.
These additional definitions are required because Snowflake usage information is only available after a delay. They run a scheduled computation that ingests Snowflake usage information from the previous hour.
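The "previous hour" window and the start_date string used in the examples on this page can be worked through with the standard library. This sketch assumes the start_date format is year-month-day-hour:minute (as in "2024-01-01-00:00") and that times are UTC; the function names are hypothetical and are not part of the dagster-cloud API.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Assumed format of start_date strings such as "2024-01-01-00:00".
START_DATE_FORMAT = "%Y-%m-%d-%H:%M"


def parse_start_date(value: str) -> datetime:
    # Assumption: the value is interpreted as UTC.
    return datetime.strptime(value, START_DATE_FORMAT).replace(tzinfo=timezone.utc)


def previous_hour_window(now: datetime) -> Tuple[datetime, datetime]:
    """Return the [start, end) bounds of the last complete hour before `now`."""
    end = now.replace(minute=0, second=0, microsecond=0)
    return end - timedelta(hours=1), end


def complete_hours_since(start: datetime, now: datetime) -> int:
    """Count complete hourly windows between `start` and the top of the current hour."""
    _, end = previous_hour_window(now)
    return max(0, (end - start) // timedelta(hours=1))


start = parse_start_date("2024-01-01-00:00")
now = datetime(2024, 1, 2, 3, 25, tzinfo=timezone.utc)
print(previous_hour_window(now))
print(complete_hours_since(start, now))  # 27
```

An earlier start_date therefore means more hourly windows to backfill when the definitions are first deployed.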
Call create_snowflake_insights_asset_and_schedule in only one code location per deployment, because these definitions ingest usage data for the entire deployment.
Before:

from dagster_snowflake import SnowflakeResource

import dagster as dg


@dg.asset
def snowflake_asset(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        conn.cursor().execute("select 1")


defs = dg.Definitions(
    assets=[snowflake_asset],
    resources={
        "snowflake": SnowflakeResource(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
)
After:

from dagster_cloud.dagster_insights import (
    InsightsSnowflakeResource,
    create_snowflake_insights_asset_and_schedule,
)

import dagster as dg

# Scheduled definitions that ingest Snowflake usage data into Insights.
insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2024-01-01-00:00"
)


@dg.asset
def snowflake_asset(snowflake: InsightsSnowflakeResource):
    with snowflake.get_connection() as conn:
        conn.cursor().execute("select 1")


defs = dg.Definitions(
    assets=[snowflake_asset, *insights_definitions.assets],
    schedules=[insights_definitions.schedule],
    resources={
        "snowflake": InsightsSnowflakeResource(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
)
Tracking usage with dagster-dbt
If you use dagster-dbt to manage a dbt project that targets Snowflake, you can emit usage metrics to the Dagster+ API with the DbtCliResource.
First, chain a .with_insights() call onto the .stream() call of your dbt.cli() command(s), and add Snowflake-specific insights definitions to your code using create_snowflake_insights_asset_and_schedule.
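In the dbt example for this section, .with_insights() is chained onto the iterator that .stream() returns, so the dbt events are still yielded to Dagster while usage is recorded. The toy classes here only illustrate that fluent, pass-through iterator pattern; EventStream and with_observer are hypothetical names, not dagster-dbt classes, and the real method may do more than observe events.

```python
from typing import Callable, Iterable, Iterator, List


class EventStream:
    """Toy event iterator (hypothetical; not a dagster-dbt class)."""

    def __init__(self, events: Iterable[str]) -> None:
        self._events = iter(events)

    def __iter__(self) -> Iterator[str]:
        return self._events

    def with_observer(self, observe: Callable[[str], None]) -> "EventStream":
        """Return a new stream that yields the same events, observing each one first."""

        def passthrough() -> Iterator[str]:
            for event in self._events:
                observe(event)
                yield event

        return EventStream(passthrough())


observed: List[str] = []
stream = EventStream(["model.a", "model.b"]).with_observer(observed.append)
emitted = list(stream)  # consuming the stream drives the observer
print(emitted, observed)  # ['model.a', 'model.b'] ['model.a', 'model.b']
```

Returning a new iterator of the same type is what lets the call sit at the end of an existing chain without changing the surrounding yield from statement.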
These additional definitions are required because Snowflake usage information is only available after a delay. They run a scheduled computation that ingests Snowflake usage information from the previous hour.
Call create_snowflake_insights_asset_and_schedule in only one code location per deployment, because these definitions ingest usage data for the entire deployment.
Before:

from pathlib import Path

from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg


@dbt_assets(manifest=Path(__file__).parent / "manifest.json")
def my_asset(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


defs = dg.Definitions(
    assets=[my_asset],
    # Assumes the dbt project directory is the one containing this file.
    resources={"dbt": DbtCliResource(project_dir=str(Path(__file__).parent))},
)
After:

from pathlib import Path

from dagster_cloud.dagster_insights import (
    InsightsSnowflakeResource,
    create_snowflake_insights_asset_and_schedule,
)
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2024-01-01-00:00"
)


@dbt_assets(manifest=Path(__file__).parent / "manifest.json")
def my_asset(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream().with_insights()


defs = dg.Definitions(
    assets=[my_asset, *insights_definitions.assets],
    schedules=[insights_definitions.schedule],
    resources={
        # Assumes the dbt project directory is the one containing this file.
        "dbt": DbtCliResource(project_dir=str(Path(__file__).parent)),
        # The insights definitions query Snowflake usage data through this resource.
        "snowflake": InsightsSnowflakeResource(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
)
Then, add the following to your dbt_project.yml:
Before:

name: "dbt_project"
version: "0.0.1"
config-version: 2
After:

name: "dbt_project"
version: "0.0.1"
config-version: 2

query-comment:
  comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
  append: true
This appends a comment to each query, which Dagster+ uses to attribute cost metrics to the correct assets.
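To see how such attribution can work, the following sketch extracts the dbt node ID and invocation ID from a query's text using the marker defined in the query-comment template. It is an illustration, not the parser Dagster+ uses; the sample query, node ID, and invocation ID are made up, and it assumes dbt renders the comment inside /* ... */, as it typically does.

```python
import re
from typing import Optional, Tuple

# Matches the rendered marker from the query-comment template in dbt_project.yml.
OPAQUE_ID = re.compile(
    r"snowflake_dagster_dbt_v1_opaque_id\[\[\[(?P<unique_id>[^:\]]+):(?P<invocation_id>[^\]]+)\]\]\]"
)


def parse_dbt_query_comment(query_text: str) -> Optional[Tuple[str, str]]:
    """Return (node unique_id, invocation_id) if the marker is present, else None."""
    match = OPAQUE_ID.search(query_text)
    if match is None:
        return None
    return match.group("unique_id"), match.group("invocation_id")


# Made-up query text, as dbt might render it with append: true.
sample = (
    "select * from analytics.orders\n"
    "/* snowflake_dagster_dbt_v1_opaque_id[[[model.jaffle_shop.orders:"
    "0f8a2c1e-1234-4abc-9def-000000000000]]] */"
)
print(parse_dbt_query_comment(sample))
print(parse_dbt_query_comment("select 1"))  # None
```

Because both IDs are embedded in the query text itself, usage rows read later from query history can be matched back to the dbt node that issued them.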