IO Managers
IO managers are user-provided objects that store op outputs and load them as inputs to downstream ops.
- class dagster.ConfigurableIOManager
Base class for Dagster IO managers that utilize structured config.
This class is a subclass of both IOManagerDefinition
IOManagerDefinition
, ConfigConfig
, and IOManagerIOManager
. Implementers must provide an implementation of thehandle_output()
andload_input()
methods.Example definition:
class MyIOManager(ConfigurableIOManager):
path_prefix: List[str]
def _get_path(self, context) -> str:
return "/".join(context.asset_key.path)
def handle_output(self, context, obj):
write_csv(self._get_path(context), obj)
def load_input(self, context):
return read_csv(self._get_path(context))
defs = Definitions(
...,
resources=\{
"io_manager": MyIOManager(path_prefix=["my", "prefix"])
}
)
- class dagster.ConfigurableIOManagerFactory
Base class for Dagster IO managers that utilize structured config. This base class is useful for cases in which the returned IO manager is not the same as the class itself (e.g. when it is a wrapper around the actual IO manager implementation).
This class is a subclass of both IOManagerDefinition
IOManagerDefinition
and ConfigConfig
. Implementers should provide an implementation of theresource_function()
method, which should return an instance of IOManagerIOManager
.Example definition:
class ExternalIOManager(IOManager):
def __init__(self, connection):
self._connection = connection
def handle_output(self, context, obj):
...
def load_input(self, context):
...
class ConfigurableExternalIOManager(ConfigurableIOManagerFactory):
username: str
password: str
def create_io_manager(self, context) -> IOManager:
with database.connect(username, password) as connection:
return MyExternalIOManager(connection)
defs = Definitions(
...,
resources=\{
"io_manager": ConfigurableExternalIOManager(
username="dagster",
password=EnvVar("DB_PASSWORD")
)
}
)
- class dagster.IOManager
Base class for user-provided IO managers.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
Extend this class to handle how objects are loaded and stored. Users should implement
handle_output
to store an object andload_input
to retrieve an object.- abstract handle_output
User-defined method that stores an output of an op.
Parameters:
- context (OutputContextOutputContext) – The context of the step output that produces this object.
- obj (Any) – The object, returned by the op, to be stored.
- abstract load_input
User-defined method that loads an input to an op.
Parameters: context (InputContextInputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.Returns: The data object.Return type: Any
- class dagster.IOManagerDefinition
Definition of an IO manager resource.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
An IOManagerDefinition is a ResourceDefinition
ResourceDefinition
whose resource_fn returns an IOManagerIOManager
.The easiest way to create an IOManagerDefnition is with the @io_manager
@io_manager
decorator.- static hardcoded_io_manager
A helper function that creates an
IOManagerDefinition
with a hardcoded IOManager.Parameters:
- value (IOManagerIOManager) – A hardcoded IO Manager which helps mock the definition.
- description ([Optional[str]]) – The description of the IO Manager. Defaults to None.
Returns: A hardcoded resource.Return type: [IOManagerDefinition]
- @dagster.io_manager
Define an IO manager.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
The decorated function should accept an InitResourceContext
InitResourceContext
and return an IOManagerIOManager
.Parameters:
- config_schema (Optional[ConfigSchemaConfigSchema]) – The schema for the resource config. Configuration
- description (Optional[str]) – A human-readable description of the resource.
- output_config_schema (Optional[ConfigSchemaConfigSchema]) – The schema for per-output config. If not set,
- input_config_schema (Optional[ConfigSchemaConfigSchema]) – The schema for per-input config. If not set,
- required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the object
- version (Optional[str]) – (Experimental) The version of a resource function. Two wrapped
class MyIOManager(IOManager):
def handle_output(self, context, obj):
write_csv("some/path")
def load_input(self, context):
return read_csv("some/path")
@io_manager
def my_io_manager(init_context):
return MyIOManager()
@op(out=Out(io_manager_key="my_io_manager_key"))
def my_op(_):
return do_stuff()
@job(resource_defs=\{"my_io_manager_key": my_io_manager})
def my_job():
my_op()
Input and Output Contexts
- class dagster.InputContext
The
context
object available to the load_input method of InputManagerInputManager
.Users should not instantiate this object directly. In order to construct an InputContext for testing an IO Manager’s load_input method, use dagster.build_input_context()
dagster.build_input_context()
.Example:
from dagster import IOManager, InputContext
class MyIOManager(IOManager):
def load_input(self, context: InputContext):
...- get_asset_identifier
The sequence of strings making up the AssetKey for the asset being loaded as an input. If the asset is partitioned, the identifier contains the partition key as the final element in the sequence. For example, for the asset key
AssetKey(["foo", "bar", "baz"])
, materialized with partition key “2023-06-01”,get_asset_identifier
will return["foo", "bar", "baz", "2023-06-01"]
.
- get_identifier
Utility method to get a collection of identifiers that as a whole represent a unique step input.
If not using memoization, the unique identifier collection consists of
run_id
: the id of the run which generates the input.step_key
: the key for a compute step.name
: the name of the output. (default: ‘result’).
If using memoization, the
version
corresponding to the step output is used in place of therun_id
.Returns: A list of identifiers, i.e. (run_id or version), step_key, and output_nameReturn type: List[str, …]
- property asset_key
The
AssetKey
of the asset that is being loaded as an input.
- property asset_partition_key
The partition key for input asset.
Raises an error if the input asset has no partitioning, or if the run covers a partition range for the input asset.
- property asset_partition_key_range
The partition key range for input asset.
Raises an error if the input asset has no partitioning.
- property asset_partition_keys
The partition keys for input asset.
Raises an error if the input asset has no partitioning.
- property asset_partitions_def
The PartitionsDefinition on the upstream asset corresponding to this input.
- property asset_partitions_time_window
The time window for the partitions of the input asset.
Raises an error if either of the following are true:
- The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
- property config
The config attached to the input that we’re loading.
- property dagster_type
The type of this input. Dagster types do not propagate from an upstream output to downstream inputs, and this property only captures type information for the input that is either passed in explicitly with AssetIn
AssetIn
or InIn
, or can be infered from type hints. For an asset input, the Dagster type from the upstream asset definition is ignored.
- property definition_metadata
A dict of metadata that is assigned to the InputDefinition that we’re loading. This property only contains metadata passed in explicitly with AssetIn
AssetIn
or InIn
. To access metadata of an upstream asset or op definition, use the definition_metadata in InputContext.upstream_outputInputContext.upstream_output
.
- property has_asset_key
Returns True if an asset is being loaded as input, otherwise returns False. A return value of False indicates that an output from an op is being loaded as the input.
- property has_asset_partitions
Returns True if the asset being loaded as input is partitioned.
- property has_input_name
If we’re the InputContext is being used to load the result of a run from outside the run, then it won’t have an input name.
- property has_partition_key
Whether the current run is a partitioned run.
- property log
The log manager to use for this input.
- property metadata
- deprecated
This API will be removed in version 2.0.0. Use definition_metadata instead.
Use definitiion_metadata instead.
Type: Deprecated
- property name
The name of the input that we’re loading.
- property op_def
The definition of the op that’s loading the input.
- property partition_key
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
- property resource_config
The config associated with the resource that initializes the InputManager.
- property resources
The resources required by the resource that initializes the input manager. If using the
@input_manager()
decorator, these resources correspond to those requested with the required_resource_keys parameter.
- property upstream_output
Info about the output that produced the object we’re loading.
- class dagster.OutputContext
The context object that is available to the handle_output method of an IOManager
IOManager
.Users should not instantiate this object directly. To construct an OutputContext for testing an IO Manager’s handle_output method, use dagster.build_output_context()
dagster.build_output_context()
.Example:
from dagster import IOManager, OutputContext
class MyIOManager(IOManager):
def handle_output(self, context: OutputContext, obj):
...- add_output_metadata
Add a dictionary of metadata to the handled output.
Metadata entries added will show up in the HANDLED_OUTPUT and ASSET_MATERIALIZATION events for the run.
Parameters: metadata (Mapping[str, RawMetadataValue]) – A metadata dictionary to log Examples:
from dagster import IOManager
class MyIOManager(IOManager):
def handle_output(self, context, obj):
context.add_output_metadata(\{"foo": "bar"})
- get_asset_identifier
The sequence of strings making up the AssetKey for the asset being stored as an output. If the asset is partitioned, the identifier contains the partition key as the final element in the sequence. For example, for the asset key
AssetKey(["foo", "bar", "baz"])
materialized with partition key “2023-06-01”,get_asset_identifier
will return["foo", "bar", "baz", "2023-06-01"]
.
- get_identifier
Utility method to get a collection of identifiers that as a whole represent a unique step output.
If not using memoization, the unique identifier collection consists of
run_id
: the id of the run which generates the output.step_key
: the key for a compute step.name
: the name of the output. (default: ‘result’).
If using memoization, the
version
corresponding to the step output is used in place of therun_id
.Returns: A list of identifiers, i.e. (run_id or version), step_key, and output_nameReturn type: Sequence[str, …]
- log_event
Log an AssetMaterialization or AssetObservation from within the body of an io manager’s handle_output method.
Events logged with this method will appear in the event log.
Parameters: event (Union[AssetMaterializationAssetMaterialization, AssetObservationAssetObservation]) – The event to log. Examples:
from dagster import IOManager, AssetMaterialization
class MyIOManager(IOManager):
def handle_output(self, context, obj):
context.log_event(AssetMaterialization("foo"))
- property asset_key
The
AssetKey
of the asset that is being stored as an output.
- property asset_partition_key
The partition key for output asset.
Raises an error if the output asset has no partitioning, or if the run covers a partition range for the output asset.
- property asset_partition_key_range
The partition key range for output asset.
Raises an error if the output asset has no partitioning.
- property asset_partition_keys
The partition keys for the output asset.
Raises an error if the output asset has no partitioning.
- property asset_partitions_def
The PartitionsDefinition on the asset corresponding to this output.
- property asset_partitions_time_window
The time window for the partitions of the output asset.
Raises an error if either of the following are true:
- The output asset has no partitioning.
- The output asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
- property asset_spec
The
AssetSpec
that is being stored as an output.
- property config
The configuration for the output.
- property dagster_type
The type of this output.
- property definition_metadata
A dict of the metadata that is assigned to the OutputDefinition that produced the output. Metadata is assigned to an OutputDefinition either directly on the OutputDefinition or in the @asset decorator.
- property has_asset_key
Returns True if an asset is being stored, otherwise returns False. A return value of False indicates that an output from an op is being stored.
- property has_asset_partitions
Returns True if the asset being stored is partitioned.
- property has_partition_key
Whether the current run is a partitioned run.
- property log
The log manager to use for this output.
- property mapping_key
The key that identifies a unique mapped output. None for regular outputs.
- property metadata
- deprecated
This API will be removed in version 2.0.0. Use definition_metadata instead.
used definition_metadata instead.
Type: Deprecated
- property name
The name of the output that produced the output.
- property op_def
The definition of the op that produced the output.
- property output_metadata
A dict of the metadata that is assigned to the output at execution time.
- property partition_key
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
- property resource_config
The config associated with the resource that initializes the InputManager.
- property resources
The resources required by the output manager, specified by the required_resource_keys parameter.
- property run_id
The id of the run that produced the output.
- property step_key
The step_key for the compute step that produced the output.
- property version
(Experimental) The version of the output.
- dagster.build_input_context
Builds input context from provided parameters.
build_input_context
can be used as either a function, or a context manager. If resources that are also context managers are provided, thenbuild_input_context
must be used as a context manager.Parameters:
- name (Optional[str]) – The name of the input that we’re loading.
- config (Optional[Any]) – The config attached to the input that we’re loading.
- definition_metadata (Optional[Dict[str, Any]]) – A dict of metadata that is assigned to the
- upstream_output (Optional[OutputContextOutputContext]) – Info about the output that produced the object
- dagster_type (Optional[DagsterTypeDagsterType]) – The type of this input.
- resource_config (Optional[Dict[str, Any]]) – The resource config to make available from the
- resources (Optional[Dict[str, Any]]) – The resources to make available from the context.
- asset_key (Optional[Union[AssetKeyAssetKey, Sequence[str], str]]) – The asset key attached to the InputDefinition.
- op_def (Optional[OpDefinitionOpDefinition]) – The definition of the op that’s loading the input.
- step_context (Optional[StepExecutionContextStepExecutionContext]) – For internal use.
- partition_key (Optional[str]) – String value representing partition key to execute with.
- asset_partition_key_range (Optional[PartitionKeyRangePartitionKeyRange]) – The range of asset partition keys
- asset_partitions_def – Optional[PartitionsDefinition]: The PartitionsDefinition of the asset
Examples:
build_input_context()
with build_input_context(resources=\{"foo": context_manager_resource}) as context:
do_something
- dagster.build_output_context
Builds output context from provided parameters.
build_output_context
can be used as either a function, or a context manager. If resources that are also context managers are provided, thenbuild_output_context
must be used as a context manager.Parameters:
- step_key (Optional[str]) – The step_key for the compute step that produced the output.
- name (Optional[str]) – The name of the output that produced the output.
- definition_metadata (Optional[Mapping[str, Any]]) – A dict of the metadata that is assigned to the
- mapping_key (Optional[str]) – The key that identifies a unique mapped output. None for regular outputs.
- config (Optional[Any]) – The configuration for the output.
- dagster_type (Optional[DagsterTypeDagsterType]) – The type of this output.
- version (Optional[str]) – (Experimental) The version of the output.
- resource_config (Optional[Mapping[str, Any]]) – The resource config to make available from the
- resources (Optional[Resources]) – The resources to make available from the context.
- op_def (Optional[OpDefinitionOpDefinition]) – The definition of the op that produced the output.
- asset_key – Optional[Union[AssetKey, Sequence[str], str]]: The asset key corresponding to the
- partition_key – Optional[str]: String value representing partition key to execute with.
- metadata (Optional[Mapping[str, Any]]) – deprecateddefinition_metadata instead.) Deprecated. Use definition_metadata instead.
Examples:
build_output_context()
with build_output_context(resources=\{"foo": context_manager_resource}) as context:
do_something
Built-in IO Managers
- dagster.FilesystemIOManager IOManagerDefinition
Built-in filesystem IO manager that stores and retrieves values using pickling.
The base directory that the pickle files live inside is determined by:
- The IO manager’s “base_dir” configuration value, if specified. Otherwise…
- A “storage/” directory underneath the value for “local_artifact_storage” in your dagster.yaml
- A “storage/” directory underneath the directory that the DAGSTER_HOME environment variable
- A temporary directory.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. So, with a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
Example usage:
-
Attach an IO manager to a set of assets using the reserved resource key
"io_manager"
.from dagster import Definitions, asset, FilesystemIOManager
@asset
def asset1():
# create df ...
return df
@asset
def asset2(asset1):
return asset1[:5]
defs = Definitions(
assets=[asset1, asset2],
resources=\{
"io_manager": FilesystemIOManager(base_dir="/my/base/path")
},
) -
Specify a job-level IO manager using the reserved resource key
"io_manager"
, which will set the given IO manager on all ops in a job.from dagster import FilesystemIOManager, job, op
@op
def op_a():
# create df ...
return df
@op
def op_b(df):
return df[:5]
@job(
resource_defs=\{
"io_manager": FilesystemIOManager(base_dir="/my/base/path")
}
)
def job():
op_b(op_a()) -
Specify IO manager on Out
Out
, which allows you to set different IO managers on different step outputs.from dagster import FilesystemIOManager, job, op, Out
@op(out=Out(io_manager_key="my_io_manager"))
def op_a():
# create df ...
return df
@op
def op_b(df):
return df[:5]
@job(resource_defs=\{"my_io_manager": FilesystemIOManager()})
def job():
op_b(op_a())
- dagster.InMemoryIOManager IOManagerDefinition
I/O manager that stores and retrieves values in memory. After execution is complete, the values will be garbage-collected. Note that this means that each run will not have access to values from previous runs.
The UPathIOManager
can be used to easily define filesystem-based IO Managers.
- class dagster.UPathIOManager
Abstract IOManager base class compatible with local and cloud storage via universal-pathlib and fsspec.
Features:
- handles partitioned assets
- handles loading a single upstream partition
- handles loading multiple upstream partitions (with respect to PartitionMapping
PartitionMapping
) - supports loading multiple partitions concurrently with async load_from_path method
- the get_metadata method can be customized to add additional metadata to the output
- the allow_missing_partitions metadata value can be set to True to skip missing partitions
Input Managers (Experimental)
Input managers load inputs from either upstream outputs or from provided default values.
- @dagster.input_manager
Define an input manager.
Input managers load op inputs, either from upstream outputs or by providing default values.
The decorated function should accept a InputContext
InputContext
and resource config, and return a loaded object that will be passed into one of the inputs of an op.The decorator produces an InputManagerDefinition
InputManagerDefinition
.Parameters:
- config_schema (Optional[ConfigSchemaConfigSchema]) – The schema for the resource-level config. If not
- description (Optional[str]) – A human-readable description of the resource.
- input_config_schema (Optional[ConfigSchemaConfigSchema]) – A schema for the input-level config. Each
- required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the input
- version (Optional[str]) – (Experimental) the version of the input manager definition.
from dagster import input_manager, op, job, In
@input_manager
def csv_loader(_):
return read_csv("some/path")
@op(ins=\{"input1": In(input_manager_key="csv_loader_key")})
def my_op(_, input1):
do_stuff(input1)
@job(resource_defs=\{"csv_loader_key": csv_loader})
def my_job():
my_op()
@input_manager(config_schema=\{"base_dir": str})
def csv_loader(context):
return read_csv(context.resource_config["base_dir"] + "/some/path")
@input_manager(input_config_schema=\{"path": str})
def csv_loader(context):
return read_csv(context.config["path"])
- class dagster.InputManager
Base interface for classes that are responsible for loading solid inputs.
- class dagster.InputManagerDefinition
Definition of an input manager resource.
Input managers load op inputs.
An InputManagerDefinition is a ResourceDefinition
ResourceDefinition
whose resource_fn returns an InputManagerInputManager
.The easiest way to create an InputManagerDefinition is with the @input_manager
@input_manager
decorator.
Legacy
- dagster.fs_io_manager IOManagerDefinition
Built-in filesystem IO manager that stores and retrieves values using pickling.
The base directory that the pickle files live inside is determined by:
- The IO manager’s “base_dir” configuration value, if specified. Otherwise…
- A “storage/” directory underneath the value for “local_artifact_storage” in your dagster.yaml
- A “storage/” directory underneath the directory that the DAGSTER_HOME environment variable
- A temporary directory.
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset. So, with a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
Example usage:
-
Attach an IO manager to a set of assets using the reserved resource key
"io_manager"
.from dagster import Definitions, asset, fs_io_manager
@asset
def asset1():
# create df ...
return df
@asset
def asset2(asset1):
return asset1[:5]
defs = Definitions(
assets=[asset1, asset2],
resources=\{
"io_manager": fs_io_manager.configured(\{"base_dir": "/my/base/path"})
},
) -
Specify a job-level IO manager using the reserved resource key
"io_manager"
, which will set the given IO manager on all ops in a job.from dagster import fs_io_manager, job, op
@op
def op_a():
# create df ...
return df
@op
def op_b(df):
return df[:5]
@job(
resource_defs=\{
"io_manager": fs_io_manager.configured(\{"base_dir": "/my/base/path"})
}
)
def job():
op_b(op_a()) -
Specify IO manager on Out
Out
, which allows you to set different IO managers on different step outputs.from dagster import fs_io_manager, job, op, Out
@op(out=Out(io_manager_key="my_io_manager"))
def op_a():
# create df ...
return df
@op
def op_b(df):
return df[:5]
@job(resource_defs=\{"my_io_manager": fs_io_manager})
def job():
op_b(op_a())
- dagster.mem_io_manager IOManagerDefinition
Built-in IO manager that stores and retrieves values in memory.