GCP (dagster-gcp)

BigQuery

BigQuery Resource

dagster_gcp.BigQueryResource ResourceDefinition

Resource for interacting with Google BigQuery.

Examples:

from dagster import Definitions, asset
from dagster_gcp import BigQueryResource

@asset
def my_table(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        client.query("SELECT * FROM my_dataset.my_table")

defs = Definitions(
    assets=[my_table],
    resources={
        "bigquery": BigQueryResource(project="my-project")
    }
)

BigQuery I/O Manager

dagster_gcp.BigQueryIOManager IOManagerDefinition

Base class for an I/O manager definition that reads inputs from and writes outputs to BigQuery.

Examples:

from typing import Sequence

import pandas as pd
from dagster import Definitions, EnvVar, asset
# DbTypeHandler lives in an internal module; this import path is an assumption and may change between releases
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import BigQueryIOManager

class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [BigQueryPandasTypeHandler()]

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.
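The lookup order described above can be summarized in a short sketch. This is plain Python written only to illustrate the documented rules, not code from dagster-gcp: resolve_dataset and its parameter names are hypothetical, and the sketch assumes a single-component key_prefix.

```python
from typing import Optional


def resolve_dataset(
    metadata_schema: Optional[str] = None,
    key_prefix: Optional[str] = None,
    default_dataset: Optional[str] = None,
) -> str:
    """Hypothetical helper that mirrors the lookup order described in the text."""
    if metadata_schema:   # a "schema" metadata entry takes precedence
        return metadata_schema
    if key_prefix:        # then the key_prefix on the asset key
        return key_prefix
    if default_dataset:   # then the dataset value configured on the I/O manager
        return default_dataset
    return "public"       # documented fallback when nothing is provided


print(resolve_dataset(metadata_schema="from_metadata", key_prefix="from_prefix"))
print(resolve_dataset(default_dataset="from_config"))
print(resolve_dataset())
```

The first call shows metadata winning over key_prefix, the second shows the configured default being used when the asset specifies nothing, and the third falls back to "public".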

To use only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the key. You can retrieve the base64-encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
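If a shell is not available, the same encoding can be produced with Python's standard library. The sketch below uses a made-up JSON document in place of a real service account key; only the base64 and json calls are real APIs.

```python
import base64
import json

# Made-up stand-in for the contents of a service account key file.
fake_key = json.dumps({"type": "service_account", "private_key": "line1\nline2\n"})

# Same encoding as `cat $GOOGLE_APPLICATION_CREDENTIALS | base64`, without the
# line wrapping that some base64 command-line tools apply.
encoded = base64.b64encode(fake_key.encode("utf-8")).decode("ascii")

# The encoded value is a single line and decodes back to the original JSON.
decoded = base64.b64decode(encoded).decode("utf-8")
print("\n" in encoded)
print(json.loads(decoded)["type"])
```

To encode a real key, read the bytes of the file that GOOGLE_APPLICATION_CREDENTIALS points to instead of building fake_key, and use the resulting string as the gcp_credentials value.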

BigQuery Ops

dagster_gcp.bq_create_dataset

BigQuery Create Dataset.

This op encapsulates creating a BigQuery dataset.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.bq_delete_dataset

BigQuery Delete Dataset.

This op encapsulates deleting a BigQuery dataset.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.bq_op_for_queries

Executes BigQuery SQL queries.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.import_df_to_bq
dagster_gcp.import_file_to_bq
dagster_gcp.import_gcs_paths_to_bq

Data Freshness

dagster_gcp.fetch_last_updated_timestamps

Get the last updated timestamps of a list of BigQuery tables.

Note that this only works on BigQuery tables, and not views.

Parameters:

  • client (bigquery.Client) – The BigQuery client.
  • dataset_id (str) – The BigQuery dataset ID.
  • table_ids (Sequence[str]) – The table IDs to get the last updated timestamp for.

Returns: A mapping of table IDs to their last updated timestamps (UTC).

Return type: Mapping[str, datetime]
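One way to use the returned mapping is a simple staleness check. The sketch below does not call BigQuery: the timestamps dictionary is hand-written to stand in for the function's return value, stale_tables is a hypothetical helper, and the code assumes the timestamps are timezone-aware.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Mapping


def stale_tables(
    last_updated: Mapping[str, datetime],
    max_age: timedelta,
    now: datetime,
) -> List[str]:
    """Hypothetical helper: table IDs whose last update is older than max_age."""
    return sorted(
        table_id for table_id, ts in last_updated.items() if now - ts > max_age
    )


# Hand-written stand-in for the mapping fetch_last_updated_timestamps returns.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
timestamps = {
    "orders": datetime(2024, 1, 2, 11, 30, tzinfo=timezone.utc),
    "customers": datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc),
}
print(stale_tables(timestamps, timedelta(hours=6), now))  # ['customers']
```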

GCS

GCS Resource

dagster_gcp.GCSResource ResourceDefinition

Resource for interacting with Google Cloud Storage.

Example:

@asset
def my_asset(gcs: GCSResource):
    with gcs.get_client() as client:
        # client is a google.cloud.storage.Client
        ...

GCS I/O Manager

dagster_gcp.GCSPickleIOManager IOManagerDefinition

Persistent IO manager using GCS for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.
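The asset path layout described above can be reproduced with a few lines of plain Python. This is an illustrative sketch of the documented naming rule, not the I/O manager's code; asset_path is a hypothetical helper.

```python
from typing import Sequence


def asset_path(base_dir: str, asset_key_path: Sequence[str]) -> str:
    """Hypothetical helper: join base_dir and the asset key components."""
    return "/".join([base_dir.rstrip("/"), *asset_key_path])


path = asset_path("/my/base/path", ["one", "two", "three"])
directory, _, filename = path.rpartition("/")
print(directory)  # /my/base/path/one/two
print(filename)   # three
```

Because the path depends only on the base directory and the asset key, every materialization of the same asset resolves to the same location, which is why later materializations overwrite earlier ones.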

Example usage:

  1. Attach this IO manager to a set of assets.
    from dagster import asset, Definitions
    from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

    @asset
    def asset1():
        # create df ...
        return df

    @asset
    def asset2(asset1):
        return asset1[:5]

    defs = Definitions(
        assets=[asset1, asset2],
        resources={
            "io_manager": GCSPickleIOManager(
                gcs_bucket="my-cool-bucket",
                gcs_prefix="my-cool-prefix",
                gcs=GCSResource(project="my-cool-project")
            ),
        }
    )
  2. Attach this IO manager to your job to make it available to your ops.
    from dagster import job
    from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

    @job(
        resource_defs={
            "io_manager": GCSPickleIOManager(
                gcs=GCSResource(project="my-cool-project"),
                gcs_bucket="my-cool-bucket",
                gcs_prefix="my-cool-prefix"
            ),
        }
    )
    def my_job():
        ...

GCS Sensor

dagster_gcp.gcs.sensor.get_gcs_keys

Return a list of updated keys in a GCS bucket.

Parameters:

  • bucket (str) – The name of the GCS bucket.
  • prefix (Optional[str]) – The prefix to filter the keys by.
  • since_key (Optional[str]) – The key to start from. If provided, only keys updated after this key will be returned.
  • gcs_session (Optional[google.cloud.storage.client.Client]) – A GCS client session. If not provided, a new session will be created.

Returns: A list of keys in the bucket, sorted by update time, that are newer than the since_key.

Return type: List[str]

Example:

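from dagster import RunRequest, SkipReason, resource, sensor
from dagster_gcp.gcs.sensor import get_gcs_keys
from google.cloud import storage

@resource
def google_cloud_storage_client(context):
    # from_service_account_json is a classmethod, so call it on the class
    return storage.Client.from_service_account_json("my-service-account.json")

# my_job is assumed to be defined elsewhere
@sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
def my_gcs_sensor(context):
    since_key = context.cursor or None
    new_gcs_keys = get_gcs_keys(
        "my-bucket",
        prefix="data",
        since_key=since_key,
        gcs_session=context.resources.google_cloud_storage_client
    )

    if not new_gcs_keys:
        # this function is a generator, so the SkipReason must be yielded
        yield SkipReason("No new gcs files found for bucket 'my-bucket'.")
        return

    for gcs_key in new_gcs_keys:
        yield RunRequest(run_key=gcs_key, run_config={
            "ops": {
                "gcs_files": {
                    "config": {
                        "gcs_key": gcs_key
                    }
                }
            }
        })

    # store the newest key so the next tick only sees later updates
    last_key = new_gcs_keys[-1]
    context.update_cursor(last_key)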
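The cursor pattern in the sensor relies on the ordering that get_gcs_keys documents: keys sorted by update time, with only those after since_key returned. The sketch below imitates that behavior on an in-memory list so it can be run without GCS. keys_after is a hypothetical stand-in, and its handling of a since_key that is not in the listing is an assumption, not documented behavior.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def keys_after(
    objects: List[Tuple[str, datetime]],
    since_key: Optional[str] = None,
) -> List[str]:
    """Hypothetical stand-in: keys sorted by update time, newer than since_key."""
    ordered = [key for key, _ in sorted(objects, key=lambda item: item[1])]
    if since_key is None or since_key not in ordered:
        # Assumption: with no usable cursor, every key counts as new.
        return ordered
    return ordered[ordered.index(since_key) + 1:]


objects = [
    ("data/b.csv", datetime(2024, 1, 2)),
    ("data/a.csv", datetime(2024, 1, 1)),
    ("data/c.csv", datetime(2024, 1, 3)),
]
print(keys_after(objects))                # ['data/a.csv', 'data/b.csv', 'data/c.csv']
print(keys_after(objects, "data/b.csv"))  # ['data/c.csv']
```

Storing the last returned key as the cursor, as the sensor does, means the following tick only receives keys updated after it.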

File Manager (Experimental)

class dagster_gcp.GCSFileHandle

A reference to a file on GCS.

dagster_gcp.GCSFileManagerResource ResourceDefinition

FileManager that provides abstract access to GCS.

GCS Compute Log Manager

class dagster_gcp.gcs.GCSComputeLogManager

Logs op compute function stdout and stderr to GCS.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_gcp.gcs.compute_log_manager
  class: GCSComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    upload_interval: 30

There are more configuration examples in the instance documentation guide: https://docs.dagster.io/deployment/dagster-instance#compute-log-storage

Parameters:

  • bucket (str) – The name of the GCS bucket to which to log.
  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default:
  • prefix (Optional[str]) – Prefix for the log file keys.
  • json_credentials_envvar (Optional[str]) – Environment variable that contains the JSON with a private key
  • upload_interval (Optional[int]) – Interval in seconds to upload partial log files to GCS. By default, will only upload when the capture is complete.
  • show_url_only (Optional[bool]) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute

Dataproc

Dataproc Resource

dagster_gcp.DataprocResource ResourceDefinition

Resource for connecting to a Dataproc cluster.

Example:

@asset
def my_asset(dataproc: DataprocResource):
    with dataproc.get_client() as client:
        # client is a dagster_gcp.DataprocClient
        ...

Legacy

dagster_gcp.ConfigurablePickledObjectGCSIOManager IOManagerDefinition
deprecated

This API will be removed in version 2.0. Please use GCSPickleIOManager instead.

Renamed to GCSPickleIOManager. See GCSPickleIOManager for documentation.

dagster_gcp.bigquery_resource ResourceDefinition
dagster_gcp.build_bigquery_io_manager IOManagerDefinition
experimental

This API may break in future versions, even between dot releases.

Builds an I/O manager definition that reads inputs from and writes outputs to BigQuery.

Parameters:

  • type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between
  • default_load_type (Type) – When an input has no type annotation, load it as this type.

Returns: IOManagerDefinition

Examples:

import pandas as pd
from dagster import Definitions, asset
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import build_bigquery_io_manager

@asset(
    key_prefix=["my_prefix"],
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])

defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"}
        })
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"},
            "dataset": "my_dataset"
        })
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.

To use only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the key. You can retrieve the base64-encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

dagster_gcp.gcs_resource ResourceDefinition
dagster_gcp.gcs_pickle_io_manager IOManagerDefinition

Persistent IO manager using GCS for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.

Example usage:

  1. Attach this IO manager to a set of assets.
    from dagster import Definitions, asset
    from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

    @asset
    def asset1():
        # create df ...
        return df

    @asset
    def asset2(asset1):
        return asset1[:5]

    defs = Definitions(
        assets=[asset1, asset2],
        resources={
            "io_manager": gcs_pickle_io_manager.configured(
                {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
            ),
            "gcs": gcs_resource.configured({"project": "my-cool-project"}),
        },
    )
  2. Attach this IO manager to your job to make it available to your ops.
    from dagster import job
    from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

    @job(
        resource_defs={
            "io_manager": gcs_pickle_io_manager.configured(
                {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
            ),
            "gcs": gcs_resource.configured({"project": "my-cool-project"}),
        },
    )
    def my_job():
        ...

dagster_gcp.gcs_file_manager ResourceDefinition

FileManager that provides abstract access to GCS.

Implements the FileManager API.

dagster_gcp.dataproc_resource ResourceDefinition