Fivetran (dagster-fivetran)
This library provides a Dagster integration with Fivetran.
Resources
- dagster_fivetran.FivetranResource ResourceDefinition
This class exposes methods on top of the Fivetran REST API.
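To give a rough idea of what "methods on top of the Fivetran REST API" involves, the sketch below builds, but does not send, an authenticated request using only the standard library. It assumes that Fivetran uses HTTP Basic authentication with the API key and secret, that the base URL is `https://api.fivetran.com/v1/`, and that a sync is triggered with `POST connectors/{connector_id}/sync`. The helper `build_fivetran_request` is hypothetical and is not part of `dagster_fivetran`.

```python
import base64
import urllib.request

# Assumed base URL of the Fivetran REST API (v1).
FIVETRAN_API_BASE = "https://api.fivetran.com/v1/"


def build_fivetran_request(
    api_key: str, api_secret: str, method: str, endpoint: str
) -> urllib.request.Request:
    """Hypothetical helper: build an authenticated Fivetran API request without sending it."""
    # Assumed auth scheme: HTTP Basic auth with base64("key:secret").
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=FIVETRAN_API_BASE + endpoint,
        method=method,
        headers={
            "Authorization": f"Basic {token}",
            "Accept": "application/json",
        },
    )


# The request that would trigger a sync for connector "foobar" (assumed endpoint).
req = build_fivetran_request("some_key", "some_secret", "POST", "connectors/foobar/sync")
print(req.get_method(), req.full_url)
```

Keeping request construction separate from sending makes the auth and URL logic easy to check without network access.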
Assets
- dagster_fivetran.load_assets_from_fivetran_instance
Loads Fivetran connector assets from a configured FivetranResource instance. This fetches information about defined connectors at initialization time, and will error on workspace load if the Fivetran instance is not reachable.
Parameters:
- fivetran (ResourceDefinition) – A FivetranResource configured with the appropriate connection details.
- key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
- connector_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset group name for a given Fivetran connector name.
- io_manager_key (Optional[str]) – The IO manager key to use for all assets. Defaults to “io_manager”.
- connector_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an IO manager key for a given Fivetran connector name.
- connector_filter (Optional[Callable[[FivetranConnectorMetadata], bool]]) – Optional function which takes in connector metadata and returns False if the connector should be excluded from the output assets.
- connector_to_asset_key_fn (Optional[Callable[[FivetranConnectorMetadata, str], AssetKey]]) – Optional function which takes in connector metadata and a table name and returns an AssetKey for that table.
- destination_ids (Optional[List[str]]) – A list of destination IDs to fetch connectors from. If None, all destinations will be polled for connectors.
- poll_interval (float) – The time (in seconds) that will be waited between successive polls.
- poll_timeout (Optional[float]) – The maximum time (in seconds) that will be waited before this operation is timed out. If None, it never times out.
- fetch_column_metadata (bool) – If True, will fetch column schema information for each table in the connector.
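The key_prefix and connector_to_asset_key_fn parameters together determine the asset key of each destination table. The sketch below models asset keys as plain lists of strings instead of Dagster's AssetKey, drops the connector-metadata argument for simplicity, and assumes (this is not stated above) that the default behaviour splits a `schema_name.table_name` string on `.` and prepends the prefix. `default_table_to_key` and `build_asset_key` are hypothetical names.

```python
from typing import Callable, List, Optional, Sequence


def default_table_to_key(table_name: str) -> List[str]:
    # Assumed default: "schema.table" -> ["schema", "table"].
    return table_name.split(".")


def build_asset_key(
    table_name: str,
    key_prefix: Sequence[str] = (),
    table_to_key_fn: Optional[Callable[[str], List[str]]] = None,
) -> List[str]:
    """Hypothetical: combine a key prefix with a per-table key function."""
    fn = table_to_key_fn or default_table_to_key
    return [*key_prefix, *fn(table_name)]


print(build_asset_key("salesforce.account", key_prefix=["fivetran"]))
# A custom key function can flatten the table name into a single path component instead.
print(build_asset_key("salesforce.account", table_to_key_fn=lambda t: [t.replace(".", "__")]))
```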
Loading all Fivetran connectors as assets:
```python
from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

fivetran_instance = fivetran_resource.configured(
    {
        "api_key": "some_key",
        "api_secret": "some_secret",
    }
)
fivetran_assets = load_assets_from_fivetran_instance(fivetran_instance)
```
Filtering the set of loaded connectors:
```python
from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

fivetran_instance = fivetran_resource.configured(
    {
        "api_key": "some_key",
        "api_secret": "some_secret",
    }
)
fivetran_assets = load_assets_from_fivetran_instance(
    fivetran_instance,
    # Keep only connectors whose name mentions "snowflake".
    connector_filter=lambda meta: "snowflake" in meta.name,
)
```
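Because connector_filter and connector_to_group_fn are ordinary Python callables, they can be tested without a Fivetran instance. The sketch below uses a hypothetical stand-in dataclass `ConnectorMeta` that models only the `name` attribute (the real FivetranConnectorMetadata carries more fields). It also uses a hypothetical group-name sanitizer, on the assumption that Dagster group names should contain only letters, digits and underscores.

```python
import re
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ConnectorMeta:
    """Hypothetical stand-in for FivetranConnectorMetadata; only `name` is modelled."""
    name: str


def snowflake_only(meta: ConnectorMeta) -> bool:
    # Same predicate as the connector_filter example: keep names mentioning "snowflake".
    return "snowflake" in meta.name


def connector_to_group(connector_name: str) -> Optional[str]:
    # Hypothetical sanitizer: replace anything that is not a letter, digit or underscore.
    return re.sub(r"[^A-Za-z0-9_]", "_", connector_name)


connectors = [
    ConnectorMeta("snowflake-prod"),
    ConnectorMeta("postgres"),
    ConnectorMeta("snowflake.dev"),
]
kept: List[ConnectorMeta] = [c for c in connectors if snowflake_only(c)]
print([connector_to_group(c.name) for c in kept])
```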
- dagster_fivetran.build_fivetran_assets
Build a set of assets for a given Fivetran connector.
Returns an AssetsDefinition which connects the specified asset_keys to the computation that will update them. Internally, executes a Fivetran sync for a given connector_id, and polls until that sync completes, raising an error if it is unsuccessful. Requires the use of the fivetran_resource, which allows it to communicate with the Fivetran API.
Parameters:
- connector_id (str) – The Fivetran Connector ID that this op will sync. You can retrieve this value from the “Setup” tab of a given connector in the Fivetran UI.
- destination_tables (List[str]) – schema_name.table_name for each table that you want to be represented in the Dagster asset graph for this connection.
- poll_interval (float) – The time (in seconds) that will be waited between successive polls.
- poll_timeout (Optional[float]) – The maximum time (in seconds) that will be waited before this operation is timed out. If None, it never times out.
- io_manager_key (Optional[str]) – The io_manager to be used to handle each of these assets.
- asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
- metadata_by_table_name (Optional[Mapping[str, RawMetadataMapping]]) – A mapping from destination table name to user-supplied metadata that should be associated with the asset for that table.
- group_name (Optional[str]) – A string name used to organize multiple assets into groups. This group name is applied to all assets produced by this call.
- infer_missing_tables (bool) – If True, will create asset materializations for tables specified in destination_tables even if they are not present in the Fivetran sync output.
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op.
- fetch_column_metadata (bool) – If True, will fetch column schema information for each table in the connector.
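The sync-and-poll behaviour described above (start a sync, poll every poll_interval seconds until it completes, raise if it fails or exceeds poll_timeout) can be sketched as a generic loop. This is not the dagster_fivetran implementation: `poll_until_complete`, the status strings "succeeded", "failed" and "running", and the injectable `clock` and `sleep` arguments are assumptions made so that the sketch runs without network access.

```python
import time
from typing import Callable, Optional


def poll_until_complete(
    get_status: Callable[[], str],
    poll_interval: float = 10.0,
    poll_timeout: Optional[float] = None,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll get_status() until it reports "succeeded"; return the number of polls made.

    Raises RuntimeError if the sync reports "failed", and TimeoutError once more than
    poll_timeout seconds have elapsed. poll_timeout=None means wait indefinitely.
    """
    start = clock()
    polls = 0
    while True:
        polls += 1
        status = get_status()
        if status == "succeeded":
            return polls
        if status == "failed":
            raise RuntimeError("Fivetran sync failed")
        if poll_timeout is not None and clock() - start > poll_timeout:
            raise TimeoutError(f"Sync did not complete within {poll_timeout} seconds")
        sleep(poll_interval)


# Simulated run: two "running" polls, then success. The fake clock advances only when
# the fake sleep is called, so nothing actually waits.
statuses = iter(["running", "running", "succeeded"])
fake_now = [0.0]
polls = poll_until_complete(
    lambda: next(statuses),
    poll_interval=5.0,
    clock=lambda: fake_now[0],
    sleep=lambda s: fake_now.__setitem__(0, fake_now[0] + s),
)
print(polls, fake_now[0])
```

Injecting the clock and sleep functions is a design choice for testability; a real client would simply use the defaults.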
Basic example:
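```python
from dagster import repository, with_resources
from dagster_fivetran import fivetran_resource
from dagster_fivetran.assets import build_fivetran_assets

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)
fivetran_assets = build_fivetran_assets(
    connector_id="foobar",
    destination_tables=["schema1.table1", "schema2.table2"],
)

@repository
def repo():
    return with_resources(fivetran_assets, resource_defs={"fivetran": my_fivetran_resource})
```
Attaching metadata: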
```python
from dagster_fivetran import build_fivetran_assets

fivetran_assets = build_fivetran_assets(
    connector_id="foobar",
    destination_tables=["schema1.table1", "schema2.table2"],
    # Metadata is keyed by the same schema_name.table_name strings as destination_tables.
    metadata_by_table_name={
        "schema1.table1": {
            "description": "This is a table that contains foo and bar",
        },
        "schema2.table2": {
            "description": "This is a table that contains baz and quux",
        },
    },
)
```
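To make the interplay of destination_tables, metadata_by_table_name and infer_missing_tables concrete, the sketch below decides which tables would receive a materialization after a sync and which user metadata each one carries. It is a hypothetical model, not library code: `plan_materializations` and its `synced_tables` input (the tables the sync reported as updated) are assumptions.

```python
from typing import Dict, Iterable, Mapping, Optional, Sequence


def plan_materializations(
    destination_tables: Sequence[str],
    synced_tables: Iterable[str],
    metadata_by_table_name: Optional[Mapping[str, Mapping[str, str]]] = None,
    infer_missing_tables: bool = False,
) -> Dict[str, Dict[str, str]]:
    """Hypothetical: map each table that gets a materialization to its user metadata."""
    synced = set(synced_tables)
    metadata_by_table_name = metadata_by_table_name or {}
    plan: Dict[str, Dict[str, str]] = {}
    for table in destination_tables:
        # In this model, tables missing from the sync output are only materialized
        # when infer_missing_tables is True.
        if table in synced or infer_missing_tables:
            plan[table] = dict(metadata_by_table_name.get(table, {}))
    return plan


meta = {"schema1.table1": {"description": "This is a table that contains foo and bar"}}
tables = ["schema1.table1", "schema2.table2"]
print(plan_materializations(tables, ["schema1.table1"], meta))
print(plan_materializations(tables, ["schema1.table1"], meta, infer_missing_tables=True))
```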
Ops
- dagster_fivetran.fivetran_sync_op
Executes a Fivetran sync for a given connector_id, and polls until that sync completes, raising an error if it is unsuccessful. It outputs a FivetranOutput which contains the details of the Fivetran connector after the sync successfully completes, as well as details about which tables the sync updates. It requires the use of the fivetran_resource, which allows it to communicate with the Fivetran API.
Examples:
```python
from dagster import job, op
from dagster_fivetran import fivetran_resource, fivetran_sync_op

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

sync_foobar = fivetran_sync_op.configured({"connector_id": "foobar"}, name="sync_foobar")

# some_op and other_op are placeholder ops for illustration only.
@op
def some_op():
    ...

@op
def other_op(fivetran_output):
    ...

@job(resource_defs={"fivetran": my_fivetran_resource})
def my_simple_fivetran_job():
    sync_foobar()

@job(resource_defs={"fivetran": my_fivetran_resource})
def my_composed_fivetran_job():
    # start_after only orders execution: the sync starts once some_op has finished.
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)
```
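The FivetranOutput described above reports which tables a sync updates. As an illustration only, the sketch below derives `schema.table` destination names from a connector schema-config payload. The payload shape (`schemas` mapping to `tables`, each entry with `enabled` and `name_in_destination`) is an assumption about the Fivetran API response, and `enabled_destination_tables` is a hypothetical helper rather than a `dagster_fivetran` function.

```python
from typing import Any, Dict, List


def enabled_destination_tables(schema_config: Dict[str, Any]) -> List[str]:
    """Hypothetical: list "schema.table" destination names for enabled tables."""
    tables: List[str] = []
    for schema in schema_config.get("schemas", {}).values():
        if not schema.get("enabled", False):
            continue  # a disabled schema syncs none of its tables
        for table in schema.get("tables", {}).values():
            if table.get("enabled", False):
                tables.append(f"{schema['name_in_destination']}.{table['name_in_destination']}")
    return tables


# Assumed payload shape, with made-up schema and table names.
example_config = {
    "schemas": {
        "public": {
            "name_in_destination": "app_public",
            "enabled": True,
            "tables": {
                "users": {"name_in_destination": "users", "enabled": True},
                "audit_log": {"name_in_destination": "audit_log", "enabled": False},
            },
        },
        "internal": {"name_in_destination": "internal", "enabled": False, "tables": {}},
    }
}
print(enabled_destination_tables(example_config))
```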
Legacy
- dagster_fivetran.fivetran_resource ResourceDefinition
This resource allows users to programmatically interface with the Fivetran REST API to launch syncs and monitor their progress. It currently implements only a subset of the functionality exposed by the API.
For complete documentation of the Fivetran REST API, including the expected response JSON schemas, see the Fivetran API Docs.
To configure this resource, we recommend using the configured method.
Examples:
```python
from dagster import job
from dagster_fivetran import fivetran_resource

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

@job(resource_defs={"fivetran": my_fivetran_resource})
def my_fivetran_job():
    ...
```
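The examples in this reference pass `{"env": "FIVETRAN_API_KEY"}` instead of a literal key, so that credentials come from environment variables rather than source code. The sketch below mimics that convention with a hypothetical `resolve_config_value` helper. It is a simplified model of how such a value can be resolved, not Dagster's own config machinery.

```python
import os
from typing import Mapping, Union


def resolve_config_value(value: Union[str, Mapping[str, str]]) -> str:
    """Hypothetical: return a literal string, or read it from the environment for {"env": NAME}."""
    if isinstance(value, str):
        return value
    if set(value) == {"env"}:
        name = value["env"]
        if name not in os.environ:
            raise KeyError(f"Environment variable {name} is not set")
        return os.environ[name]
    raise ValueError(f"Unsupported config value: {value!r}")


os.environ["FIVETRAN_API_KEY"] = "example_key"  # set here only for the demonstration
config = {"api_key": {"env": "FIVETRAN_API_KEY"}, "api_secret": "some_secret"}
resolved = {key: resolve_config_value(value) for key, value in config.items()}
print(resolved)
```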