Embedded ELT (dagster-embedded-elt)

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. This package currently includes the following integrations:

  • Sling, which provides a simple way to sync data between databases and file systems
  • dlt, or data load tool, which provides a way to load data from systems and APIs

For more information on getting started, see the Embedded ELT documentation.


Sling (dagster-embedded-elt.sling)

Refer to the Sling guide to get started.

Assets (Sling)

@dagster_embedded_elt.sling.sling_assets

Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This will create one asset for every Sling target stream.

A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.
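As a rough illustration, a replication config can be written as a plain Python dictionary rather than a YAML file. The sketch below reuses the connection names MY_POSTGRES and MY_DUCKDB from the example on this page; the stream names and the `defaults` values are hypothetical, and Sling's documentation remains the authoritative spec.

```python
# A minimal sketch of a Sling replication config expressed as a dictionary.
# Stream names and defaults are illustrative assumptions, not required values.
replication_config = {
    "source": "MY_POSTGRES",  # name of the source connection
    "target": "MY_DUCKDB",    # name of the target connection
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        # A stream with no settings of its own falls back to the defaults.
        "public.accounts": None,
        # A stream can carry Dagster-specific metadata under meta -> dagster.
        "public.users": {
            "meta": {"dagster": {"asset_key": "mydb_users"}},
        },
    },
}

stream_names = sorted(replication_config["streams"])
print(stream_names)
```

A dictionary like this could then be passed as replication_config in place of a file path.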

Parameters:

  • replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary containing the config.
  • dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how a Sling stream is mapped to a Dagster asset.
  • name (Optional[str]) – The name of the op.
  • partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
  • op_tags (Optional[Mapping[str, Any]]) – The tags for this asset.

Examples:

Running a sync by providing a path to a Sling Replication config:

from dagster import EnvVar
from dagster_embedded_elt.sling import sling_assets, SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"

@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)

class dagster_embedded_elt.sling.DagsterSlingTranslator
get_asset_key

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.

The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.

For example:

stream_definition = {
    "public.users": {
        "sql": 'select all_user_id, name from public."all_Users"',
        "object": "public.all_users",
    }
}

By default, this returns the class’s target_prefix parameter concatenated with the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

public.users:
  meta:
    dagster:
      asset_key: "mydb_users"
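The resolution order described above (an asset_key given in the stream's metadata, otherwise the prefix joined to the stream name) can be sketched in plain Python. This is only an approximation written from the description: the function name is hypothetical, the prefix value "target" is inferred from the “target_public_accounts” example, and the key is returned as a flat string rather than a real AssetKey.

```python
import re
from typing import Any, Mapping, Optional


def sketch_asset_key(
    stream_name: str,
    stream_config: Optional[Mapping[str, Any]] = None,
    target_prefix: str = "target",  # assumption: inferred from the example above
) -> str:
    """Hypothetical stand-in for the described default; not the library's code."""
    # 1. An asset_key under meta -> dagster takes precedence when present.
    meta = (stream_config or {}).get("meta", {})
    override = meta.get("dagster", {}).get("asset_key")
    if override:
        return override
    # 2. Otherwise drop double quotes, replace other punctuation with
    #    underscores, and prepend the prefix.
    sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", stream_name.replace('"', ""))
    return f"{target_prefix}_{sanitized}"


print(sketch_asset_key("public.accounts"))
print(sketch_asset_key("public.users", {"meta": {"dagster": {"asset_key": "mydb_users"}}}))
```

The first call follows the default path and the second picks up the metadata override.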

Parameters: stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition

Returns: The Dagster AssetKey for the replication stream.

Return type: AssetKey

Examples:

Using a custom mapping for streams:

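class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key(self, stream_definition) -> AssetKey:
        # Assumption: the stream name is stored under the "name" key.
        stream_name = stream_definition["name"]
        stream_to_asset = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(stream_to_asset[stream_name])
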
get_auto_materialize_policy

Defines the auto-materialize policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: An eager auto-materialize policy if the configuration is found, otherwise None.

Return type: Optional[AutoMaterializePolicy]

get_deps_asset_key

A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.

By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey of a stream's dependency as follows:

public.users:
  meta:
    dagster:
      deps: "sourcedb_users"

Parameters: stream_name (str) – The name of the stream.

Returns: The Dagster AssetKey dependency for the replication stream.

Return type: AssetKey

Examples:

Using a custom mapping for streams:

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_name: str) -> AssetKey:
        stream_to_dep = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(stream_to_dep[stream_name])

get_description

Retrieves the description for a given stream definition.

This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: The description of the stream if found, otherwise None.

Return type: Optional[str]
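The lookup order described for get_description can be sketched as follows. This is an illustration written from the description, not the library's implementation: the function name is hypothetical, and the assumption is that the fallback lives under meta -> dagster with the key "description".

```python
from typing import Any, Mapping, Optional


def sketch_description(stream_config: Mapping[str, Any]) -> Optional[str]:
    """Hypothetical stand-in that mirrors the described lookup order."""
    # 1. Prefer the SQL text when the stream is defined by a query.
    if "sql" in stream_config:
        return stream_config["sql"]
    # 2. Otherwise fall back to a description under meta -> dagster
    #    (the "description" key name is an assumption).
    return stream_config.get("meta", {}).get("dagster", {}).get("description")


print(sketch_description({"sql": "select id from public.users"}))
print(sketch_description({"meta": {"dagster": {"description": "Users table"}}}))
print(sketch_description({"object": "public.users"}))
```

The third call shows the None result when neither source of a description is present.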

get_freshness_policy

Retrieves the freshness policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A FreshnessPolicy object if the configuration is found, otherwise None.

Return type: Optional[FreshnessPolicy]

get_group_name

Retrieves the group name for a given stream definition.

This method checks the provided stream definition for a group name in the metadata under the “dagster” key.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: The group name if found, otherwise None.

Return type: Optional[str]

get_kinds

Retrieves the kinds for a given stream definition.

This method returns “sling” by default. This method can be overridden to provide custom kinds.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A set containing kinds for the stream’s assets.

Return type: Set[str]

get_metadata

Retrieves the metadata for a given stream definition.

This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A dictionary containing the stream configuration as JSON metadata.

Return type: Mapping[str, Any]

get_tags

Retrieves the tags for a given stream definition.

This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: An empty dictionary.

Return type: Mapping[str, Any]

sanitize_stream_name

A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.

By default, this replaces any non-alphanumeric characters in the stream name with underscores and removes any double quotes.

Parameters: stream_name (str) – The name of the stream.

Examples:

Using a custom stream name sanitizer:

class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        return stream_name.replace(".", "")
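For comparison with the custom sanitizer, the default behavior described in this section can be approximated with a short regular expression. This sketch is written from the description only; the function name is hypothetical, and the library's actual implementation may differ in details such as case handling or which punctuation it preserves.

```python
import re


def sketch_sanitize_stream_name(stream_name: str) -> str:
    """Hypothetical approximation of the described default sanitizer."""
    # Drop double quotes first, then replace every remaining character that
    # is not a letter, digit, or underscore with an underscore.
    without_quotes = stream_name.replace('"', "")
    return re.sub(r"[^A-Za-z0-9_]", "_", without_quotes)


print(sketch_sanitize_stream_name('public."all_Users"'))  # public_all_Users
```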

Resources (Sling)

class dagster_embedded_elt.sling.SlingResource

Resource for interacting with the Sling package. This resource can be used to run Sling replications.

Parameters: connections (List[SlingConnectionResource]) – A list of connections to use for the replication.

Examples:

from dagster import EnvVar
from dagster_embedded_elt.sling import SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)

class dagster_embedded_elt.sling.SlingConnectionResource

A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.

Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections

The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication You may provide either a connection string or keyword arguments for the connection.

Examples:

Creating a Sling Connection for a file, such as CSV or JSON:

source = SlingConnectionResource(name="MY_FILE", type="file")

Create a Sling Connection for a Postgres or MySQL database, using a connection string:

postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:

snowflake_conn = SlingConnectionResource(
    name="MY_SNOWFLAKE",
    type="snowflake",
    host=EnvVar("SNOWFLAKE_HOST"),
    user=EnvVar("SNOWFLAKE_USER"),
    database=EnvVar("SNOWFLAKE_DATABASE"),
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    role=EnvVar("SNOWFLAKE_ROLE"),
)

dlt (dagster-embedded-elt.dlt)

Refer to the dlt guide to get started.

Assets (dlt)

@dagster_embedded_elt.dlt.dlt_assets

Asset Factory for using data load tool (dlt).

Parameters:

  • dlt_source (DltSource) – The DltSource to be ingested.
  • dlt_pipeline (Pipeline) – The dlt Pipeline defining the destination parameters.
  • name (Optional[str], optional) – The name of the op.
  • group_name (Optional[str], optional) – The name of the asset group.
  • dagster_dlt_translator (DagsterDltTranslator, optional) – Customization object for defining asset parameters from dlt resources.

Examples:

Loading Hubspot data to Snowflake with an auto materialize policy using the dlt verified source:

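from typing import Optional

from dagster import AssetExecutionContext, AutoMaterializePolicy, AutoMaterializeRule
from dagster_embedded_elt.dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt import pipeline
from dlt.extract.resource import DltResource

# `hubspot` is the dlt verified source; import it from wherever it lives in your project.

class HubspotDltDagsterTranslator(DagsterDltTranslator):
    def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMaterializePolicy]:
        # Start from the eager policy and add a daily cron rule.
        return AutoMaterializePolicy.eager().with_rules(
            AutoMaterializeRule.materialize_on_cron("0 0 * * *")
        )


@dlt_assets(
    dlt_source=hubspot(include_history=True),
    dlt_pipeline=pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot",
        destination="snowflake",
        progress="log",
    ),
    name="hubspot",
    group_name="hubspot",
    dagster_dlt_translator=HubspotDltDagsterTranslator(),
)
def hubspot_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)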

Loading GitHub issues to Snowflake:

@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", items_per_page=100, max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    ),
    name="github",
    group_name="github",
)
def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

dagster_embedded_elt.dlt.build_dlt_asset_specs

Build a list of asset specs from a dlt source and pipeline.

Parameters:

  • dlt_source (DltSource) – dlt source object
  • dlt_pipeline (Pipeline) – dlt pipeline object
  • dagster_dlt_translator (Optional[DagsterDltTranslator]) – Allows customizing how dlt resources are mapped to asset specs.

Returns: A list of asset specs built from the dlt source and pipeline.

Return type: List[AssetSpec]

class dagster_embedded_elt.dlt.DagsterDltTranslator
get_asset_key

Defines asset key for a given dlt resource key and dataset name.

This method can be overridden to provide a custom asset key for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: AssetKey of the Dagster asset derived from the dlt resource

get_auto_materialize_policy

Defines a resource-specific auto-materialize policy.

This method can be overridden to provide a custom auto-materialize policy for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: The auto-materialize policy for a resource

Return type: Optional[AutoMaterializePolicy]

get_automation_condition

Defines a resource-specific automation condition.

This method can be overridden to provide a custom automation condition for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: The automation condition for a resource

Return type: Optional[AutomationCondition]

get_deps_asset_keys

Defines upstream asset dependencies given a dlt resource.

Defaults to a concatenation of resource.source_name and resource.name.

Parameters: resource (DltResource) – dlt resource

Returns: The Dagster asset keys upstream of dlt_resource_key.

Return type: Iterable[AssetKey]
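The default dependency key described above can be sketched with a small stand-in class. Everything here is illustrative: FakeResource is a hypothetical placeholder for a dlt resource that exposes only source_name and name, the function name is made up, and joining the two parts with an underscore is an assumption, since the description only says they are concatenated.

```python
from dataclasses import dataclass


@dataclass
class FakeResource:
    """Hypothetical stand-in exposing only the two attributes used here."""

    source_name: str
    name: str


def sketch_deps_key(resource: FakeResource) -> str:
    # Assumption: the two parts are joined with an underscore.
    return f"{resource.source_name}_{resource.name}"


print(sketch_deps_key(FakeResource(source_name="github", name="issues")))  # github_issues
```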

get_description

A method that takes in a dlt resource and returns the Dagster description of the resource.

This method can be overridden to provide a custom description for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: The Dagster description for the dlt resource.

Return type: Optional[str]

get_group_name

A method that takes in a dlt resource and returns the Dagster group name of the resource.

This method can be overridden to provide a custom group name for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: A Dagster group name for the dlt resource.

Return type: Optional[str]

get_kinds

A method that takes in a dlt resource and returns the kinds which should be attached. Defaults to the destination type and “dlt”.

This method can be overridden to provide custom kinds for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: The kinds of the asset.

Return type: Set[str]

get_metadata

Defines resource specific metadata.

Parameters: resource (DltResource) – dlt resource

Returns: The custom metadata entries for this resource.

Return type: Mapping[str, Any]

get_owners

A method that takes in a dlt resource and returns the Dagster owners of the resource.

This method can be overridden to provide custom owners for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: A sequence of Dagster owners for the dlt resource.

Return type: Optional[Sequence[str]]

get_tags

A method that takes in a dlt resource and returns the Dagster tags of the resource.

This method can be overridden to provide custom tags for a dlt resource.

Parameters: resource (DltResource) – dlt resource

Returns: A dictionary representing the Dagster tags for the dlt resource.

Return type: Optional[Mapping[str, str]]

Resources (dlt)

class dagster_embedded_elt.dlt.DagsterDltResource
experimental

This API may break in future versions, even between dot releases.

run

Runs the dlt pipeline with subset support.

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – Asset or op execution context
  • dlt_source (Optional[DltSource]) – optional dlt source if resource is used from an @op
  • dlt_pipeline (Optional[Pipeline]) – optional dlt pipeline if resource is used from an @op
  • dagster_dlt_translator (Optional[DagsterDltTranslator]) – optional dlt translator if resource is used from an @op
  • **kwargs (dict[str, Any]) – Keyword args passed to pipeline run method

Returns: An iterator of MaterializeResult or AssetMaterialization

Return type: DltEventIterator[DltEventType]