Fivetran (dagster-fivetran)

This library provides a Dagster integration with Fivetran.

Resources

dagster_fivetran.FivetranResource ResourceDefinition

This class exposes methods on top of the Fivetran REST API.

Assets

dagster_fivetran.load_assets_from_fivetran_instance

Loads Fivetran connector assets from a configured FivetranResource instance. This fetches information about defined connectors at initialization time, and will error on workspace load if the Fivetran instance is not reachable.

Parameters:

  • fivetran (ResourceDefinition) – A FivetranResource configured with the appropriate connection
  • key_prefix (Optional[CoercibleToAssetKeyPrefix]) – A prefix for the asset keys created.
  • connector_to_group_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an asset
  • io_manager_key (Optional[str]) – The IO manager key to use for all assets. Defaults to “io_manager”.
  • connector_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]) – Function which returns an
  • connector_filter (Optional[Callable[[FivetranConnectorMetadata], bool]]) – Optional function which takes
  • connector_to_asset_key_fn (Optional[Callable[[FivetranConnectorMetadata, str], AssetKey]]) – Optional function
  • destination_ids (Optional[List[str]]) – A list of destination IDs to fetch connectors from. If None, all destinations
  • poll_interval (float) – The time (in seconds) that will be waited between successive polls.
  • poll_timeout (Optional[float]) – The maximum time that will be waited before this operation is
  • fetch_column_metadata (bool) – If True, will fetch column schema information for each table in the connector.
Examples:

Loading all Fivetran connectors as assets:

from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

fivetran_instance = fivetran_resource.configured(
    {
        "api_key": "some_key",
        "api_secret": "some_secret",
    }
)
fivetran_assets = load_assets_from_fivetran_instance(fivetran_instance)

Filtering the set of loaded connectors:

from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

fivetran_instance = fivetran_resource.configured(
    {
        "api_key": "some_key",
        "api_secret": "some_secret",
    }
)
fivetran_assets = load_assets_from_fivetran_instance(
    fivetran_instance,
    connector_filter=lambda meta: "snowflake" in meta.name,
)
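A lambda becomes hard to read once the filter has more than one condition. The following is a minimal sketch of a named predicate that could be passed as connector_filter instead. It relies only on the `name` attribute used in the example above. `FakeConnectorMeta` is a hypothetical stand-in for FivetranConnectorMetadata so that the predicate can be exercised without contacting Fivetran, and the substrings are illustrative.

```python
from dataclasses import dataclass


@dataclass
class FakeConnectorMeta:
    """Hypothetical stand-in for FivetranConnectorMetadata; only `name` is modeled."""

    name: str


# Illustrative substrings; replace with whatever identifies your connectors.
WANTED_SUBSTRINGS = ("snowflake", "salesforce")


def keep_connector(meta) -> bool:
    """Return True if the connector name contains any wanted substring."""
    return any(s in meta.name for s in WANTED_SUBSTRINGS)


connectors = [
    FakeConnectorMeta("snowflake_prod"),
    FakeConnectorMeta("postgres_replica"),
    FakeConnectorMeta("salesforce_crm"),
]
kept = [c.name for c in connectors if keep_connector(c)]
print(kept)
```

A predicate like this would be supplied as connector_filter=keep_connector in place of the lambda.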
dagster_fivetran.build_fivetran_assets

Build a set of assets for a given Fivetran connector.

Returns an AssetsDefinition which connects the specified asset_keys to the computation that will update them. Internally, executes a Fivetran sync for a given connector_id, and polls until that sync completes, raising an error if it is unsuccessful. Requires the use of the fivetran_resource, which allows it to communicate with the Fivetran API.
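The sync-then-poll behavior described above can be pictured with a short, generic loop. This is a sketch of the pattern only, not the code dagster-fivetran runs: `poll_until_done`, `get_status`, and the status strings are hypothetical, and only the names `poll_interval` and `poll_timeout` are taken from this document.

```python
import time
from typing import Callable, Optional


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval: float = 0.01,
    poll_timeout: Optional[float] = None,
) -> str:
    """Call get_status repeatedly until it reports success or failure.

    Raises RuntimeError on failure and TimeoutError if poll_timeout
    (in seconds) elapses first. A poll_timeout of None waits indefinitely.
    """
    start = time.monotonic()
    while True:
        status = get_status()
        if status == "success":
            return status
        if status == "failure":
            raise RuntimeError("sync failed")
        if poll_timeout is not None and time.monotonic() - start > poll_timeout:
            raise TimeoutError("sync did not finish within poll_timeout")
        time.sleep(poll_interval)


# Simulated status source: two in-progress polls, then success.
statuses = iter(["syncing", "syncing", "success"])
result = poll_until_done(lambda: next(statuses), poll_interval=0.01, poll_timeout=5)
print(result)
```

In this sketch, a shorter poll_interval detects completion sooner at the cost of more status requests, while poll_timeout bounds how long a stuck sync can hold up the run.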

Parameters:

  • connector_id (str) – The Fivetran Connector ID that this op will sync. You can retrieve this
  • destination_tables (List[str]) – schema_name.table_name for each table that you want to be
  • poll_interval (float) – The time (in seconds) that will be waited between successive polls.
  • poll_timeout (Optional[float]) – The maximum time that will be waited before this operation is
  • io_manager_key (Optional[str]) – The io_manager to be used to handle each of these assets.
  • asset_key_prefix (Optional[List[str]]) – A prefix for the asset keys inside this asset.
  • metadata_by_table_name (Optional[Mapping[str, RawMetadataMapping]]) – A mapping from destination
  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. This
  • infer_missing_tables (bool) – If True, will create asset materializations for tables specified
  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and
  • fetch_column_metadata (bool) – If True, will fetch column schema information for each table in the connector.
Examples:

Basic example:

from dagster import AssetKey, repository, with_resources

from dagster_fivetran import build_fivetran_assets, fivetran_resource

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

Attaching metadata:

fivetran_assets = build_fivetran_assets(
    connector_id="foobar",
    # The parameter is named destination_tables (see the parameter list above).
    destination_tables=["schema1.table1", "schema2.table2"],
    metadata_by_table_name={
        "schema1.table1": {
            "description": "This is a table that contains foo and bar",
        },
        "schema2.table2": {
            "description": "This is a table that contains baz and quux",
        },
    },
)
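When many tables are involved, the metadata mapping can be generated instead of written by hand. The sketch below builds a dictionary in the same shape as the example above. `TABLE_DESCRIPTIONS` and `build_metadata_by_table_name` are hypothetical names, the only metadata key used is the `description` key shown above, and the check for a dot in each table name is an assumption based on the schema_name.table_name format.

```python
from typing import Dict, Mapping

# Illustrative input: one description per destination table.
TABLE_DESCRIPTIONS = {
    "schema1.table1": "This is a table that contains foo and bar",
    "schema2.table2": "This is a table that contains baz and quux",
}


def build_metadata_by_table_name(
    descriptions: Mapping[str, str],
) -> Dict[str, Dict[str, str]]:
    """Wrap each description in a per-table metadata dict.

    Keys are expected in schema_name.table_name form; a key without a dot
    is rejected (an assumption of this sketch, not a library rule).
    """
    for table in descriptions:
        if "." not in table:
            raise ValueError(f"expected schema_name.table_name, got {table!r}")
    return {table: {"description": text} for table, text in descriptions.items()}


metadata_by_table_name = build_metadata_by_table_name(TABLE_DESCRIPTIONS)
# The same keys can be reused as the list of destination tables.
destination_tables = list(metadata_by_table_name)
print(destination_tables)
```

Deriving both values from one source keeps the table list and the metadata keys from drifting apart.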

Ops

dagster_fivetran.fivetran_sync_op = <dagster._core.definitions.op_definition.OpDefinition object>

Executes a Fivetran sync for a given connector_id, and polls until that sync completes, raising an error if it is unsuccessful. It outputs a FivetranOutput which contains the details of the Fivetran connector after the sync successfully completes, as well as details about which tables the sync updates.

It requires the use of the fivetran_resource, which allows it to communicate with the Fivetran API.

Examples:

from dagster import job
from dagster_fivetran import fivetran_resource, fivetran_sync_op

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

sync_foobar = fivetran_sync_op.configured({"connector_id": "foobar"}, name="sync_foobar")

@job(resource_defs={"fivetran": my_fivetran_resource})
def my_simple_fivetran_job():
    sync_foobar()

# some_op and other_op stand in for ops defined elsewhere in your project.
@job(resource_defs={"fivetran": my_fivetran_resource})
def my_composed_fivetran_job():
    final_foobar_state = sync_foobar(start_after=some_op())
    other_op(final_foobar_state)

Legacy

dagster_fivetran.fivetran_resource ResourceDefinition

This resource allows users to programmatically interface with the Fivetran REST API to launch syncs and monitor their progress. It currently implements only a subset of the functionality exposed by the API.

For complete documentation of the Fivetran REST API, including the expected response JSON schemas, see the Fivetran API Docs.

To configure this resource, we recommend using the configured method.

Examples:
from dagster import job
from dagster_fivetran import fivetran_resource

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

@job(resource_defs={"fivetran": my_fivetran_resource})
def my_fivetran_job():
    ...
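Because the configuration above reads both credentials from environment variables, a missing variable will typically not be noticed until a run starts. A small pre-flight check can report it earlier. This sketch is an optional addition and not part of dagster-fivetran: `missing_env_vars` is a hypothetical helper, and the variable names are the ones used in the example above.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names taken from the resource configuration example.
REQUIRED_VARS = ("FIVETRAN_API_KEY", "FIVETRAN_API_SECRET")


def missing_env_vars(
    names: Iterable[str] = REQUIRED_VARS,
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Example with an explicit mapping, so the result does not depend on the shell.
example_env = {"FIVETRAN_API_KEY": "some_key"}
print(missing_env_vars(environ=example_env))
```

Calling missing_env_vars() with no arguments checks the real process environment, which is the form a deployment script would most likely use.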