GCP (dagster-gcp)
BigQuery
BigQuery Resource
- dagster_gcp.BigQueryResource ResourceDefinition
Resource for interacting with Google BigQuery.
Examples:
from dagster import Definitions, asset
from dagster_gcp import BigQueryResource

@asset
def my_table(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        client.query("SELECT * FROM my_dataset.my_table")

defs = Definitions(
    assets=[my_table],
    resources={
        "bigquery": BigQueryResource(project="my-project")
    }
)
BigQuery I/O Manager
- dagster_gcp.BigQueryIOManager IOManagerDefinition
Base class for an I/O manager definition that reads inputs from and writes outputs to BigQuery.
Examples:
from typing import Sequence

import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import BigQueryIOManager

class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [BigQueryPandasTypeHandler()]

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.
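
The precedence rules above can be summarized in a few lines of plain Python. This is only a sketch of the order stated in the prose, not the library's implementation: resolve_dataset and its parameters are hypothetical names, it assumes the last key_prefix component names the dataset, and it does not model any validation the I/O manager may perform on conflicting settings.

```python
from typing import Mapping, Optional, Sequence


def resolve_dataset(
    metadata: Optional[Mapping[str, str]] = None,
    key_prefix: Optional[Sequence[str]] = None,
    default_dataset: Optional[str] = None,
) -> str:
    """Pick a dataset name following the order described in the text."""
    # 1. A "schema" entry in metadata wins over everything else.
    if metadata and "schema" in metadata:
        return metadata["schema"]
    # 2. Otherwise use the key prefix (assumption: its last component).
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise fall back to the dataset configured on the I/O manager.
    if default_dataset:
        return default_dataset
    # 4. With nothing specified, the documented default is "public".
    return "public"
```

For instance, an asset with both key_prefix=["my_prefix"] and metadata={"schema": "my_dataset"} would resolve to my_dataset under this sketch.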
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command:

cat $GOOGLE_APPLICATION_CREDENTIALS | base64
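
The credential handling described above (decode the key, write it to a temporary file, point GOOGLE_APPLICATION_CREDENTIALS at it, then clean up) can be illustrated with the standard library alone. This is a minimal sketch of the described behavior, not Dagster's own code: temporary_gcp_credentials is a hypothetical helper name.

```python
import base64
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator

ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"


@contextmanager
def temporary_gcp_credentials(encoded_key: str) -> Iterator[str]:
    """Expose a base64-encoded key through a temp file for the block's duration."""
    key_bytes = base64.b64decode(encoded_key)
    # mkstemp returns an open file descriptor and the path of the new file.
    fd, path = tempfile.mkstemp(suffix=".json")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(key_bytes)
        os.environ[ENV_VAR] = path
        yield path
    finally:
        # Mirror the documented cleanup: unset the variable and delete the file.
        os.environ.pop(ENV_VAR, None)
        if os.path.exists(path):
            os.remove(path)
```

Inside a `with temporary_gcp_credentials(encoded) as path:` block the environment variable points at the decoded key file; on exit both the variable and the file are gone.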
BigQuery Ops
- dagster_gcp.bq_create_dataset
BigQuery Create Dataset.
This op encapsulates creating a BigQuery dataset.
Expects a BQ client to be provisioned in resources as context.resources.bigquery.
- dagster_gcp.bq_delete_dataset
BigQuery Delete Dataset.
This op encapsulates deleting a BigQuery dataset.
Expects a BQ client to be provisioned in resources as context.resources.bigquery.
- dagster_gcp.bq_op_for_queries
Executes BigQuery SQL queries.
Expects a BQ client to be provisioned in resources as context.resources.bigquery.
Data Freshness
- dagster_gcp.fetch_last_updated_timestamps
Get the last updated timestamps of a list of BigQuery tables.
Note that this only works on BigQuery tables, and not views.
Parameters:
- client (bigquery.Client) – The BigQuery client.
- dataset_id (str) – The BigQuery dataset ID.
- table_ids (Sequence[str]) – The table IDs to get the last updated timestamp for.
Returns: A mapping of table IDs to their last updated timestamps (UTC).
Return type: Mapping[str, datetime]
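
One way to use the returned mapping is a simple staleness check. The sketch below is not part of dagster-gcp: stale_tables, max_age, and now are hypothetical names, and the function only assumes a mapping of table IDs to timezone-aware UTC datetimes, which is the shape described above.

```python
from datetime import datetime, timedelta
from typing import List, Mapping


def stale_tables(
    last_updated: Mapping[str, datetime],
    max_age: timedelta,
    now: datetime,
) -> List[str]:
    """Return the table IDs whose last update is older than max_age."""
    return sorted(
        table_id
        for table_id, updated_at in last_updated.items()
        if now - updated_at > max_age
    )
```

Passing now explicitly, rather than reading the clock inside the function, keeps the check deterministic and easy to test.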
GCS
GCS Resource
- dagster_gcp.GCSResource ResourceDefinition
Resource for interacting with Google Cloud Storage.
Example:
@asset
def my_asset(gcs: GCSResource):
    with gcs.get_client() as client:
        # client is a google.cloud.storage.Client
        ...
GCS I/O Manager
- dagster_gcp.GCSPickleIOManager IOManagerDefinition
Persistent IO manager using GCS for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.
Example usage:
- Attach this IO manager to a set of assets.
from dagster import asset, Definitions
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix",
            gcs=GCSResource(project="my-cool-project")
        ),
    }
)

- Attach this IO manager to your job to make it available to your ops.
from dagster import job
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@job(
    resource_defs={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="my-cool-project"),
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix"
        ),
    }
)
def my_job():
    ...
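
The asset path layout described for this IO manager can be sketched as a small path-joining helper. This only illustrates the documented mapping from asset key to path and is not the library's code: asset_object_path is a hypothetical name.

```python
import posixpath
from typing import Sequence


def asset_object_path(base_dir: str, asset_key: Sequence[str]) -> str:
    """Join the base directory and every asset key component into one path."""
    # The final component becomes the file name; earlier ones become directories.
    return posixpath.join(base_dir, *asset_key)


path = asset_object_path("/my/base/path", ["one", "two", "three"])
directory, filename = posixpath.split(path)
```

Here directory is /my/base/path/one/two and filename is three, matching the example in the description. Because the path depends only on the asset key, a later materialization lands on the same path and overwrites the earlier one.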
GCS Sensor
- dagster_gcp.gcs.sensor.get_gcs_keys
Return a list of updated keys in a GCS bucket.
Parameters:
- bucket (str) – The name of the GCS bucket.
- prefix (Optional[str]) – The prefix to filter the keys by.
- since_key (Optional[str]) – The key to start from. If provided, only keys updated after this key will be returned.
- gcs_session (Optional[google.cloud.storage.client.Client]) – A GCS client session. If not provided, a new session will be created.
Returns: A list of keys in the bucket, sorted by update time, that are newer than the since_key.
Return type: List[str]
Example:
from dagster import RunRequest, SkipReason, resource, sensor
from dagster_gcp.gcs.sensor import get_gcs_keys
from google.cloud import storage

@resource
def google_cloud_storage_client(context):
    return storage.Client.from_service_account_json("my-service-account.json")

# my_job is assumed to be a job defined elsewhere
@sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
def my_gcs_sensor(context):
    since_key = context.cursor or None
    new_gcs_keys = get_gcs_keys(
        "my-bucket",
        prefix="data",
        since_key=since_key,
        gcs_session=context.resources.google_cloud_storage_client
    )

    if not new_gcs_keys:
        # this function is a generator, so the SkipReason must be yielded
        yield SkipReason("No new gcs files found for bucket 'my-bucket'.")
        return

    for gcs_key in new_gcs_keys:
        yield RunRequest(run_key=gcs_key, run_config={
            "ops": {
                "gcs_files": {
                    "config": {
                        "gcs_key": gcs_key
                    }
                }
            }
        })

    # remember the newest key so the next tick only sees later updates
    last_key = new_gcs_keys[-1]
    context.update_cursor(last_key)
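
The since_key semantics described in the parameter list (keys sorted by update time, only those after since_key returned) can be modeled without touching GCS. The sketch below works on an in-memory list of (key, updated) pairs; keys_updated_since is a hypothetical name, and returning every key when since_key is unknown is an assumption, not documented behavior.

```python
from datetime import datetime
from typing import List, Optional, Sequence, Tuple


def keys_updated_since(
    objects: Sequence[Tuple[str, datetime]],
    since_key: Optional[str] = None,
) -> List[str]:
    """Return keys ordered by update time, keeping only those after since_key."""
    ordered = [key for key, _ in sorted(objects, key=lambda item: item[1])]
    # Assumption: without a known cursor, every key counts as new.
    if since_key is None or since_key not in ordered:
        return ordered
    return ordered[ordered.index(since_key) + 1:]
```

This is why the sensor example stores the last returned key as its cursor: on the next tick, only keys updated after that one come back.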
File Manager (Experimental)
- class dagster_gcp.GCSFileHandle
A reference to a file on GCS.
- dagster_gcp.GCSFileManagerResource ResourceDefinition
FileManager that provides abstract access to GCS.
GCS Compute Log Manager
- class dagster_gcp.gcs.GCSComputeLogManager
Logs op compute function stdout and stderr to GCS.
Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_gcp.gcs.compute_log_manager
  class: GCSComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    upload_interval: 30

There are more configuration examples in the instance documentation guide: https://docs.dagster.io/deployment/dagster-instance#compute-log-storage
Parameters:
- bucket (str) – The name of the GCS bucket to which to log.
- local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default:
- prefix (Optional[str]) – Prefix for the log file keys.
- json_credentials_envvar (Optional[str]) – Environment variable that contains the JSON with a private key
- upload_interval (Optional[int]) – Interval in seconds to upload partial log files to GCS. By default, will only upload when the capture is complete.
- show_url_only (Optional[bool]) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
- inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute
Dataproc
Dataproc Resource
- dagster_gcp.DataprocResource ResourceDefinition
Resource for connecting to a Dataproc cluster.
Example:
@asset
def my_asset(dataproc: DataprocResource):
    with dataproc.get_client() as client:
        # client is a dagster_gcp.DataprocClient
        ...
Legacy
- dagster_gcp.ConfigurablePickledObjectGCSIOManager IOManagerDefinition
- deprecated
This API will be removed in version 2.0. Please use GCSPickleIOManager instead.
Renamed to GCSPickleIOManager. See GCSPickleIOManager for documentation.
- dagster_gcp.build_bigquery_io_manager IOManagerDefinition
- experimental
This API may break in future versions, even between dot releases.
Builds an I/O manager definition that reads inputs from and writes outputs to BigQuery.
Parameters:
- type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between
- default_load_type (Type) – When an input has no type annotation, load it as this type.
Returns: IOManagerDefinition
Examples:
import pandas as pd
from dagster import Definitions, asset
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import build_bigquery_io_manager

@asset(
    key_prefix=["my_prefix"],
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])

defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"}
        })
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"},
            "dataset": "my_dataset"
        })
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command:

cat $GOOGLE_APPLICATION_CREDENTIALS | base64
- dagster_gcp.gcs_pickle_io_manager IOManagerDefinition
Persistent IO manager using GCS for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.
Example usage:
- Attach this IO manager to a set of assets.
from dagster import Definitions, asset
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
        ),
        "gcs": gcs_resource.configured({"project": "my-cool-project"}),
    },
)

- Attach this IO manager to your job to make it available to your ops.
from dagster import job
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@job(
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
        ),
        "gcs": gcs_resource.configured({"project": "my-cool-project"}),
    },
)
def my_job():
    ...
- dagster_gcp.gcs_file_manager ResourceDefinition
FileManager that provides abstract access to GCS.
Implements the FileManager API.