Airbyte (dagster-airbyte)
This library provides a Dagster integration with Airbyte.
For more information on getting started, see the Airbyte integration guide.
Resources
- dagster_airbyte.AirbyteResource ResourceDefinition
This resource allows users to programmatically interface with the Airbyte REST API to launch syncs and monitor their progress.
Examples:
from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteResource, build_airbyte_assets
my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)

defs = Definitions(
    assets=airbyte_assets,
    resources={"airbyte": my_airbyte_resource},
)
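The resource can also be used directly from an op to launch a sync and wait for it to finish. A minimal sketch, assuming the resource exposes a sync_and_poll method and using a placeholder connection ID:
from dagster import Definitions, EnvVar, job, op
from dagster_airbyte import AirbyteResource

airbyte = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
)

@op
def trigger_release_sync(airbyte: AirbyteResource):
    # Launch the sync and block until it completes; the connection ID is a placeholder.
    return airbyte.sync_and_poll(connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df")

@job
def airbyte_sync_job():
    trigger_release_sync()

defs = Definitions(jobs=[airbyte_sync_job], resources={"airbyte": airbyte})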
Assets
- dagster_airbyte.load_assets_from_airbyte_instance
Loads Airbyte connection assets from a configured AirbyteResource instance. This fetches information about defined connections at initialization time, and will error on workspace load if the Airbyte instance is not reachable.
Parameters:
- airbyte (ResourceDefinition) – An AirbyteResource configured with the appropriate connection details.
- workspace_id (Optional[str]) – The ID of the Airbyte workspace to load connections from. Only required if multiple workspaces exist in your Airbyte instance.
- key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
- create_assets_for_normalization_tables (bool) – If True, assets will be created for tables produced by Airbyte's normalization feature. Defaults to True.
- connection_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset group name for a given Airbyte connection name. If None, no groups will be created.
- connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]) – Function which returns an asset group name for a given Airbyte connection's metadata.
- io_manager_key (Optional[str]) – The I/O manager key to use for all assets. Defaults to "io_manager".
- connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an I/O manager key for a given Airbyte connection name.
- connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]) – Optional function which takes in connection metadata and returns False if the connection should be excluded from the loaded assets.
- connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]) – Optional function which takes in connection metadata and a table name and returns the asset key for that table.
- connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]) – Optional function which takes in connection metadata and returns a freshness policy to apply to the connection's assets.
- connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]) – Optional function which takes in connection metadata and returns an auto materialization policy to apply to the connection's assets.
Loading all Airbyte connections as assets:
from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
Filtering the set of loaded connections:
from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: "snowflake" in meta.name,
)
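Customizing asset keys and groups with the optional parameters above. A minimal sketch; the "airbyte" key prefix and the hyphen-to-underscore group naming are illustrative choices, not defaults:
from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
# Prefix every asset key with "airbyte" and group assets by a sanitized connection name.
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    key_prefix=["airbyte"],
    connection_to_group_fn=lambda connection_name: connection_name.replace("-", "_").lower(),
)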
- dagster_airbyte.build_airbyte_assets
Builds a set of assets representing the tables created by an Airbyte sync operation.
Parameters:
- connection_id (str) – The Airbyte Connection ID that this op will sync. You can retrieve this value from the "Connections" tab of a given connector in the Airbyte UI.
- destination_tables (List[str]) – The names of the tables that you want to be represented in the Dagster asset graph for this sync. These generally map to Airbyte stream names.
- destination_database (Optional[str]) – The name of the destination database.
- destination_schema (Optional[str]) – The name of the destination schema.
- normalization_tables (Optional[Mapping[str, List[str]]]) – If you are using Airbyte's normalization feature, a mapping of each destination table to the list of tables created from it by normalization.
- asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
- deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, str, AssetKey]]]) – A list of assets to add as sources.
- upstream_assets (Optional[Set[AssetKey]]) – Deprecated, use deps instead. A list of assets to add as sources.
- freshness_policy (Optional[FreshnessPolicy]) – A freshness policy to apply to the assets.
- stream_to_asset_map (Optional[Mapping[str, str]]) – A mapping of an Airbyte stream name to a Dagster asset.
- auto_materialize_policy (Optional[AutoMaterializePolicy]) – An auto materialization policy to apply to the assets.
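For illustration, a sketch of build_airbyte_assets using the optional prefix and normalization parameters; the "github" prefix and the normalization mapping below are hypothetical:
from dagster_airbyte import build_airbyte_assets

# normalization_tables maps a destination table to the tables derived from it by
# Airbyte's normalization feature; "releases_assets" is a made-up derived table name.
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
    asset_key_prefix=["github"],
    normalization_tables={"releases": ["releases_assets"]},
)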
Ops
- dagster_airbyte.airbyte_sync_op
Executes an Airbyte job sync for a given connection_id, and polls until that sync completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput which contains the job details for a given connection_id.
It requires the use of the airbyte_resource, which allows it to communicate with the Airbyte API.
Examples:
from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op
my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)
sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_composed_airbyte_job():
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)
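For local testing, either job can be launched in process. This sketch assumes a reachable Airbyte instance at the configured host and port, and that some_op and other_op are defined as ordinary ops in the same module:
# Runs the job (and therefore the Airbyte sync) synchronously in the current process.
result = my_simple_airbyte_job.execute_in_process()
assert result.success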