Airbyte (dagster-airbyte)

This library provides a Dagster integration with Airbyte.

For more information on getting started, see the Airbyte integration guide.

Assets (Airbyte API)

class dagster_airbyte.AirbyteCloudWorkspace
experimental

This API may break in future versions, even between dot releases.

This class represents an Airbyte Cloud workspace and provides utilities to interact with Airbyte APIs.

sync_and_poll
experimental

This API may break in future versions, even between dot releases.

Executes a sync and poll process to materialize Airbyte Cloud assets. This method can only be used in the context of an asset execution.

Parameters: context (AssetExecutionContext) – The execution context from within @airbyte_assets.

Returns: An iterator of MaterializeResult or AssetMaterialization.

Return type: Iterator[Union[AssetMaterialization, MaterializeResult]]

class dagster_airbyte.DagsterAirbyteTranslator
experimental

This API may break in future versions, even between dot releases.

Translator class which converts an AirbyteConnectionTableProps object into AssetSpecs. Subclass this class to implement custom logic for translating Airbyte content into asset specs.

dagster_airbyte.load_airbyte_cloud_asset_specs
experimental

This API may break in future versions, even between dot releases.

Returns a list of AssetSpecs representing the Airbyte content in the workspace.

Parameters:

Returns: The list of asset specs representing the Airbyte content in the workspace.

Return type: List[AssetSpec]

Examples:

Loading the asset specs for a given Airbyte Cloud workspace:

from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

airbyte_cloud_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)

@dagster_airbyte.airbyte_assets
experimental

This API may break in future versions, even between dot releases.

Create a definition for how to sync the tables of a given Airbyte connection.

Parameters:

  • connection_id (str) – The Airbyte Connection ID.
  • workspace (AirbyteCloudWorkspace) – The Airbyte workspace to fetch assets from.
  • name (Optional[str], optional) – The name of the op.
  • group_name (Optional[str], optional) – The name of the asset group.
  • dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional) – The translator to use

Examples:

Sync the tables of an Airbyte connection:

from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)

Sync the tables of an Airbyte connection with a custom translator:

from dagster_airbyte import (
    DagsterAirbyteTranslator,
    AirbyteConnectionTableProps,
    AirbyteCloudWorkspace,
    airbyte_assets,
)

import dagster as dg

class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        default_spec = super().get_asset_spec(props)
        return default_spec.merge_attributes(
            metadata={"custom": "metadata"},
        )

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)

dagster_airbyte.build_airbyte_assets_definitions
experimental

This API may break in future versions, even between dot releases.

Builds the list of AssetsDefinition for all connections in the Airbyte workspace.

Parameters:

Returns: The list of AssetsDefinition for all connections in the Airbyte workspace.

Return type: List[AssetsDefinition]

Examples:

Sync the tables of an Airbyte connection:

from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# Pass the workspace object defined above.
airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

defs = dg.Definitions(
    assets=airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)

Sync the tables of an Airbyte connection with a custom translator:

from dagster_airbyte import (
    DagsterAirbyteTranslator,
    AirbyteConnectionTableProps,
    AirbyteCloudWorkspace,
    build_airbyte_assets_definitions,
)

import dagster as dg

class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        default_spec = super().get_asset_spec(props)
        return default_spec.merge_attributes(
            metadata={"custom": "metadata"},
        )

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# Pass the workspace object defined above.
airbyte_assets = build_airbyte_assets_definitions(
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
)

defs = dg.Definitions(
    assets=airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)

Legacy

dagster_airbyte.AirbyteResource ResourceDefinition

This resource allows users to programmatically interface with the Airbyte REST API to launch syncs and monitor their progress.

Examples:

from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteResource, build_airbyte_assets

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

# build_airbyte_assets returns a list of asset definitions.
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)

defs = Definitions(
    assets=airbyte_assets,
    resources={"airbyte": my_airbyte_resource},
)

dagster_airbyte.load_assets_from_airbyte_instance

Loads Airbyte connection assets from a configured AirbyteResource instance. This fetches information about defined connections at initialization time, and will error on workspace load if the Airbyte instance is not reachable.
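Because an unreachable instance surfaces as an error at load time, it can help to check connectivity separately when debugging. The helper below is a hypothetical, standard-library-only sketch and is not part of dagster-airbyte. It tests only whether a TCP connection to the host and port can be opened; it does not verify that the Airbyte API itself is healthy.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

For a local instance configured with host "localhost" and port "8000", the check would be `is_reachable("localhost", 8000)`.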

Parameters:

  • airbyte (ResourceDefinition) – An AirbyteResource configured with the appropriate connection
  • workspace_id (Optional[str]) – The ID of the Airbyte workspace to load connections from. Only
  • key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
  • create_assets_for_normalization_tables (bool) – If True, assets will be created for tables
  • connection_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset
  • connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]) – Function which
  • io_manager_key (Optional[str]) – The I/O manager key to use for all assets. Defaults to “io_manager”.
  • connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an
  • connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]) – Optional function which takes
  • connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]) – Optional function which
  • connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]) – Optional function
  • connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]) – Optional

Examples:

Loading all Airbyte connections as assets:

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

Filtering the set of loaded connections:

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: "snowflake" in meta.name,
)
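
A connection_filter is a plain predicate over connection metadata, so it can be written as a named function and exercised on its own before being passed to load_assets_from_airbyte_instance. The sketch below assumes only that the metadata object exposes a name attribute, as the lambda above does. StubConnectionMetadata is a hypothetical stand-in for illustration, not the real AirbyteConnectionMetadata class.

```python
from dataclasses import dataclass


@dataclass
class StubConnectionMetadata:
    """Hypothetical stand-in exposing only the `name` attribute the filter reads."""

    name: str


def is_snowflake_connection(meta) -> bool:
    # Like the lambda in the example above, but case-insensitive.
    return "snowflake" in meta.name.lower()


connections = [
    StubConnectionMetadata(name="GitHub to Snowflake"),
    StubConnectionMetadata(name="postgres_to_bigquery"),
]
selected = [c.name for c in connections if is_snowflake_connection(c)]
print(selected)  # ['GitHub to Snowflake']
```

A function like this would then be supplied as connection_filter=is_snowflake_connection.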

dagster_airbyte.build_airbyte_assets

Builds a set of assets representing the tables created by an Airbyte sync operation.

Parameters:

  • connection_id (str) – The Airbyte Connection ID that this op will sync. You can retrieve this
  • destination_tables (List[str]) – The names of the tables that you want to be represented
  • destination_database (Optional[str]) – The name of the destination database.
  • destination_schema (Optional[str]) – The name of the destination schema.
  • normalization_tables (Optional[Mapping[str, List[str]]]) – If you are using Airbyte’s
  • asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
  • deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, str, AssetKey]]]) – A list of assets to add as sources.
  • upstream_assets (Optional[Set[AssetKey]]) – Deprecated, use deps instead. A list of assets to add as sources.
  • freshness_policy (Optional[FreshnessPolicy]) – A freshness policy to apply to the assets
  • stream_to_asset_map (Optional[Mapping[str, str]]) – A mapping of an Airbyte stream name to a Dagster asset.
  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – An auto materialization policy to apply to the assets.

dagster_airbyte.airbyte_sync_op = <dagster._core.definitions.op_definition.OpDefinition object>

Executes an Airbyte job sync for a given connection_id, and polls until that sync completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput which contains the job details for a given connection_id.

It requires the use of the airbyte_resource, which allows it to communicate with the Airbyte API.
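
The sync-then-poll behavior described above can be pictured with a minimal, library-independent sketch. Everything in it is hypothetical: FakeAirbyteClient, start_sync, get_job_status, and the status strings are stand-ins invented for illustration, not the dagster-airbyte or Airbyte REST API, and the real op's polling interval and timeout handling may differ.

```python
import time
from typing import Optional


class FakeAirbyteClient:
    """Hypothetical stand-in: reports 'running' a few times, then a final status."""

    def __init__(self, polls_until_done: int, final_status: str = "succeeded"):
        self._remaining = polls_until_done
        self._final_status = final_status

    def start_sync(self, connection_id: str) -> int:
        # A real client would call the Airbyte API; this returns a fixed job id.
        return 1

    def get_job_status(self, job_id: int) -> str:
        if self._remaining > 0:
            self._remaining -= 1
            return "running"
        return self._final_status


def sync_and_wait(client, connection_id: str, poll_interval: float = 0.01,
                  timeout: Optional[float] = 1.0) -> dict:
    """Start a sync, poll until a terminal status, and raise if it did not succeed."""
    job_id = client.start_sync(connection_id)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = client.get_job_status(job_id)
        if status == "succeeded":
            return {"job_id": job_id, "status": status}
        if status != "running":
            raise RuntimeError(f"Sync for {connection_id!r} ended with status {status!r}")
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(f"Sync for {connection_id!r} did not finish within {timeout}s")
        time.sleep(poll_interval)
```

The returned dictionary plays the role that the job details play in the op's output; a failed or timed-out sync raises instead of returning.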

Examples:

from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()

# some_op and other_op stand in for ops defined elsewhere in your project.
@job(resource_defs={"airbyte": my_airbyte_resource})
def my_composed_airbyte_job():
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)