Airbyte (dagster-airbyte)

This library provides a Dagster integration with Airbyte.

For more information on getting started, see the Airbyte integration guide.

Resources

dagster_airbyte.AirbyteResource ResourceDefinition

This resource allows users to programmatically interface with the Airbyte REST API to launch syncs and monitor their progress.

Examples:
from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteResource, build_airbyte_assets

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

# build_airbyte_assets returns a sequence of asset definitions
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)

defs = Definitions(
    assets=airbyte_assets,
    resources={"airbyte": my_airbyte_resource},
)

Assets

dagster_airbyte.load_assets_from_airbyte_instance

Loads Airbyte connection assets from a configured AirbyteResource instance. This fetches information about defined connections at initialization time, and will error on workspace load if the Airbyte instance is not reachable.

Parameters:

  • airbyte (ResourceDefinition) – An AirbyteResource configured with the appropriate connection
  • workspace_id (Optional[str]) – The ID of the Airbyte workspace to load connections from. Only
  • key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
  • create_assets_for_normalization_tables (bool) – If True, assets will be created for tables
  • connection_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset
  • connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]) – Function which
  • io_manager_key (Optional[str]) – The I/O manager key to use for all assets. Defaults to “io_manager”.
  • connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an
  • connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]) – Optional function which takes
  • connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]) – Optional function which
  • connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]) – Optional function
  • connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]) – Optional
Examples:

Loading all Airbyte connections as assets:

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

Filtering the set of loaded connections:

from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: "snowflake" in meta.name,
)
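
The callback parameters listed above accept plain Python callables, so they can be written as named functions and checked on their own. The following is a minimal sketch, not part of dagster_airbyte: `ConnectionMetaStub` is a hypothetical stand-in that exposes only the `name` attribute used in the filtering example, and `group_from_connection_name` assumes the string passed to `connection_to_group_fn` is the connection name. Unlike the lambda above, this filter is case-insensitive.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionMetaStub:
    """Hypothetical stand-in exposing only the `name` attribute."""

    name: str


def is_snowflake_connection(meta: ConnectionMetaStub) -> bool:
    """Keep only connections whose name mentions Snowflake (case-insensitive)."""
    return "snowflake" in meta.name.lower()


def group_from_connection_name(connection_name: str) -> Optional[str]:
    """Turn a connection name into an identifier-like group name.

    Runs of characters other than letters, digits, and underscores are
    collapsed into a single underscore. Returns None if nothing remains.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]+", "_", connection_name).strip("_").lower()
    return cleaned or None
```

Under these assumptions, the functions would be passed as `connection_filter=is_snowflake_connection` and `connection_to_group_fn=group_from_connection_name`.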
dagster_airbyte.build_airbyte_assets

Builds a set of assets representing the tables created by an Airbyte sync operation.

Parameters:

  • connection_id (str) – The Airbyte Connection ID that this op will sync. You can retrieve this
  • destination_tables (List[str]) – The names of the tables that you want to be represented
  • destination_database (Optional[str]) – The name of the destination database.
  • destination_schema (Optional[str]) – The name of the destination schema.
  • normalization_tables (Optional[Mapping[str, List[str]]]) – If you are using Airbyte’s
  • asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
  • deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, str, AssetKey]]]) – A list of assets to add as sources.
  • upstream_assets (Optional[Set[AssetKey]]) – Deprecated, use deps instead. A list of assets to add as sources.
  • freshness_policy (Optional[FreshnessPolicy]) – A freshness policy to apply to the assets
  • stream_to_asset_map (Optional[Mapping[str, str]]) – A mapping of an Airbyte stream name to a Dagster asset.
  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – An auto materialization policy to apply to the assets.
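
To make the relationship between `destination_tables` and `asset_key_prefix` concrete, here is a rough sketch that assumes the prefix components are simply prepended to each table name to form the asset key path. `sketch_asset_key_paths` is a hypothetical helper written for illustration. It is not a dagster_airbyte function and does not reproduce the library's internals.

```python
from typing import Dict, List, Optional, Sequence


def sketch_asset_key_paths(
    destination_tables: Sequence[str],
    asset_key_prefix: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
    """Map each destination table to an illustrative asset key path.

    Assumption: the key path is the prefix components followed by the
    table name. With no prefix, the path is just the table name.
    """
    prefix = list(asset_key_prefix or [])
    return {table: [*prefix, table] for table in destination_tables}
```

For example, tables `["releases", "tags"]` with prefix `["airbyte", "github"]` would yield the paths `["airbyte", "github", "releases"]` and `["airbyte", "github", "tags"]` under this assumption.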

Ops

dagster_airbyte.airbyte_sync_op = <dagster._core.definitions.op_definition.OpDefinition object>

Executes an Airbyte job sync for a given connection_id, and polls until that sync completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput which contains the job details for a given connection_id.

It requires the use of the airbyte_resource, which allows it to communicate with the Airbyte API.

Examples:

from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()

# some_op and other_op are placeholder ops assumed to be defined elsewhere
@job(resource_defs={"airbyte": my_airbyte_resource})
def my_composed_airbyte_job():
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)
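
The poll-until-complete behavior described for airbyte_sync_op can be illustrated with a generic loop. This is a sketch of the general pattern only, not the op's actual implementation: `poll_until_complete`, `SyncFailedError`, the injected `get_status` callable, and the status strings `"succeeded"`, `"failed"`, and `"cancelled"` are all assumptions made for the example.

```python
import time
from typing import Callable


class SyncFailedError(Exception):
    """Raised when the polled job ends in a non-successful terminal state."""


def poll_until_complete(
    get_status: Callable[[], str],
    poll_interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status repeatedly until the job reaches a terminal state.

    Returns the final status on success, raises SyncFailedError on a
    failed or cancelled job, and raises TimeoutError once the deadline
    has passed without reaching a terminal state.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "succeeded":
            return status
        if status in ("failed", "cancelled"):
            raise SyncFailedError(f"sync ended with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sync still {status!r} after {timeout} seconds")
        sleep(poll_interval)
```

Injecting `get_status` and `sleep` keeps the loop independent of any HTTP client, so it can be exercised with a canned sequence of statuses and no real waiting.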