AWS (dagster-aws)
Utilities for interfacing with AWS from Dagster.
S3
- dagster_aws.s3.S3Resource ResourceDefinition
Resource that gives access to S3.
The underlying S3 session is created by calling boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3.
Example:
from dagster import job, op, Definitions
from dagster_aws.s3 import S3Resource
@op
def example_s3_op(s3: S3Resource):
    return s3.get_client().list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

@job
def example_job():
    example_s3_op()

defs = Definitions(
    jobs=[example_job],
    resources={'s3': S3Resource(region_name='us-west-1')}
)
- dagster_aws.s3.S3PickleIOManager IOManagerDefinition
Persistent IO manager using S3 for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
Example usage:
from dagster import asset, Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource
@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="my-cool-bucket",
            s3_prefix="my-cool-prefix",
        )
    }
)
- class dagster_aws.s3.S3ComputeLogManager
Logs compute function stdout and stderr to S3.
Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    use_ssl: true
    verify: true
    verify_cert_path: "/path/to/cert/bundle.pem"
    endpoint_url: "http://alternate-s3-host.io"
    skip_empty_files: true
    upload_interval: 30
    upload_extra_args:
      ServerSideEncryption: "AES256"
    show_url_only: false
    region: "us-west-1"
Parameters:
- bucket (str) – The name of the s3 bucket to which to log.
- local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default:
- prefix (Optional[str]) – Prefix for the log file keys.
- use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.
- verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.
- verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if
- endpoint_url (Optional[str]) – Override for the S3 endpoint url.
- skip_empty_files – (Optional[bool]): Skip upload of empty log files.
- upload_interval – (Optional[int]): Interval in seconds to upload partial log files to S3. By default, will only upload when the capture is complete.
- upload_extra_args – (Optional[dict]): Extra args for S3 file upload
- show_url_only – (Optional[bool]): Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
- region – (Optional[str]): The region of the S3 bucket. If not specified, will use the default region of the AWS session.
- inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager.
- dagster_aws.s3.S3Coordinate DagsterType
A dagster.DagsterType intended to make it easier to pass information about files on S3 from op to op. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:
inputs:
  s3_file:
    value:
      bucket: my-bucket
      key: my-key
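Below is a minimal sketch of an op that consumes an S3Coordinate input and assembles an S3 URI from it; the op name and the bucket/key values supplied via config are illustrative only.
from dagster import In, op
from dagster_aws.s3 import S3Coordinate

@op(ins={"s3_file": In(S3Coordinate)})
def s3_uri_op(s3_file) -> str:
    # An S3Coordinate value is a plain dict with 'bucket' and 'key' entries.
    return f"s3://{s3_file['bucket']}/{s3_file['key']}"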
File Manager (Experimental)
- class dagster_aws.s3.S3FileHandle
A reference to a file on S3.
- dagster_aws.s3.S3FileManagerResource ResourceDefinition
Base class for Dagster resources that utilize structured config.
This class is a subclass of both ResourceDefinition and Config.
Example definition:
class WriterResource(ConfigurableResource):
    prefix: str

    def output(self, text: str) -> None:
        print(f"{self.prefix}{text}")
Example usage:
@asset
def asset_that_uses_writer(writer: WriterResource):
    writer.output("text")

defs = Definitions(
    assets=[asset_that_uses_writer],
    resources={"writer": WriterResource(prefix="a_prefix")},
)
You can optionally use this class to model configuration only and vend an object of a different type for use at runtime. This is useful when you want one object to manage configuration and a separate object at runtime, or when you want to directly use a third-party class that you do not control.
To do this, override the create_resource method to return a different object.
class WriterResource(ConfigurableResource):
    prefix: str

    def create_resource(self, context: InitResourceContext) -> Writer:
        # Writer is a pre-existing class defined elsewhere
        return Writer(self.prefix)
Example usage:
@asset
def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
    writer.output("text")

defs = Definitions(
    assets=[use_preexisting_writer_as_resource],
    resources={"writer": WriterResource(prefix="a_prefix")},
)
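As a hedged sketch only: the snippet below assumes S3FileManagerResource exposes s3_bucket and s3_prefix config fields and a get_client() accessor returning the underlying file manager, mirroring the legacy s3_file_manager documented later on this page; verify the resource's actual fields before relying on it.
from dagster import Definitions, asset
from dagster_aws.s3 import S3FileManagerResource

@asset
def upload_report(file_manager: S3FileManagerResource):
    # write_data uploads the bytes and returns an S3FileHandle for the new object
    # (get_client() returning the file manager is an assumption, noted above).
    return file_manager.get_client().write_data(b"report contents")

defs = Definitions(
    assets=[upload_report],
    resources={
        "file_manager": S3FileManagerResource(s3_bucket="my-bucket", s3_prefix="reports"),
    },
)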
ECS
- dagster_aws.ecs.EcsRunLauncher RunLauncher
RunLauncher that starts a task in ECS for each Dagster job run.
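A minimal dagster.yaml sketch enabling this run launcher; launcher-specific options (task definition, networking, and so on) are omitted here and should be taken from the EcsRunLauncher config schema.
run_launcher:
  module: dagster_aws.ecs
  class: EcsRunLauncher
  # optional launcher config (task definition, container name, networking, etc.)
  # can be added under a config: key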
Redshift
- dagster_aws.redshift.RedshiftClientResource ResourceDefinition
This resource enables connecting to a Redshift cluster and issuing queries against that cluster.
Example:
from dagster import Definitions, asset, EnvVar
from dagster_aws.redshift import RedshiftClientResource
@asset
def example_redshift_asset(context, redshift: RedshiftClientResource):
    redshift.get_client().execute_query('SELECT 1', fetch_results=True)

redshift_configured = RedshiftClientResource(
    host='my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    port=5439,
    user='dagster',
    password=EnvVar("DAGSTER_REDSHIFT_PASSWORD"),
    database='dev',
)

defs = Definitions(
    assets=[example_redshift_asset],
    resources={'redshift': redshift_configured},
)
Testing
- dagster_aws.redshift.FakeRedshiftClientResource ResourceDefinition
This resource enables connecting to a Redshift cluster and issuing queries against that cluster.
Example:
from dagster import Definitions, asset, EnvVar
from dagster_aws.redshift import RedshiftClientResource
@asset
def example_redshift_asset(context, redshift: RedshiftClientResource):
    redshift.get_client().execute_query('SELECT 1', fetch_results=True)

redshift_configured = RedshiftClientResource(
    host='my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    port=5439,
    user='dagster',
    password=EnvVar("DAGSTER_REDSHIFT_PASSWORD"),
    database='dev',
)

defs = Definitions(
    assets=[example_redshift_asset],
    resources={'redshift': redshift_configured},
)
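A hedged testing sketch that materializes the asset above against the fake resource: it assumes the fake is a drop-in subclass of RedshiftClientResource whose client never opens a real connection, so the connection values below are placeholders.
from dagster import materialize
from dagster_aws.redshift import FakeRedshiftClientResource

fake_redshift = FakeRedshiftClientResource(
    host='localhost',      # placeholder; the fake client does not connect to a cluster
    port=5439,
    user='dagster',
    password='fake',
    database='dev',
)

result = materialize(
    [example_redshift_asset],
    resources={'redshift': fake_redshift},
)
assert result.success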
EMR
- dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition
- superseded
This API has been superseded and its usage is discouraged. Consider using Dagster Pipes instead. Learn more here: https://docs.dagster.io/concepts/dagster-pipes.
- spark_config:
- cluster_id: Name of the job flow (cluster) on which to execute.
- region_name: The AWS region that the cluster is in.
- action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
- staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.
- staging_prefix: S3 key prefix inside the staging_bucket to use for files passed between the plan process and EMR process.
- wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.
- local_job_package_path: Absolute path to the package that contains the job definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the job. The expectation is that this package will also be available on the python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_job_package option, referenced on S3 via the s3_job_package_path option, or installed on the cluster via bootstrap actions.
- local_pipeline_package_path: (legacy) Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the pipeline. The expectation is that this package will also be available on the python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on S3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.
- deploy_local_job_package: If set, before every step run, the launcher will zip up all the code in local_job_package_path, upload it to S3, and pass it to spark-submit's --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_job_package_path should not also be set.
- deploy_local_pipeline_package: (legacy) If set, before every step run, the launcher will zip up all the code in local_pipeline_package_path, upload it to S3, and pass it to spark-submit's --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_pipeline_package_path should not also be set.
- s3_job_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_job_package should not be set to True.
- s3_pipeline_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.
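A hedged configuration sketch using a subset of the options listed above; the cluster id, bucket, and path values are placeholders.
from dagster_aws.emr import emr_pyspark_step_launcher

emr_launcher = emr_pyspark_step_launcher.configured({
    "cluster_id": "j-XXXXXXXXXXXXX",        # placeholder EMR cluster (job flow) id
    "region_name": "us-west-1",
    "staging_bucket": "my-staging-bucket",
    "local_job_package_path": ".",          # package containing the job definitions
    "deploy_local_job_package": True,       # zip and upload the local package for --py-files
    "wait_for_logs": False,
})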
- dagster_aws.emr.EmrClusterState = <enum 'EmrClusterState'>
Cluster state for EMR.
- dagster_aws.emr.EmrStepState = <enum 'EmrStepState'>
Step state for EMR.
CloudWatch
- dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition
Core class for defining loggers.
Loggers are job-scoped logging handlers, which will be automatically invoked whenever dagster messages are logged from within a job.
Parameters:
- logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to instantiate the logger.
- config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data is available in init_context.logger_config.
- description (Optional[str]) – A human-readable description of this logger.
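A hedged sketch of attaching this logger to a job; the config keys shown (log group and stream names) are assumptions, so verify them against the logger's config schema.
from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

@op
def hello_op(context):
    context.log.info("hello from CloudWatch")

@job(logger_defs={"cloudwatch": cloudwatch_logger})
def hello_job():
    hello_op()

result = hello_job.execute_in_process(
    run_config={
        "loggers": {
            "cloudwatch": {
                "config": {
                    # assumed config keys; check the logger's config schema
                    "log_group_name": "/dagster/example",
                    "log_stream_name": "example-stream",
                }
            }
        }
    }
)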
SecretsManager
Resources which surface SecretsManager secrets for use in Dagster resources and jobs.
- dagster_aws.secretsmanager.SecretsManagerResource ResourceDefinition
Resource that gives access to AWS SecretsManager.
The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). The returned resource object is a SecretsManager client, an instance of botocore.client.SecretsManager.
Example:
from dagster import Definitions, job, op
from dagster_aws.secretsmanager import SecretsManagerResource

@op
def example_secretsmanager_op(secretsmanager: SecretsManagerResource):
    return secretsmanager.get_client().get_secret_value(
        SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
    )

@job
def example_job():
    example_secretsmanager_op()

defs = Definitions(
    jobs=[example_job],
    resources={
        'secretsmanager': SecretsManagerResource(
            region_name='us-west-1'
        )
    }
)
- dagster_aws.secretsmanager.SecretsManagerSecretsResource ResourceDefinition
Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables.
Example:
import os

from dagster import Definitions, job, op
from dagster_aws.secretsmanager import SecretsManagerSecretsResource

@op
def example_secretsmanager_secrets_op(secrets: SecretsManagerSecretsResource):
    return secrets.fetch_secrets().get("my-secret-name")

@op
def example_secretsmanager_secrets_op_2(secrets: SecretsManagerSecretsResource):
    with secrets.secrets_in_environment():
        return os.getenv("my-other-secret-name")

@job
def example_job():
    example_secretsmanager_secrets_op()
    example_secretsmanager_secrets_op_2()

defs = Definitions(
    jobs=[example_job],
    resources={
        'secrets': SecretsManagerSecretsResource(
            region_name='us-west-1',
            secrets_tag="dagster",
            add_to_environment=True,
        )
    }
)
Note that your ops must also declare that they require this resource, or it will not be initialized for the execution of their compute functions.
Pipes
Context Injectors
- class dagster_aws.pipes.PipesS3ContextInjector
A context injector that injects context by writing to a temporary S3 location.
Parameters:
- bucket (str) – The S3 bucket to write to.
- client (boto3.client) – A boto3 client to use to write to S3.
- key_prefix (Optional[str]) – An optional prefix to use for the S3 key. Defaults to a random string.
- class dagster_aws.pipes.PipesLambdaEventContextInjector
Injects context via the AWS Lambda event input. Should be paired with PipesMappingParamsLoader on the Lambda side.
Message Readers
- class dagster_aws.pipes.PipesS3MessageReader
Message reader that reads messages by periodically reading message chunks from a specified S3 bucket.
If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process.
Parameters:
- interval (float) – Interval in seconds between attempts to download a chunk.
- bucket (str) – The S3 bucket to read from.
- client (boto3.client) – A boto3 client.
- log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on S3.
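A construction sketch combining the S3-based context injector and message reader (the bucket name is a placeholder); the resulting objects are typically handed to one of the Pipes clients listed below.
import boto3

from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader

s3_client = boto3.client("s3")

context_injector = PipesS3ContextInjector(
    bucket="my-pipes-bucket",
    client=s3_client,
)
message_reader = PipesS3MessageReader(
    bucket="my-pipes-bucket",
    client=s3_client,
    interval=10,
)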
- class dagster_aws.pipes.PipesCloudWatchMessageReader
- experimental
This API may break in future versions, even between dot releases.
Message reader that consumes AWS CloudWatch logs to read pipes messages.
Clients
- class dagster_aws.pipes.PipesLambdaClient
A pipes client for invoking AWS lambda.
By default context is injected via the lambda input event and messages are parsed out of the 4k tail of logs.
Parameters:
- client (boto3.client) – The boto lambda client used to call invoke.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the Lambda function.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the Lambda function.
- run
Synchronously invoke a lambda function, enriched with the pipes protocol.
Parameters:
- function_name (str) – The name of the function to use.
- event (Mapping[str, Any]) – A JSON serializable object to pass as input to the lambda.
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
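A hedged sketch of invoking a Lambda function from an asset with this client; the function name and event payload are placeholders.
import boto3

from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesLambdaClient

@asset
def lambda_pipes_asset(context: AssetExecutionContext, lambda_pipes_client: PipesLambdaClient):
    return lambda_pipes_client.run(
        context=context,
        function_name="dagster-pipes-function",   # placeholder function name
        event={"some_parameter": "some_value"},
    ).get_materialize_result()

defs = Definitions(
    assets=[lambda_pipes_asset],
    resources={"lambda_pipes_client": PipesLambdaClient(client=boto3.client("lambda"))},
)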
- class dagster_aws.pipes.PipesGlueClient
- experimental
This API may break in future versions, even between dot releases.
A pipes client for invoking AWS Glue jobs.
Parameters:
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the Glue job.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the Glue job.
- client (Optional[boto3.client]) – The boto Glue client used to launch the Glue job
- forward_termination (bool) – Whether to cancel the Glue job run when the Dagster process receives a termination signal.
- run
Start a Glue job, enriched with the pipes protocol.
See also: AWS API Documentation
Parameters:
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- start_job_run_params (Dict) – Parameters for the start_job_run boto3 Glue client call.
- extras (Optional[Dict[str, Any]]) – Additional Dagster metadata to pass to the Glue job.
Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
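A hedged sketch of launching a Glue job from an asset with this client; the Glue job name is a placeholder, and any other start_job_run parameter can be passed the same way.
import boto3

from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesGlueClient

@asset
def glue_pipes_asset(context: AssetExecutionContext, glue_pipes_client: PipesGlueClient):
    return glue_pipes_client.run(
        context=context,
        start_job_run_params={"JobName": "my-glue-job"},   # placeholder Glue job name
    ).get_materialize_result()

defs = Definitions(
    assets=[glue_pipes_asset],
    resources={"glue_pipes_client": PipesGlueClient(client=boto3.client("glue"))},
)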
- class dagster_aws.pipes.PipesECSClient
- experimental
This API may break in future versions, even between dot releases.
A pipes client for running AWS ECS tasks.
Parameters:
- client (Any) – The boto ECS client used to launch the ECS task
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the ECS task.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the ECS task.
- forward_termination (bool) – Whether to cancel the ECS task when the Dagster process receives a termination signal.
- run
Run ECS tasks, enriched with the pipes protocol.
Parameters:
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- run_task_params (dict) – Parameters for the run_task boto3 ECS client call.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
- pipes_container_name (Optional[str]) – If running more than one container in the task, the name of the container in which the Pipes process runs.
Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
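A hedged sketch of running an ECS task from an asset with this client; the task definition and launch type are placeholders for whatever your run_task call needs.
import boto3

from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesECSClient

@asset
def ecs_pipes_asset(context: AssetExecutionContext, ecs_pipes_client: PipesECSClient):
    return ecs_pipes_client.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-pipes-task",   # placeholder task definition
            "launchType": "FARGATE",
        },
    ).get_materialize_result()

defs = Definitions(
    assets=[ecs_pipes_asset],
    resources={"ecs_pipes_client": PipesECSClient(client=boto3.client("ecs"))},
)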
- class dagster_aws.pipes.PipesEMRClient
- experimental
This API may break in future versions, even between dot releases.
A pipes client for running jobs on AWS EMR.
Parameters:
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the EMR job.
- client (Optional[boto3.client]) – The boto3 EMR client used to interact with AWS EMR.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the EMR job.
- forward_termination (bool) – Whether to cancel the EMR job if the Dagster process receives a termination signal.
- wait_for_s3_logs_seconds (int) – The number of seconds to wait for S3 logs to be written after execution completes.
- run
Run a job on AWS EMR, enriched with the pipes protocol.
Starts a new EMR cluster for each invocation.
Parameters:
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- run_job_flow_params (Optional[dict]) – Parameters for the run_job_flow boto3 EMR client call.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
- class dagster_aws.pipes.PipesEMRServerlessClient
- experimental
This API may break in future versions, even between dot releases.
A pipes client for running workloads on AWS EMR Serverless.
Parameters:
- client (Optional[boto3.client]) – The boto3 AWS EMR Serverless client used to interact with AWS EMR Serverless.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the EMR Serverless workload.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the EMR Serverless workload.
- forward_termination (bool) – Whether to cancel the AWS EMR Serverless workload if the Dagster process receives a termination signal.
- poll_interval (float) – The interval in seconds to poll the AWS EMR Serverless workload for status updates. Defaults to 5 seconds.
- run
Run a workload on AWS EMR Serverless, enriched with the pipes protocol.
Parameters:
- context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- params (dict) – Parameters for the start_job_run boto3 AWS EMR Serverless client call.
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
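A hedged sketch of starting an EMR Serverless job run from an asset with this client, following the params argument as documented above; the application id and role ARN are placeholders.
import boto3

from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesEMRServerlessClient

@asset
def emr_serverless_pipes_asset(context: AssetExecutionContext, emr_serverless_client: PipesEMRServerlessClient):
    return emr_serverless_client.run(
        context=context,
        params={
            # placeholders; any start_job_run parameter accepted by boto3 can go here
            "applicationId": "my-application-id",
            "executionRoleArn": "arn:aws:iam::123456789012:role/emr-serverless-role",
        },
    ).get_materialize_result()

defs = Definitions(
    assets=[emr_serverless_pipes_asset],
    resources={"emr_serverless_client": PipesEMRServerlessClient(client=boto3.client("emr-serverless"))},
)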
Legacy
- dagster_aws.s3.ConfigurablePickledObjectS3IOManager IOManagerDefinition
- deprecated
This API will be removed in version 2.0. Please use S3PickleIOManager instead.
Renamed to S3PickleIOManager. See S3PickleIOManager for documentation.
- dagster_aws.s3.s3_resource ResourceDefinition
Resource that gives access to S3.
The underlying S3 session is created by calling boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3.
Example:
from dagster import job, op
from dagster_aws.s3 import s3_resource

@op(required_resource_keys={'s3'})
def example_s3_op(context):
    return context.resources.s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

@job(resource_defs={'s3': s3_resource})
def example_job():
    example_s3_op()

example_job.execute_in_process(
    run_config={
        'resources': {
            's3': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    }
)
Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.
You may configure this resource as follows:
resources:
  s3:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
      # through the ordinary boto credential chain.
      use_unsigned_session: false
      # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: False.
      endpoint_url: "http://localhost"
      # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for the S3 session. Default is the default
      # profile as specified in the ~/.aws/credentials file.
      use_ssl: true
      # Optional[bool]: Whether or not to use SSL. By default, SSL is used.
      verify: None
      # Optional[str]: Whether or not to verify SSL certificates. By default SSL certificates are verified.
      # You can also specify this argument if you want to use a different CA cert bundle than the one used by botocore.
      aws_access_key_id: None
      # Optional[str]: The access key to use when creating the client.
      aws_secret_access_key: None
      # Optional[str]: The secret key to use when creating the client.
      aws_session_token: None
      # Optional[str]: The session token to use when creating the client.
- dagster_aws.s3.s3_pickle_io_manager IOManagerDefinition
Persistent IO manager using S3 for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
Example usage:
- Attach this IO manager to a set of assets.
from dagster import Definitions, asset
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
        ),
        "s3": s3_resource,
    },
)
- Attach this IO manager to your job to make it available to your ops.
from dagster import job
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
@job(
    resource_defs={
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my-cool-bucket", "s3_prefix": "my-cool-prefix"}
        ),
        "s3": s3_resource,
    },
)
def my_job():
    ...
- dagster_aws.s3.s3_file_manager ResourceDefinition
FileManager that provides abstract access to S3.
Implements the FileManager API.
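A hedged usage sketch for this file manager; the s3_bucket and s3_prefix config keys are assumed to match the resource's config schema, and the bucket name is a placeholder.
from dagster import job, op
from dagster_aws.s3 import s3_file_manager

@op(required_resource_keys={"file_manager"})
def write_report(context):
    # write_data uploads the bytes and returns an S3FileHandle for the new object
    return context.resources.file_manager.write_data(b"report contents")

@job(
    resource_defs={
        "file_manager": s3_file_manager.configured(
            {"s3_bucket": "my-bucket", "s3_prefix": "file-manager"}   # assumed config keys
        )
    }
)
def file_manager_job():
    write_report()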
- dagster_aws.redshift.redshift_resource ResourceDefinition
This resource enables connecting to a Redshift cluster and issuing queries against that cluster.
Example:
from dagster import build_op_context, op
from dagster_aws.redshift import redshift_resource
@op(required_resource_keys={'redshift'})
def example_redshift_op(context):
    return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)

redshift_configured = redshift_resource.configured({
    'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    'port': 5439,
    'user': 'dagster',
    'password': 'dagster',
    'database': 'dev',
})

context = build_op_context(resources={'redshift': redshift_configured})
assert example_redshift_op(context) == [(1,)]
- dagster_aws.secretsmanager.secretsmanager_resource ResourceDefinition
Resource that gives access to AWS SecretsManager.
The underlying SecretsManager session is created by calling boto3.session.Session(profile_name). The returned resource object is a SecretsManager client, an instance of botocore.client.SecretsManager.
Example:
from dagster import job, op
from dagster_aws.secretsmanager import secretsmanager_resource

@op(required_resource_keys={'secretsmanager'})
def example_secretsmanager_op(context):
    return context.resources.secretsmanager.get_secret_value(
        SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
    )

@job(resource_defs={'secretsmanager': secretsmanager_resource})
def example_job():
    example_secretsmanager_op()

example_job.execute_in_process(
    run_config={
        'resources': {
            'secretsmanager': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    }
)
You may configure this resource as follows:
resources:
  secretsmanager:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
      # through the ordinary boto credential chain.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for the SecretsManager session. Default is the default
      # profile as specified in the ~/.aws/credentials file.
- dagster_aws.secretsmanager.secretsmanager_secrets_resource ResourceDefinition
Resource that provides a dict which maps selected SecretsManager secrets to their string values. Also optionally sets chosen secrets as environment variables.
Example:
import os
from dagster import job, op
from dagster_aws.secretsmanager import secretsmanager_secrets_resource

@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op(context):
    return context.resources.secrets.get("my-secret-name")

@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op_2(context):
    return os.getenv("my-other-secret-name")

@job(resource_defs={'secrets': secretsmanager_secrets_resource})
def example_job():
    example_secretsmanager_secrets_op()
    example_secretsmanager_secrets_op_2()

example_job.execute_in_process(
    run_config={
        'resources': {
            'secrets': {
                'config': {
                    'region_name': 'us-west-1',
                    'secrets_tag': 'dagster',
                    'add_to_environment': True,
                }
            }
        }
    }
)
Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.
You may configure this resource as follows:
resources:
  secretsmanager:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
      # through the ordinary boto credential chain.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for the SecretsManager session. Default is the default
      # profile as specified in the ~/.aws/credentials file.
      secrets: ["arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf"]
      # Optional[List[str]]: Specifies a list of secret ARNs to pull from SecretsManager.
      secrets_tag: "dagster"
      # Optional[str]: Specifies a tag; all secrets which have the tag set will be pulled
      # from SecretsManager.
      add_to_environment: true
      # Optional[bool]: Whether to set the selected secrets as environment variables. Defaults
      # to false.