Airbyte (dagster-airbyte)
This library provides a Dagster integration with Airbyte.
For more information on getting started, see the Airbyte integration guide.
Assets (Airbyte API)
- class dagster_airbyte.AirbyteCloudWorkspace
- experimental
This API may break in future versions, even between dot releases.
This class represents an Airbyte Cloud workspace and provides utilities to interact with Airbyte APIs.
- sync_and_poll
- experimental
This API may break in future versions, even between dot releases.
Executes a sync and poll process to materialize Airbyte Cloud assets. This method can only be used in the context of an asset execution.
Parameters: context (AssetExecutionContext) – The execution context from within @airbyte_assets.
Returns: An iterator of MaterializeResult or AssetMaterialization.
Return type: Iterator[Union[AssetMaterialization, MaterializeResult]]
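To make the sync-then-poll flow concrete, here is a dependency-free sketch of the general pattern: start a sync job, poll its status until it reaches a terminal state, raise if it failed, and then yield one result per synced table. It does not reproduce dagster-airbyte's implementation. FakeAirbyteClient, its start_sync and get_job_status methods, and the status strings are hypothetical stand-ins for the Airbyte API.

```python
import time
from dataclasses import dataclass
from typing import Iterator, List

# Hypothetical job states; the real Airbyte API defines its own status values.
RUNNING, SUCCEEDED, FAILED = "running", "succeeded", "failed"


@dataclass
class FakeAirbyteClient:
    """Hypothetical stand-in for an Airbyte API client that replays scripted statuses."""

    statuses: List[str]
    tables: List[str]
    polls: int = 0

    def start_sync(self, connection_id: str) -> int:
        # Pretend the API accepted the sync request and returned a job ID.
        return 1

    def get_job_status(self, job_id: int) -> str:
        # Return the next scripted status, repeating the last one if polled again.
        status = self.statuses[min(self.polls, len(self.statuses) - 1)]
        self.polls += 1
        return status


def sync_and_poll_sketch(
    client: FakeAirbyteClient,
    connection_id: str,
    poll_interval: float = 0.0,
    timeout: float = 10.0,
) -> Iterator[str]:
    """Start a sync, block until it finishes, then yield one result per table."""
    job_id = client.start_sync(connection_id)
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_job_status(job_id)
        if status == SUCCEEDED:
            break
        if status == FAILED:
            raise RuntimeError(f"Sync job {job_id} for connection {connection_id!r} failed")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Sync job {job_id} did not finish within {timeout} seconds")
        time.sleep(poll_interval)
    # In dagster-airbyte these would be MaterializeResult or AssetMaterialization events.
    for table in client.tables:
        yield f"materialized {table}"
```

Polling against a deadline keeps a stuck job from blocking forever. The sketch omits the asset execution context that the real method requires.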
- class dagster_airbyte.DagsterAirbyteTranslator
- experimental
This API may break in future versions, even between dot releases.
Translator class which converts an AirbyteConnectionTableProps object into an AssetSpec. Subclass this class to implement custom logic for how Airbyte content is translated into asset specs.
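The subclass-and-override pattern the translator relies on can be illustrated without Dagster: a base class maps each table's properties to a spec, and a subclass calls super() and layers extra attributes on top, much like the merge_attributes call in the custom-translator examples. TableProps, Spec, and the default key layout are hypothetical simplifications of AirbyteConnectionTableProps and AssetSpec, not their real fields.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, Tuple


@dataclass(frozen=True)
class TableProps:
    """Hypothetical, simplified stand-in for AirbyteConnectionTableProps."""

    connection_name: str
    table_name: str


@dataclass(frozen=True)
class Spec:
    """Hypothetical, simplified stand-in for an AssetSpec."""

    key: Tuple[str, ...]
    metadata: Dict[str, str] = field(default_factory=dict)


class BaseTranslator:
    def get_spec(self, props: TableProps) -> Spec:
        # Hypothetical default: key each asset by connection name and table name.
        return Spec(key=(props.connection_name, props.table_name))


class CustomTranslator(BaseTranslator):
    def get_spec(self, props: TableProps) -> Spec:
        # Start from the default spec, then add metadata without mutating it.
        default_spec = super().get_spec(props)
        return replace(default_spec, metadata={**default_spec.metadata, "custom": "metadata"})
```

Returning a modified copy rather than mutating the default keeps the base translator's output reusable by other subclasses.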
- dagster_airbyte.load_airbyte_cloud_asset_specs
- experimental
This API may break in future versions, even between dot releases.
Returns a list of AssetSpecs representing the Airbyte content in the workspace.
Parameters:
- workspace (AirbyteCloudWorkspace) – The Airbyte Cloud workspace to fetch assets from.
- dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional) – The translator to use to convert Airbyte content into AssetSpecs.
Returns: The set of assets representing the Airbyte content in the workspace.
Return type: List[AssetSpec]
Examples:
Loading the asset specs for a given Airbyte Cloud workspace:
```python
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

airbyte_cloud_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
```
- @dagster_airbyte.airbyte_assets
- experimental
This API may break in future versions, even between dot releases.
Create a definition for how to sync the tables of a given Airbyte connection.
Parameters:
- connection_id (str) – The Airbyte Connection ID.
- workspace (AirbyteCloudWorkspace) – The Airbyte workspace to fetch assets from.
- name (Optional[str], optional) – The name of the op.
- group_name (Optional[str], optional) – The name of the asset group.
- dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional) – The translator to use to convert Airbyte content into AssetSpecs.
Examples:
Sync the tables of an Airbyte connection:
```python
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```
Sync the tables of an Airbyte connection with a custom translator:
```python
from dagster_airbyte import (
    DagsterAirbyteTranslator,
    AirbyteConnectionTableProps,
    AirbyteCloudWorkspace,
    airbyte_assets,
)

import dagster as dg


class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        default_spec = super().get_asset_spec(props)
        return default_spec.merge_attributes(
            metadata={"custom": "metadata"},
        )


airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```
- dagster_airbyte.build_airbyte_assets_definitions
- experimental
This API may break in future versions, even between dot releases.
Builds the list of AssetsDefinition for all connections in the Airbyte workspace.
Parameters:
- workspace (AirbyteCloudWorkspace) – The Airbyte workspace to fetch assets from.
- dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional) – The translator to use to convert Airbyte content into AssetSpecs.
Returns: The list of AssetsDefinition for all connections in the Airbyte workspace.
Return type: List[AssetsDefinition]
Examples:
Sync the tables of all connections in an Airbyte workspace:
```python
from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Pass the workspace object defined above
airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

defs = dg.Definitions(
    assets=airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)
```
Sync the tables of all connections in an Airbyte workspace with a custom translator:
```python
from dagster_airbyte import (
    DagsterAirbyteTranslator,
    AirbyteConnectionTableProps,
    AirbyteCloudWorkspace,
    build_airbyte_assets_definitions,
)

import dagster as dg


class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        default_spec = super().get_asset_spec(props)
        return default_spec.merge_attributes(
            metadata={"custom": "metadata"},
        )


airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Pass the workspace object defined above
airbyte_assets = build_airbyte_assets_definitions(
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
)

defs = dg.Definitions(
    assets=airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)
```
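Because this function returns definitions for every connection in the workspace, while @airbyte_assets defines the sync for a single connection, a useful mental model is grouping table-level properties by the connection they belong to. The sketch below shows only that grouping step in plain Python. The (connection_id, table_name) tuples and the helper name are hypothetical, and the one-group-per-connection layout is an assumption for illustration rather than documented behavior.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_tables_by_connection(table_props: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Hypothetical helper: group (connection_id, table_name) pairs by connection."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for connection_id, table_name in table_props:
        grouped[connection_id].append(table_name)
    # Return a plain dict; key order follows each connection's first appearance.
    return dict(grouped)
```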
Legacy
- dagster_airbyte.AirbyteResource ResourceDefinition
This resource allows users to programmatically interface with the Airbyte REST API to launch syncs and monitor their progress.
Examples:
```python
from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteResource, build_airbyte_assets

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

# build_airbyte_assets returns a list of assets definitions, so pass it directly
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)

defs = Definitions(
    assets=airbyte_assets,
    resources={"airbyte": my_airbyte_resource},
)
```
- dagster_airbyte.load_assets_from_airbyte_instance
Loads Airbyte connection assets from a configured AirbyteResource instance. This fetches information about defined connections at initialization time, and will error on workspace load if the Airbyte instance is not reachable.
Parameters:
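- airbyte (ResourceDefinition) – An AirbyteResource configured with the appropriate connection details.
- workspace_id (Optional[str]) – The ID of the Airbyte workspace to load connections from. Only required if multiple workspaces exist in your instance.
- key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
- create_assets_for_normalization_tables (bool) – If True, assets will be created for tables created by Airbyte's normalization feature. If False, only the destination tables will be created. Defaults to True.
- connection_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset group name for a given Airbyte connection name. If None, no groups will be created. Defaults to a basic sanitization function.
- connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]) – Function which returns an asset group name for a given Airbyte connection's metadata. If both this and connection_to_group_fn are None, no groups will be created.
- io_manager_key (Optional[str]) – The I/O manager key to use for all assets. Defaults to “io_manager”.
- connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an I/O manager key for a given Airbyte connection name. When other ops are downstream of the loaded assets, the IOManager specified determines how the inputs to those ops are loaded. Defaults to “io_manager”.
- connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]) – Optional function which takes in connection metadata and returns False if the connection should be excluded from the output assets.
- connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]) – Optional function which takes in connection metadata and a table name and returns an asset key for the table. If None, the default asset key is based on the table name. Any asset key prefix is applied to the output of this function.
- connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]) – Optional function which takes in connection metadata and returns a freshness policy for the connection's assets. If None, no freshness policies will be applied to the assets.
- connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]) – Optional function which takes in connection metadata and returns an auto-materialize policy for the connection's assets. If None, no auto-materialize policies will be applied to the assets.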
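connection_to_group_fn receives a connection name and returns a group name or None. One plausible implementation is a sanitizer that lowercases the name and collapses every run of characters other than letters, digits, and underscores into a single underscore. The function name is hypothetical, and the assumption that group names are limited to those characters should be checked against your Dagster version.

```python
import re
from typing import Optional

# Assumed rule: group names may contain only ASCII letters, digits, and underscores.
_INVALID_GROUP_CHARS = re.compile(r"[^a-z0-9_]+")


def sanitize_connection_name(connection_name: str) -> Optional[str]:
    """Hypothetical connection_to_group_fn: map a connection name to a safe group name."""
    cleaned = _INVALID_GROUP_CHARS.sub("_", connection_name.lower()).strip("_")
    # Return None (allowed by the Optional[str] return type) if nothing usable remains.
    return cleaned or None
```

It could then be passed as load_assets_from_airbyte_instance(airbyte_instance, connection_to_group_fn=sanitize_connection_name).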
Loading all Airbyte connections as assets:
```python
from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
```
Filtering the set of loaded connections:
```python
from dagster_airbyte import airbyte_resource, load_assets_from_airbyte_instance

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: "snowflake" in meta.name,
)
```
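Judging from the example, connection_filter receives each connection's metadata and keeps the connection when the predicate returns True, so the lambda keeps connections whose name contains "snowflake". Building the predicate from a list of substrings makes it reusable across projects. ConnectionMeta and name_contains_any are hypothetical; ConnectionMeta models only the name attribute that the example reads from AirbyteConnectionMetadata.

```python
from typing import Callable, NamedTuple


class ConnectionMeta(NamedTuple):
    """Hypothetical stand-in exposing only the `name` field used by the filter."""

    name: str


def name_contains_any(*substrings: str) -> Callable[[ConnectionMeta], bool]:
    """Build a connection_filter that keeps connections whose name contains any substring."""
    lowered = [s.lower() for s in substrings]

    def _filter(meta: ConnectionMeta) -> bool:
        # Case-insensitive match so "Snowflake" and "snowflake" are treated alike.
        name = meta.name.lower()
        return any(s in name for s in lowered)

    return _filter
```

With the real loader this would read load_assets_from_airbyte_instance(airbyte_instance, connection_filter=name_contains_any("snowflake", "bigquery")).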
- dagster_airbyte.build_airbyte_assets
Builds a set of assets representing the tables created by an Airbyte sync operation.
Parameters:
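- connection_id (str) – The Airbyte Connection ID that this op will sync. You can retrieve this value from the "Connections" tab of a given connector in the Airbyte UI.
- destination_tables (List[str]) – The names of the tables that you want to be represented in the Dagster asset graph for this sync. This will generally map to the name of the stream in Airbyte, unless a stream prefix has been specified in Airbyte.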
- destination_database (Optional[str]) – The name of the destination database.
- destination_schema (Optional[str]) – The name of the destination schema.
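- normalization_tables (Optional[Mapping[str, List[str]]]) – If you are using Airbyte’s normalization feature, you may specify a mapping of destination table to a list of derived tables that will be created by the normalization process.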
- asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
- deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, str, AssetKey]]]) – A list of assets to add as sources.
- upstream_assets (Optional[Set[AssetKey]]) – Deprecated, use deps instead. A list of assets to add as sources.
- freshness_policy (Optional[FreshnessPolicy]) – A freshness policy to apply to the assets.
- stream_to_asset_map (Optional[Mapping[str, str]]) – A mapping of an Airbyte stream name to a Dagster asset.
- auto_materialize_policy (Optional[AutoMaterializePolicy]) – An auto materialization policy to apply to the assets.
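One way to reason about how destination_tables, normalization_tables, and asset_key_prefix combine is to compute the asset key paths they would produce. The sketch below assumes each key is the prefix followed by the table name and that normalization-derived tables become assets alongside the destination tables. That composition rule and the helper name are assumptions for illustration, not a statement of how build_airbyte_assets constructs its keys, and the sketch ignores stream_to_asset_map.

```python
from typing import List, Mapping, Optional, Sequence, Tuple


def sketch_asset_key_paths(
    destination_tables: Sequence[str],
    asset_key_prefix: Optional[Sequence[str]] = None,
    normalization_tables: Optional[Mapping[str, List[str]]] = None,
) -> List[Tuple[str, ...]]:
    """Hypothetical helper: list the key paths a set of tables would map to."""
    prefix = tuple(asset_key_prefix or ())
    tables = list(destination_tables)
    # Assumed: each derived normalization table becomes its own asset.
    for derived in (normalization_tables or {}).values():
        tables.extend(derived)
    # Drop duplicates while keeping the original order.
    unique_tables = list(dict.fromkeys(tables))
    return [prefix + (table,) for table in unique_tables]
```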
- dagster_airbyte.airbyte_sync_op OpDefinition
Executes an Airbyte job sync for a given connection_id and polls until that sync completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput, which contains the job details for the given connection_id.
It requires the use of the airbyte_resource, which allows it to communicate with the Airbyte API.
Examples:
```python
from dagster import job, op
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

@op
def some_op():
    # Placeholder upstream op (hypothetical); the sync starts after it completes
    pass

@op
def other_op(airbyte_output):
    # Placeholder downstream op (hypothetical) that receives the AirbyteOutput
    pass

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_composed_airbyte_job():
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)
```