
AWS (dagster-aws)

Utilities for interfacing with AWS with Dagster.

S3

dagster_aws.s3.S3Resource ResourceDefinition

Resource that gives access to S3.

The underlying S3 session is created by calling boto3.session.Session(profile_name). Calling get_client() on the resource returns an S3 client, an instance of botocore.client.S3.

Example:

from dagster import job, op, Definitions
from dagster_aws.s3 import S3Resource

@op
def example_s3_op(s3: S3Resource):
    return s3.get_client().list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

@job
def example_job():
    example_s3_op()

defs = Definitions(
    jobs=[example_job],
    resources={'s3': S3Resource(region_name='us-west-1')}
)
dagster_aws.s3.S3PickleIOManager IOManagerDefinition

Persistent IO manager using S3 for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, as long as each execution node has network connectivity and credentials for S3 and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
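The asset path layout described above can be sketched in plain Python. This is an illustration, not the dagster-aws implementation: asset_key_to_s3_key, store_asset and load_asset are hypothetical helpers, and an in-memory dict (fake_bucket) stands in for the S3 bucket so the sketch runs without AWS credentials.

```python
import pickle
import posixpath
from typing import Any, Dict, Sequence

# Hypothetical stand-in for an S3 bucket: object key -> stored bytes.
fake_bucket: Dict[str, bytes] = {}


def asset_key_to_s3_key(base_dir: str, key_path: Sequence[str]) -> str:
    """Join asset key components under base_dir.

    All but the last component become parent "directories"; the last is the file name.
    """
    return posixpath.join(base_dir, *key_path)


def store_asset(base_dir: str, key_path: Sequence[str], obj: Any) -> str:
    """Pickle obj to the asset's path, overwriting any earlier materialization."""
    s3_key = asset_key_to_s3_key(base_dir, key_path)
    fake_bucket[s3_key] = pickle.dumps(obj)
    return s3_key


def load_asset(base_dir: str, key_path: Sequence[str]) -> Any:
    """Unpickle the most recently stored value for the asset."""
    return pickle.loads(fake_bucket[asset_key_to_s3_key(base_dir, key_path)])


path = store_asset("/my/base/path", ["one", "two", "three"], [1, 2, 3])
store_asset("/my/base/path", ["one", "two", "three"], [4, 5])  # second materialization
print(path)                                                  # /my/base/path/one/two/three
print(load_asset("/my/base/path", ["one", "two", "three"]))  # [4, 5]
```

Because the key depends only on the asset key, the second materialization replaces the first. Op outputs instead get a path that includes the run ID, step key, and output name; that exact format is not reproduced here.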

Example usage:

from dagster import asset, Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource


@asset
def asset1():
    # Placeholder data; replace with your own DataFrame (or any picklable object)
    df = list(range(10))
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="my-cool-bucket",
            s3_prefix="my-cool-prefix",
        )
    }
)
class dagster_aws.s3.S3ComputeLogManager

Logs compute function stdout and stderr to S3.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    use_ssl: true
    verify: true
    verify_cert_path: "/path/to/cert/bundle.pem"
    endpoint_url: "http://alternate-s3-host.io"
    skip_empty_files: true
    upload_interval: 30
    upload_extra_args:
      ServerSideEncryption: "AES256"
    show_url_only: false
    region: "us-west-1"

Parameters:

  • bucket (str) – The name of the s3 bucket to which to log.
  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default:
  • prefix (Optional[str]) – Prefix for the log file keys.
  • use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.
  • verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.
  • verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if
  • endpoint_url (Optional[str]) – Override for the S3 endpoint url.
  • skip_empty_files (Optional[bool]) – Skip upload of empty log files.
  • upload_interval (Optional[int]) – Interval in seconds at which to upload partial log files to S3. By default, logs are uploaded only when the capture is complete.
  • upload_extra_args (Optional[dict]) – Extra args for the S3 file upload.
  • show_url_only (Optional[bool]) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
  • region (Optional[str]) – The region of the S3 bucket. If not specified, the default region of the AWS session is used.
  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute
dagster_aws.s3.S3Coordinate DagsterType

A dagster.DagsterType intended to make it easier to pass information about files on S3 from op to op. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:

inputs:
  s3_file:
    value:
      bucket: my-bucket
      key: my-key
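As a rough picture of the shape this type expects, the sketch below checks for a dict with 'bucket' and 'key' entries and renders it as an s3:// URI. is_s3_coordinate and s3_coordinate_to_uri are hypothetical helpers, not part of dagster_aws, and requiring both values to be strings is an assumption.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def is_s3_coordinate(value: Any) -> bool:
    """True if value is a mapping with string 'bucket' and 'key' entries."""
    return (
        isinstance(value, Mapping)
        and isinstance(value.get("bucket"), str)
        and isinstance(value.get("key"), str)
    )


def s3_coordinate_to_uri(coordinate: Mapping[str, str]) -> str:
    """Render a coordinate such as {'bucket': 'my-bucket', 'key': 'my-key'} as an s3:// URI."""
    if not is_s3_coordinate(coordinate):
        raise ValueError(f"Expected a dict with 'bucket' and 'key' keys, got {coordinate!r}")
    return f"s3://{coordinate['bucket']}/{coordinate['key']}"


# The config above hydrates the s3_file input to a dict like this one.
s3_file = {"bucket": "my-bucket", "key": "my-key"}
print(s3_coordinate_to_uri(s3_file))  # s3://my-bucket/my-key
```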

File Manager (Experimental)

class dagster_aws.s3.S3FileHandle

A reference to a file on S3.

dagster_aws.s3.S3FileManagerResource ResourceDefinition

Base class for Dagster resources that utilize structured config.

This class is a subclass of both ResourceDefinition and Config.

Example definition:

class WriterResource(ConfigurableResource):
    prefix: str

    def output(self, text: str) -> None:
        print(f"{self.prefix}{text}")

Example usage:

@asset
def asset_that_uses_writer(writer: WriterResource):
    writer.output("text")

defs = Definitions(
    assets=[asset_that_uses_writer],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

You can optionally use this class to model configuration only and vend an object of a different type for use at runtime. This is useful when you want one object to manage configuration and a separate object to use at runtime, or when you want to directly use a third-party class that you do not control.

To do this, override the create_resource method to return a different object.

class WriterResource(ConfigurableResource):
    prefix: str

    def create_resource(self, context: InitResourceContext) -> Writer:
        # Writer is a pre-existing class defined elsewhere
        return Writer(self.prefix)

Example usage:

@asset
def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
    writer.output("text")

defs = Definitions(
    assets=[use_preexisting_writer_as_resource],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

ECS

dagster_aws.ecs.EcsRunLauncher RunLauncher

RunLauncher that starts a task in ECS for each Dagster job run.

dagster_aws.ecs.ecs_executor ExecutorDefinition
experimental

This API may break in future versions, even between dot releases.

Executor which launches steps as ECS tasks.

To use the ecs_executor, set it as the executor_def when defining a job:

from dagster_aws.ecs import ecs_executor

from dagster import job, op


@op(
    tags={"ecs/cpu": "256", "ecs/memory": "512"},
)
def ecs_op():
    pass


@job(executor_def=ecs_executor)
def ecs_job():
    ecs_op()


Then you can configure the executor with run config as follows:

execution:
  config:
    cpu: 1024
    memory: 2048
    ephemeral_storage: 10
    task_overrides:
      containerOverrides:
        - name: run
          environment:
            - name: MY_ENV_VAR
              value: "my_value"

max_concurrent limits the number of ECS tasks that will execute concurrently for one run. By default there is no limit: steps run with as much parallelism as the DAG allows. Note that this is not a global limit.
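As an analogy for what a per-run limit means, the sketch below throttles plain Python threads with a semaphore instead of launching ECS tasks. run_steps is hypothetical and is not how ecs_executor is implemented; it only shows that the cap applies within a single call. Two runs started side by side would each create their own semaphore, so together they could have up to twice max_concurrent steps in flight.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_steps(step_count: int, max_concurrent: int) -> int:
    """Run step_count dummy steps for one "run", with at most max_concurrent in flight.

    Returns the peak number of steps observed running at the same time.
    """
    limit = threading.Semaphore(max_concurrent)  # created per call, i.e. per run
    lock = threading.Lock()
    running = 0
    peak = 0

    def step(_index: int) -> None:
        nonlocal running, peak
        with limit:  # blocks while max_concurrent steps are already running
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)  # stand-in for an ECS task doing work
            with lock:
                running -= 1

    # More worker threads than the limit, so the semaphore (not the pool) does the throttling.
    with ThreadPoolExecutor(max_workers=step_count) as pool:
        list(pool.map(step, range(step_count)))
    return peak


print(run_steps(step_count=10, max_concurrent=3))  # never more than 3
```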

Configuration set on the ECS tasks created by the EcsRunLauncher will also be set on the tasks created by the ecs_executor.

Configuration set using tags on a @job will only apply to the run level. For configuration to apply at each step it must be set using tags for each @op.

Redshift

dagster_aws.redshift.RedshiftClientResource ResourceDefinition

This resource enables connecting to a Redshift cluster and issuing queries against that cluster.

Example:

from dagster import Definitions, asset, EnvVar
from dagster_aws.redshift import RedshiftClientResource

@asset
def example_redshift_asset(context, redshift: RedshiftClientResource):
    redshift.get_client().execute_query('SELECT 1', fetch_results=True)

redshift_configured = RedshiftClientResource(
    host='my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    port=5439,
    user='dagster',
    password=EnvVar("DAGSTER_REDSHIFT_PASSWORD"),
    database='dev',
)

defs = Definitions(
    assets=[example_redshift_asset],
    resources={'redshift': redshift_configured},
)

Testing

dagster_aws.redshift.FakeRedshiftClientResource ResourceDefinition

This resource enables connecting to a Redshift cluster and issuing queries against that cluster.


EMR

dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition
superseded

This API has been superseded and its usage is discouraged. Consider using Dagster Pipes instead. Learn more here: https://docs.dagster.io/concepts/dagster-pipes.

  • spark_config:
  • cluster_id: Name of the job flow (cluster) on which to execute.
  • region_name: The AWS region that the cluster is in.
  • action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
  • staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.
  • staging_prefix: S3 key prefix inside the staging_bucket to use for files passed between the plan process and EMR process.
  • wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.
  • local_job_package_path: Absolute path to the package that contains the job definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the job. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_job_package option, referenced on S3 via the s3_job_package_path option, or installed on the cluster via bootstrap actions.
  • local_pipeline_package_path: (legacy) Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the pipeline. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on S3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.
  • deploy_local_job_package: If set, before every step run, the launcher will zip up all the code in local_job_package_path, upload it to S3, and pass it to spark-submit’s --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_job_package_path should not also be set.
  • deploy_local_pipeline_package: (legacy) If set, before every step run, the launcher will zip up all the code in local_job_package_path, upload it to S3, and pass it to spark-submit’s --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_job_package_path should not also be set.
  • s3_job_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_job_package should not be set to True.
  • s3_pipeline_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.
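The two mutually exclusive ways of shipping code to the cluster (zipping the local package versus pointing at an existing zip on S3) can be sketched as follows. zip_package and py_files_args are hypothetical helpers, not part of dagster_aws; the S3 upload step is omitted, so uploaded_zip_uri is simply assumed to be where the zipped package ended up.

```python
import os
import zipfile
from typing import List, Optional


def zip_package(local_job_package_path: str, zip_path: str) -> List[str]:
    """Zip every file under local_job_package_path into zip_path (which should live outside it).

    The package directory itself is kept as the top-level folder in the archive so it
    stays importable once the zip is on the Python path. Returns the archive member names.
    """
    package_dir = os.path.abspath(local_job_package_path)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full_path = os.path.join(root, name)
                zf.write(full_path, os.path.relpath(full_path, parent))
        return zf.namelist()


def py_files_args(
    deploy_local_job_package: bool = False,
    s3_job_package_path: Optional[str] = None,
    uploaded_zip_uri: Optional[str] = None,
) -> List[str]:
    """Build spark-submit --py-files arguments for the two mutually exclusive options."""
    if deploy_local_job_package and s3_job_package_path:
        raise ValueError("Set deploy_local_job_package or s3_job_package_path, not both.")
    if deploy_local_job_package:
        # uploaded_zip_uri is where the output of zip_package() was uploaded (upload not shown).
        if uploaded_zip_uri is None:
            raise ValueError("deploy_local_job_package needs the uploaded zip's S3 URI.")
        return ["--py-files", uploaded_zip_uri]
    if s3_job_package_path:
        return ["--py-files", s3_job_package_path]
    # Neither option set: code must already be on the cluster, e.g. via bootstrap actions.
    return []


print(py_files_args(s3_job_package_path="s3://my-bucket/code.zip"))
# ['--py-files', 's3://my-bucket/code.zip']
```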
class dagster_aws.emr.EmrJobRunner
class dagster_aws.emr.EmrError
dagster_aws.emr.EmrClusterState = <enum 'EmrClusterState'>

Cluster state for EMR.

dagster_aws.emr.EmrStepState = <enum 'EmrStepState'>

Step state for EMR.

CloudWatch

dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition

Core class for defining loggers.

Loggers are job-scoped logging handlers, which are automatically invoked whenever Dagster messages are logged from within a job.

Parameters:

  • logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to
  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in
  • description (Optional[str]) – A human-readable description of this logger.

SecretsManager

Resources which surface SecretsManager secrets for use in Dagster resources and jobs.

dagster_aws.secretsmanager.SecretsManagerResource ResourceDefinition

Resource that gives access to AWS SecretsManager.

The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). Calling get_client() on the resource returns a SecretsManager client, an instance of botocore.client.SecretsManager.

Example:

from dagster import Definitions, job, op
from dagster_aws.secretsmanager import SecretsManagerResource

@op
def example_secretsmanager_op(secretsmanager: SecretsManagerResource):
    return secretsmanager.get_client().get_secret_value(
        SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
    )

@job
def example_job():
    example_secretsmanager_op()

defs = Definitions(
    jobs=[example_job],
    resources={
        'secretsmanager': SecretsManagerResource(
            region_name='us-west-1'
        )
    }
)
dagster_aws.secretsmanager.SecretsManagerSecretsResource ResourceDefinition

Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables.

Example:

import os
from dagster import Definitions, job, op
from dagster_aws.secretsmanager import SecretsManagerSecretsResource

@op
def example_secretsmanager_secrets_op(secrets: SecretsManagerSecretsResource):
    return secrets.fetch_secrets().get("my-secret-name")

@op
def example_secretsmanager_secrets_op_2(secrets: SecretsManagerSecretsResource):
    with secrets.secrets_in_environment():
        return os.getenv("my-other-secret-name")

@job
def example_job():
    example_secretsmanager_secrets_op()
    example_secretsmanager_secrets_op_2()

defs = Definitions(
    jobs=[example_job],
    resources={
        'secrets': SecretsManagerSecretsResource(
            region_name='us-west-1',
            secrets_tag="dagster",
            add_to_environment=True,
        )
    }
)

Note that your ops must also declare that they require this resource, by taking it as a parameter annotated with SecretsManagerSecretsResource as in the example, or it will not be initialized for the execution of their compute functions.
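The environment-variable behaviour used through secrets_in_environment() in the op example can be pictured as a context manager that exports values on entry and restores the previous environment on exit. This is a hypothetical stdlib sketch (secrets_as_env is not part of dagster_aws): a plain dict stands in for the secret values fetched from SecretsManager, no AWS call is made, and whether the real resource restores prior values in exactly this way is an assumption.

```python
import contextlib
import os
from typing import Dict, Iterator, Mapping, Optional


@contextlib.contextmanager
def secrets_as_env(secrets: Mapping[str, str]) -> Iterator[None]:
    """Temporarily export secrets as environment variables.

    Previous values (or their absence) are restored when the block exits, even if it raises.
    """
    previous: Dict[str, Optional[str]] = {name: os.environ.get(name) for name in secrets}
    os.environ.update(secrets)
    try:
        yield
    finally:
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value


# The dict stands in for values already fetched from SecretsManager.
with secrets_as_env({"MY_DEMO_SECRET": "example-value"}):
    print(os.getenv("MY_DEMO_SECRET"))  # example-value
```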

Pipes

Context Injectors

class dagster_aws.pipes.PipesS3ContextInjector

A context injector that injects context by writing to a temporary S3 location.

Parameters:

  • bucket (str) – The S3 bucket to write to.
  • client (boto3.client) – A boto3 client to use to write to S3.
  • key_prefix (Optional[str]) – An optional prefix to use for the S3 key. Defaults to a random
class dagster_aws.pipes.PipesLambdaEventContextInjector

Injects context via AWS Lambda event input. Should be paired with dagster_pipes.PipesMappingParamsLoader on the Lambda side.

Message Readers

class dagster_aws.pipes.PipesS3MessageReader

Message reader that reads messages by periodically reading message chunks from a specified S3 bucket.

If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process.

Parameters:

  • interval (float) – Interval in seconds between attempts to download a chunk.
  • bucket (str) – The S3 bucket to read from.
  • client (boto3.client) – A boto3 client.
  • log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on S3.
  • include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
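The periodic polling described above can be sketched without S3. In this hypothetical sketch, read_chunk stands in for downloading the chunk with a given index from the bucket (returning None if it does not exist yet) and is_done stands in for learning that the external process has finished. The sequential numbering of chunks is an assumption for illustration, not the reader's documented key layout.

```python
import time
from typing import Callable, List, Optional


def poll_chunks(
    read_chunk: Callable[[int], Optional[str]],
    is_done: Callable[[], bool],
    interval: float,
) -> List[str]:
    """Read numbered chunks in order, sleeping `interval` seconds whenever the next is missing."""
    messages: List[str] = []
    index = 1
    while True:
        done = is_done()  # sample before reading so a final chunk is not missed
        chunk = read_chunk(index)
        if chunk is not None:
            messages.extend(line for line in chunk.splitlines() if line)
            index += 1
            continue
        if done:
            return messages
        time.sleep(interval)


# A dict stands in for the bucket: chunk index -> newline-delimited messages.
fake_chunks = {1: '{"a": 1}\n', 2: '{"b": 2}\n'}
print(poll_chunks(fake_chunks.get, lambda: True, interval=0.0))
# ['{"a": 1}', '{"b": 2}']
```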
class dagster_aws.pipes.PipesCloudWatchMessageReader
experimental

This API may break in future versions, even between dot releases.

Message reader that consumes AWS CloudWatch logs to read pipes messages.

Clients

class dagster_aws.pipes.PipesLambdaClient

A pipes client for invoking AWS Lambda.

By default, context is injected via the Lambda input event, and messages are parsed out of the 4 KB tail of the logs.
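When a function is invoked through boto3's Lambda client with invoke(..., LogType="Tail"), the response's LogResult field carries the last 4 KB of the execution log, base64-encoded. The sketch below shows only that decoding step and keeps log lines that parse as JSON objects. parse_log_tail is a hypothetical helper, the sample message fields are made up, and treating every JSON-object line as a Pipes message is a simplification of whatever filtering the real message reader applies. Because only the tail is returned, output written early by a chatty function can fall outside those 4 KB.

```python
import base64
import json
from typing import Any, Dict, List


def parse_log_tail(log_result_b64: str) -> List[Dict[str, Any]]:
    """Decode a base64-encoded log tail and return the lines that are JSON objects."""
    text = base64.b64decode(log_result_b64).decode("utf-8", errors="replace")
    messages: List[Dict[str, Any]] = []
    for line in text.splitlines():
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue  # ordinary log output, not a structured message
        if isinstance(parsed, dict):
            messages.append(parsed)
    return messages


# Hypothetical log tail: two plain Lambda log lines around one JSON message.
sample_tail = "START RequestId: abc\n" + json.dumps({"method": "opened"}) + "\nEND RequestId: abc\n"
encoded = base64.b64encode(sample_tail.encode("utf-8")).decode("ascii")
print(parse_log_tail(encoded))  # [{'method': 'opened'}]
```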

Parameters:

  • client (boto3.client) – The boto3 Lambda client used to call invoke.
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
run

Synchronously invoke a Lambda function, enriched with the pipes protocol.

Parameters:

  • function_name (str) – The name of the function to use.
  • event (Mapping[str, Any]) – A JSON serializable object to pass as input to the lambda.
  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.

Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation

class dagster_aws.pipes.PipesGlueClient
experimental

This API may break in future versions, even between dot releases.

A pipes client for invoking AWS Glue jobs.

Parameters:

  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • client (Optional[boto3.client]) – The boto3 Glue client used to launch the Glue job.
  • forward_termination (bool) – Whether to cancel the Glue job run when the Dagster process receives a termination signal.
run

Start a Glue job, enriched with the pipes protocol.

See also: AWS API Documentation

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
  • start_job_run_params (Dict) – Parameters for the start_job_run boto3 Glue client call.
  • extras (Optional[Dict[str, Any]]) – Additional Dagster metadata to pass to the Glue job.

Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation

class dagster_aws.pipes.PipesECSClient
experimental

This API may break in future versions, even between dot releases.

A pipes client for running AWS ECS tasks.

Parameters:

  • client (Any) – The boto3 ECS client used to launch the ECS task.
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • forward_termination (bool) – Whether to cancel the ECS task when the Dagster process receives a termination signal.
run

Run ECS tasks, enriched with the pipes protocol.

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
  • run_task_params (dict) – Parameters for the run_task boto3 ECS client call.
  • extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
  • pipes_container_name (Optional[str]) – If running more than one container in the task,
  • waiter_config (Optional[WaiterConfig]) – Optional waiter configuration to use. Defaults to 70 days (Delay: 6, MaxAttempts: 1000000).

Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation
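The "70 days" default for waiter_config follows from the waiter settings: the waiter checks at most MaxAttempts times with Delay seconds between checks. The small helper below (max_wait is hypothetical, for arithmetic only) makes the calculation explicit and shows how a shorter limit could be chosen.

```python
from datetime import timedelta


def max_wait(delay_seconds: int, max_attempts: int) -> timedelta:
    """Approximate upper bound on a boto3 waiter's total wait: Delay x MaxAttempts."""
    return timedelta(seconds=delay_seconds * max_attempts)


default_wait = max_wait(6, 1_000_000)
print(default_wait.total_seconds())  # 6000000.0
print(default_wait.days)             # 69 (about 69.4 days, rounded to "70 days" above)

# A waiter config that gives up after roughly one hour instead.
one_hour_config = {"Delay": 6, "MaxAttempts": 600}
print(max_wait(one_hour_config["Delay"], one_hour_config["MaxAttempts"]))  # 1:00:00
```

Assuming waiter_config accepts a boto3-style dict with Delay and MaxAttempts keys, as the documented default suggests, a dict like one_hour_config would be passed as waiter_config to run().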

class dagster_aws.pipes.PipesEMRClient
experimental

This API may break in future versions, even between dot releases.

A pipes client for running jobs on AWS EMR.

Parameters:

  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • client (Optional[boto3.client]) – The boto3 EMR client used to interact with AWS EMR.
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • forward_termination (bool) – Whether to cancel the EMR job if the Dagster process receives a termination signal.
  • wait_for_s3_logs_seconds (int) – The number of seconds to wait for S3 logs to be written after execution completes.
run

Run a job on AWS EMR, enriched with the pipes protocol.

Starts a new EMR cluster for each invocation.

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
  • run_job_flow_params (Optional[dict]) – Parameters for the run_job_flow boto3 EMR client call.
  • extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.

Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation

class dagster_aws.pipes.PipesEMRServerlessClient
experimental

This API may break in future versions, even between dot releases.

A pipes client for running workloads on AWS EMR Serverless.

Parameters:

  • client (Optional[boto3.client]) – The boto3 AWS EMR Serverless client used to interact with AWS EMR Serverless.
  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject
  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages
  • forward_termination (bool) – Whether to cancel the AWS EMR Serverless workload if the Dagster process receives a termination signal.
  • poll_interval (float) – The interval in seconds to poll the AWS EMR Serverless workload for status updates. Defaults to 5 seconds.
run

Run a workload on AWS EMR Serverless, enriched with the pipes protocol.

Parameters:

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
  • params (dict) – Parameters for the start_job_run boto3 AWS EMR Serverless client call.
  • extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.

Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation

Legacy

dagster_aws.s3.ConfigurablePickledObjectS3IOManager IOManagerDefinition
deprecated

This API will be removed in version 2.0. Please use S3PickleIOManager instead.

Renamed to S3PickleIOManager. See S3PickleIOManager for documentation.

dagster_aws.s3.s3_resource ResourceDefinition

Resource that gives access to S3.

The underlying S3 session is created by calling boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3.

Example:

from dagster import job, op
from dagster_aws.s3 import s3_resource

@op(required_resource_keys={'s3'})
def example_s3_op(context):
    return context.resources.s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

@job(resource_defs={'s3': s3_resource})
def example_job():
    example_s3_op()

example_job.execute_in_process(
    run_config={
        'resources': {
            's3': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    }
)

Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may configure this resource as follows:

resources:
  s3:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
      # through the ordinary boto credential chain.
      use_unsigned_session: false
      # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: False
      endpoint_url: "http://localhost"
      # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for S3 session. Default is default
      # profile as specified in ~/.aws/credentials file
      use_ssl: true
      # Optional[bool]: Whether or not to use SSL. By default, SSL is used.
      verify: null
      # Optional[str]: Whether or not to verify SSL certificates. By default SSL certificates are verified.
      # You can also specify this argument if you want to use a different CA cert bundle than the one used by botocore.
      aws_access_key_id: null
      # Optional[str]: The access key to use when creating the client.
      aws_secret_access_key: null
      # Optional[str]: The secret key to use when creating the client.
      aws_session_token: null
      # Optional[str]: The session token to use when creating the client.
dagster_aws.s3.s3_pickle_io_manager IOManagerDefinition

Persistent IO manager using S3 for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, as long as each execution node has network connectivity and credentials for S3 and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.

Example usage:

  1. Attach this IO manager to a set of assets.
    from dagster import Definitions, asset
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource


    @asset
    def asset1():
        # Placeholder data; replace with your own DataFrame (or any picklable object)
        df = list(range(10))
        return df

    @asset
    def asset2(asset1):
        return asset1[:5]

    defs = Definitions(
        assets=[asset1, asset2],
        resources={
            "io_manager": s3_pickle_io_manager.configured(
                {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
            ),
            "s3": s3_resource,
        },
    )
  2. Attach this IO manager to your job to make it available to your ops.
    from dagster import job
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

    @job(
        resource_defs={
            "io_manager": s3_pickle_io_manager.configured(
                {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
            ),
            "s3": s3_resource,
        },
    )
    def my_job():
        ...
dagster_aws.s3.s3_file_manager ResourceDefinition

FileManager that provides abstract access to S3.

Implements the FileManager API.

dagster_aws.redshift.redshift_resource ResourceDefinition

This resource enables connecting to a Redshift cluster and issuing queries against that cluster.

Example:

from dagster import build_op_context, op
from dagster_aws.redshift import redshift_resource

@op(required_resource_keys={'redshift'})
def example_redshift_op(context):
    return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)

redshift_configured = redshift_resource.configured({
    'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    'port': 5439,
    'user': 'dagster',
    'password': 'dagster',
    'database': 'dev',
})
context = build_op_context(resources={'redshift': redshift_configured})
assert example_redshift_op(context) == [(1,)]
dagster_aws.redshift.fake_redshift_resource ResourceDefinition
dagster_aws.secretsmanager.secretsmanager_resource ResourceDefinition

Resource that gives access to AWS SecretsManager.

The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). The returned resource object is a SecretsManager client, an instance of botocore.client.SecretsManager.

Example:

from dagster import job, op
from dagster_aws.secretsmanager import secretsmanager_resource

@op(required_resource_keys={'secretsmanager'})
def example_secretsmanager_op(context):
    return context.resources.secretsmanager.get_secret_value(
        SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
    )

@job(resource_defs={'secretsmanager': secretsmanager_resource})
def example_job():
    example_secretsmanager_op()

example_job.execute_in_process(
    run_config={
        'resources': {
            'secretsmanager': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    }
)

You may configure this resource as follows:

resources:
  secretsmanager:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
      # through the ordinary boto credential chain.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for SecretsManager session. Default is default
      # profile as specified in ~/.aws/credentials file
dagster_aws.secretsmanager.secretsmanager_secrets_resource ResourceDefinition

Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables.

Example:

import os
from dagster import job, op
from dagster_aws.secretsmanager import secretsmanager_secrets_resource

@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op(context):
    return context.resources.secrets.get("my-secret-name")

@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op_2(context):
    return os.getenv("my-other-secret-name")

@job(resource_defs={'secrets': secretsmanager_secrets_resource})
def example_job():
    example_secretsmanager_secrets_op()
    example_secretsmanager_secrets_op_2()

example_job.execute_in_process(
    run_config={
        'resources': {
            'secrets': {
                'config': {
                    'region_name': 'us-west-1',
                    'secrets_tag': 'dagster',
                    'add_to_environment': True,
                }
            }
        }
    }
)

Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may configure this resource as follows:

resources:
  secretsmanager:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
      # through the ordinary boto credential chain.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for SecretsManager session. Default is default
      # profile as specified in ~/.aws/credentials file
      secrets: ["arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf"]
      # Optional[List[str]]: Specifies a list of secret ARNs to pull from SecretsManager.
      secrets_tag: "dagster"
      # Optional[str]: Specifies a tag; all secrets that have the tag set will be pulled
      # from SecretsManager.
      add_to_environment: true
      # Optional[bool]: Whether to set the selected secrets as environment variables. Defaults
      # to false.