
Sling (dagster-sling)

This library provides a Dagster integration with Sling.

For more information on getting started, see the Sling & Dagster documentation.

Assets

@dagster_sling.sling_assets

Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This creates one asset for every Sling target stream.

A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.

Parameters:

  • replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary containing a replication config.
  • dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.
  • name (Optional[str]) – The name of the op.
  • partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
  • op_tags (Optional[Mapping[str, Any]]) – The tags for the underlying op.
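Because replication_config also accepts a dictionary, it can help to see a small config written in Python and split into streams. The sketch below uses the source/target/defaults/streams layout from Sling's replication documentation and yields one entry per stream, matching the one-asset-per-stream rule above. The connection names, table names, the `iter_streams` helper, and the {"name", "config"} entry shape are assumptions for illustration, not dagster-sling's own code.

```python
from typing import Any, Dict, Iterator, Mapping

# A Sling replication config written as a Python dictionary instead of YAML.
# "source" and "target" name connections defined on the SlingResource (hypothetical names).
replication_config: Dict[str, Any] = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh"},
    "streams": {
        "public.accounts": None,  # no per-stream overrides; defaults apply
        "public.users": {"object": "public.all_users"},
    },
}


def iter_streams(config: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: yield one {"name", "config"} entry per stream."""
    for name, stream_config in (config.get("streams") or {}).items():
        # A stream listed without a body (None in YAML) gets an empty config.
        yield {"name": name, "config": stream_config or {}}


for stream in iter_streams(replication_config):
    print(stream["name"], stream["config"])
```

A dictionary like replication_config could be passed as `@sling_assets(replication_config=replication_config)` in place of a file path.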

Examples:

Running a sync by providing a path to a Sling Replication config:

from dagster import AssetExecutionContext, EnvVar
from dagster_sling import sling_assets, SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"


@sling_assets(replication_config=config_path)
def my_assets(context: AssetExecutionContext, sling: SlingResource):
    # Run the replication and stream its results back to Dagster
    yield from sling.replicate(context=context)

class dagster_sling.DagsterSlingTranslator
get_asset_key

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.

The stream definition is a dictionary with two keys: “name”, holding the stream name, and “config”, holding a dictionary that represents the Sling Replication Stream Config.

For example:

stream_definition = {
    "name": "public.users",
    "config": {
        "sql": 'select all_user_id, name from public."all_Users"',
        "object": "public.all_users",
    },
}

By default, this returns the class’s target_prefix parameter concatenated with the stream name. A stream named “public.accounts” will create an AssetKey named “target_public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

public.users:
  meta:
    dagster:
      asset_key: "mydb_users"
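The precedence implied here (an explicit meta.dagster.asset_key wins, otherwise the default naming applies) can be sketched in plain Python. This is a standalone approximation, not dagster-sling's implementation: it returns the key as a string rather than an AssetKey, assumes a stream definition with “name” and “config” keys, and `dagster_meta` / `asset_key_override` are hypothetical helper names.

```python
from typing import Any, Mapping, Optional


def dagster_meta(stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
    """Hypothetical helper: the meta -> dagster block of a stream config, or {}."""
    config = stream_definition.get("config") or {}
    meta = config.get("meta") or {}
    return meta.get("dagster") or {}


def asset_key_override(stream_definition: Mapping[str, Any]) -> Optional[str]:
    """Hypothetical helper: the asset key set in the replication config, if any.

    None means no override was given, so the default naming should apply.
    """
    return dagster_meta(stream_definition).get("asset_key")


users = {
    "name": "public.users",
    "config": {"meta": {"dagster": {"asset_key": "mydb_users"}}},
}
accounts = {"name": "public.accounts", "config": None}

print(asset_key_override(users))     # mydb_users
print(asset_key_override(accounts))  # None
```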

Parameters: stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition.
Returns: The Dagster AssetKey for the replication stream.
Return type: AssetKey

Examples:

Using a custom mapping for streams:

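from dagster import AssetKey
from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key(self, stream_definition) -> AssetKey:
        mapping = {"stream1": "asset1", "stream2": "asset2"}  # stream name -> asset name
        return AssetKey(mapping[stream_definition["name"]])

get_auto_materialize_policy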

Defines the auto-materialize policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: An eager auto-materialize policy if the configuration is found, otherwise None.
Return type: Optional[AutoMaterializePolicy]

get_deps_asset_key

A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.

By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey of a stream’s dependency as follows:

public.users:
  meta:
    dagster:
      deps: "sourcedb_users"

Parameters: stream_name (str) – The name of the stream.
Returns: The Dagster AssetKey dependency for the replication stream.
Return type: AssetKey

Examples:

Using a custom mapping for streams:

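from dagster import AssetKey
from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_name: str) -> AssetKey:
        mapping = {"stream1": "asset1", "stream2": "asset2"}  # stream name -> upstream asset
        return AssetKey(mapping[stream_name])

get_description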

Retrieves the description for a given stream definition.

This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: The description of the stream if found, otherwise None.
Return type: Optional[str]
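The two-step lookup order described for get_description can be sketched in plain Python. This approximation is not the library's code: it assumes a stream definition with “name” and “config” keys, and the name of the metadata key, description, is an assumption, since the text above only says the value lives under the “dagster” key.

```python
from typing import Any, Mapping, Optional


def stream_description(stream_definition: Mapping[str, Any]) -> Optional[str]:
    """Approximate the documented lookup order for a stream's description."""
    config = stream_definition.get("config") or {}
    # 1. A custom SQL query, if present, is returned as the description.
    if "sql" in config:
        return config["sql"]
    # 2. Otherwise, look under meta -> dagster ("description" key is assumed).
    dagster_meta = (config.get("meta") or {}).get("dagster") or {}
    return dagster_meta.get("description")


with_sql = {
    "name": "public.users",
    "config": {
        "sql": "select id from public.users",
        "meta": {"dagster": {"description": "ignored because sql wins"}},
    },
}
print(stream_description(with_sql))  # select id from public.users
```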

get_freshness_policy

Retrieves the freshness policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A FreshnessPolicy object if the configuration is found, otherwise None.
Return type: Optional[FreshnessPolicy]
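The “construct a policy from the provided parameters” step can be sketched without Dagster installed. In the sketch below, `FreshnessSettings` is a hypothetical stand-in for Dagster's FreshnessPolicy, and both the freshness_policy meta key and its maximum_lag_minutes / cron_schedule fields are assumptions for illustration; check the dagster-sling source for the keys it actually reads.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class FreshnessSettings:
    """Hypothetical stand-in for Dagster's FreshnessPolicy."""

    maximum_lag_minutes: float
    cron_schedule: Optional[str] = None


def freshness_from_stream(stream_definition: Mapping[str, Any]) -> Optional[FreshnessSettings]:
    """Build settings from meta -> dagster -> freshness_policy (assumed key), if present."""
    config = stream_definition.get("config") or {}
    dagster_meta = (config.get("meta") or {}).get("dagster") or {}
    params = dagster_meta.get("freshness_policy")
    if not params:
        return None  # no freshness configuration for this stream
    return FreshnessSettings(
        maximum_lag_minutes=float(params["maximum_lag_minutes"]),
        cron_schedule=params.get("cron_schedule"),
    )
```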

get_group_name

Retrieves the group name for a given stream definition.

This method checks the provided stream definition for a group name in the metadata under the “dagster” key.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: The group name if found, otherwise None.
Return type: Optional[str]

get_kinds

Retrieves the kinds for a given stream definition.

This method returns “sling” by default. This method can be overridden to provide custom kinds.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A set containing kinds for the stream’s assets.
Return type: Set[str]

get_metadata

Retrieves the metadata for a given stream definition.

This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: A dictionary containing the stream configuration as JSON metadata.
Return type: Mapping[str, Any]

get_tags

Retrieves the tags for a given stream definition.

This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.

Parameters:

  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns: An empty dictionary.
Return type: Mapping[str, Any]

sanitize_stream_name

A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.

By default, this replaces any non-alphanumeric characters in the stream name with underscores and removes any double quotes.

Parameters: stream_name (str) – The name of the stream.

Examples:

Using a custom stream name sanitizer:

from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        return stream_name.replace(".", "")  # drop dots instead of replacing them
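The default sanitization described above can be approximated with Python's standard re module. This sketch follows the prose description (remove double quotes first, then turn every remaining non-alphanumeric character into an underscore); the library's actual pattern may treat some characters, such as dots, differently, so treat it as an illustration rather than a drop-in replacement.

```python
import re


def sanitize_stream_name(stream_name: str) -> str:
    """Approximate the documented default stream-name sanitization."""
    # Remove double quotes first so they are not turned into underscores.
    without_quotes = stream_name.replace('"', "")
    # Replace each character outside A-Z, a-z, 0-9 with an underscore.
    return re.sub(r"[^A-Za-z0-9]", "_", without_quotes)


print(sanitize_stream_name('public."all_Users"'))  # public_all_Users
```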

Resources

class dagster_sling.SlingResource

Resource for interacting with the Sling package. This resource can be used to run Sling replications.

Parameters: connections (List[SlingConnectionResource]) – A list of connections to use for the replication.

Examples:

from dagster import EnvVar
from dagster_sling import SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)

class dagster_sling.SlingConnectionResource

A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.

Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections

The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication

You may provide either a connection string or keyword arguments for the connection.
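Since these names have to line up, a quick check before running a sync can catch typos. The sketch below is a hypothetical pre-flight helper in plain Python, not part of dagster-sling: given a replication config as a dictionary and the name values of your SlingConnectionResource objects, it lists any source or target name with no matching connection. It only inspects the top-level source and target keys, which is a simplifying assumption.

```python
from typing import Any, Iterable, List, Mapping


def missing_connections(
    replication_config: Mapping[str, Any], connection_names: Iterable[str]
) -> List[str]:
    """Hypothetical helper: source/target names with no matching connection."""
    known = set(connection_names)
    referenced = [replication_config.get("source"), replication_config.get("target")]
    # Skip absent keys (None) and keep names that no connection provides.
    return [name for name in referenced if name and name not in known]


replication = {"source": "MY_POSTGRES", "target": "MY_SNOWFLAKE", "streams": {}}
print(missing_connections(replication, ["MY_POSTGRES"]))  # ['MY_SNOWFLAKE']
```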

Examples:

Creating a Sling Connection for a file, such as CSV or JSON:

source = SlingConnectionResource(name="MY_FILE", type="file")

Create a Sling Connection for a Postgres or MySQL database, using a connection string:

from dagster import EnvVar
from dagster_sling import SlingConnectionResource
postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments: