Track Snowflake usage with Dagster+ Insights

Dagster allows you to track external metrics, such as Snowflake usage, in the Insights UI. Out-of-the-box integrations capture query runtime and billed usage and associate them with the relevant assets or jobs.

Requirements

To use these features, you will need:

  • A Dagster+ account on the Pro plan
  • Access to the Dagster+ Insights feature
  • Snowflake credentials with access to the snowflake.account_usage.query_history view
  • The following packages installed:
pip install dagster dagster-cloud dagster-snowflake

Limitations

  • Up to two million individual data points may be added to Insights per month
  • External metrics data will be retained for 120 days
  • Insights data may take up to 24 hours to appear in the UI
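As a rough way to reason about the monthly cap, the sketch below estimates how many data points a deployment might emit. It assumes, purely for illustration, that each tracked query contributes a fixed number of data points. The function names and the per-query figure are hypothetical and are not taken from Dagster+.

```python
MONTHLY_DATA_POINT_LIMIT = 2_000_000  # the limit stated in the list above


def estimate_monthly_data_points(
    queries_per_day: int,
    points_per_query: int = 2,  # assumption: e.g. one runtime and one billing metric
    days_per_month: int = 30,  # assumption: a 30-day month
) -> int:
    """Return a rough estimate of the data points emitted per month."""
    return queries_per_day * points_per_query * days_per_month


def fraction_of_limit_used(queries_per_day: int, points_per_query: int = 2) -> float:
    """Return the estimated share of the monthly limit that would be consumed."""
    estimate = estimate_monthly_data_points(queries_per_day, points_per_query)
    return estimate / MONTHLY_DATA_POINT_LIMIT
```

Under these assumptions, a deployment running 10,000 tracked queries per day would use roughly 30% of the monthly allowance.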

Tracking usage with the SnowflakeResource

The dagster-cloud package provides an InsightsSnowflakeResource, which is a drop-in replacement for the SnowflakeResource provided by dagster-snowflake.

This resource will emit Snowflake usage metrics to the Dagster+ Insights API whenever it makes a query.

To enable this behavior, replace usage of SnowflakeResource with InsightsSnowflakeResource, and add Snowflake-specific insights definitions to your code using create_snowflake_insights_asset_and_schedule.

These additional definitions are required because Snowflake usage information is only available after a delay. These definitions automatically handle running a computation on a schedule to ingest Snowflake usage information from the previous hour.

note

Only use create_snowflake_insights_asset_and_schedule in a single code location per deployment, as this will handle ingesting usage data from your entire deployment.

from dagster_cloud.dagster_insights import (
    InsightsSnowflakeResource,
    create_snowflake_insights_asset_and_schedule,
)

import dagster as dg

insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2024-01-01-00:00"
)


@dg.asset
def snowflake_asset(snowflake: InsightsSnowflakeResource):
    with snowflake.get_connection() as conn:
        conn.cursor().execute("select 1")


defs = dg.Definitions(
    assets=[snowflake_asset, *insights_definitions.assets],
    schedules=[insights_definitions.schedule],
    resources={
        "snowflake": InsightsSnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"), password=dg.EnvVar("SNOWFLAKE_PASSWORD")
        ),
    },
)
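To make the "previous hour" ingestion window described above concrete, here is a minimal sketch of how such a window could be computed. It is not Dagster's implementation: previous_hour_window is a hypothetical helper, and the "%Y-%m-%d-%H:%M" format string is an assumption inferred from the shape of the start_date value in the example.

```python
from datetime import datetime, timedelta, timezone

# Assumption: start_date strings look like "2024-01-01-00:00".
START_DATE_FORMAT = "%Y-%m-%d-%H:%M"


def previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
    """Return the [start, end) bounds of the last complete hour before `now`."""
    end = now.replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=1)
    return start, end


# Parse the example start date and compute the window for a run at 09:17 UTC.
first_hour = datetime.strptime("2024-01-01-00:00", START_DATE_FORMAT)
window = previous_hour_window(datetime(2024, 1, 1, 9, 17, tzinfo=timezone.utc))
```

A run at 09:17 would therefore cover 08:00 up to, but not including, 09:00, which gives the delayed Snowflake usage data time to become available.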

Tracking usage with dagster-dbt

If you use dagster-dbt to manage a dbt project that targets Snowflake, you can emit usage metrics to the Dagster+ API with the DbtCliResource.

First, add a .with_insights() call to your dbt.cli() command(s), and add Snowflake-specific insights definitions to your code using create_snowflake_insights_asset_and_schedule.

These additional definitions are required because Snowflake usage information is only available after a delay. These definitions automatically handle running a computation on a schedule to ingest Snowflake usage information from the previous hour.

note

Only use create_snowflake_insights_asset_and_schedule in a single code location per deployment, as this will handle ingesting usage data from your entire deployment.

from pathlib import Path

from dagster_cloud.dagster_insights import (
    InsightsSnowflakeResource,
    create_snowflake_insights_asset_and_schedule,
)
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2024-01-01-00:00"
)


@dbt_assets(manifest=Path(__file__).parent / "manifest.json")
def my_asset(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # with_insights() attaches Snowflake usage metadata to the streamed dbt events.
    yield from dbt.cli(["build"], context=context).stream().with_insights()


defs = dg.Definitions(
    assets=[my_asset, *insights_definitions.assets],
    schedules=[insights_definitions.schedule],
    resources={
        # my_asset also requires a "dbt" resource (a DbtCliResource pointing at
        # your dbt project directory); add it here to match your project layout.
        "snowflake": InsightsSnowflakeResource(
            user=dg.EnvVar("SNOWFLAKE_USER"), password=dg.EnvVar("SNOWFLAKE_PASSWORD")
        )
    },
)

Then, add the following to your dbt_project.yml:

name: "dbt_project"
version: "0.0.1"
config-version: 2

query-comment:
  comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
  append: true

This adds a comment to each query, which is used by Dagster+ to attribute cost metrics to the correct assets.
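To illustrate how a comment of this shape can tie a query back to a dbt node, the sketch below extracts the two identifiers from a query's text. It is only an illustration of the idea and not how Dagster+ parses these comments; extract_opaque_id and the sample query are hypothetical, and the sketch assumes the dbt unique_id contains no colon.

```python
import re
from typing import Optional, Tuple

# Mirrors the comment template configured in dbt_project.yml above.
OPAQUE_ID_PATTERN = re.compile(
    r"snowflake_dagster_dbt_v1_opaque_id"
    r"\[\[\[(?P<unique_id>.+?):(?P<invocation_id>[^:\]]+)\]\]\]"
)


def extract_opaque_id(query_text: str) -> Optional[Tuple[str, str]]:
    """Return (unique_id, invocation_id) if the query carries the comment, else None."""
    match = OPAQUE_ID_PATTERN.search(query_text)
    if match is None:
        return None
    return match.group("unique_id"), match.group("invocation_id")


# Hypothetical query text with the comment appended (append: true).
sample_query = (
    "select 1\n"
    "/* snowflake_dagster_dbt_v1_opaque_id"
    "[[[model.dbt_project.orders:8f1c2d3e-0000-4000-8000-123456789abc]]] */"
)
ids = extract_opaque_id(sample_query)
```

Queries without the comment return None, which is why the query-comment setting needs to be in place before usage can be attributed to individual dbt assets.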