Skip to main content

Dagster & Sling

This integration allows you to use Sling to extract and load data from popular data sources to destinations with high performance and ease.

Installation

pip install dagster-sling

Example

from dagster_sling import SlingConnectionResource, SlingResource, sling_assets

import dagster as dg

# Source connection: a Postgres database registered with Sling as "MY_PG".
# The replication config refers to this connection by that name.
source = SlingConnectionResource(
    name="MY_PG",
    type="postgres",
    # The remaining keyword arguments are connection-specific settings; the
    # `type: ignore` markers silence the type checker for them.
    host="localhost",  # type: ignore
    port=5432,  # type: ignore
    database="my_database",  # type: ignore
    user="my_user",  # type: ignore
    # The password is taken from the PG_PASS environment variable rather
    # than being hard-coded.
    password=dg.EnvVar("PG_PASS"),  # type: ignore
)

# Target connection: a Snowflake account registered with Sling as "MY_SF".
# The replication config refers to this connection by that name.
target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    # The remaining keyword arguments are connection-specific settings; the
    # `type: ignore` markers silence the type checker for them.
    host="hostname.snowflake",  # type: ignore
    user="username",  # type: ignore
    database="database",  # type: ignore
    # The password is taken from the SF_PASSWORD environment variable rather
    # than being hard-coded.
    password=dg.EnvVar("SF_PASSWORD"),  # type: ignore
    role="role",  # type: ignore
)


# Sling replication from the "MY_PG" connection to the "MY_SF" connection.
# The `replication_config` dictionary names the source and target connections,
# sets defaults shared by every stream, and lists the streams to replicate.
@sling_assets(
    replication_config={
        "SOURCE": "MY_PG",
        "TARGET": "MY_SF",
        "defaults": {
            "mode": "full-refresh",
            # Default target object name, built from each stream's schema
            # and table name.
            "object": "{stream_schema}_{stream_table}",
        },
        "streams": {
            # None: no per-stream overrides are given for these streams.
            "public.accounts": None,
            "public.users": None,
            # This stream overrides the default target object name.
            "public.finance_departments": {"object": "departments"},
        },
    }
)
def my_sling_assets(context, sling: SlingResource):
    # Delegate to the Sling resource and re-yield everything it produces.
    yield from sling.replicate(context=context)


# Bundle the Sling assets with the resource they need. The resource key
# "sling" is the same as the `sling` parameter name of `my_sling_assets`.
defs = dg.Definitions(
    assets=[my_sling_assets],
    resources={
        # Both connections are registered on a single SlingResource.
        "sling": SlingResource(connections=[source, target]),
    },
)

About Sling

Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.