
Repositories

dagster.repository RepositoryDefinition

Create a repository from the decorated function.

In most cases, Definitions should be used instead.

The decorated function should take no arguments, and its return value should be one of:

  1. List[Union[JobDefinition, ScheduleDefinition, SensorDefinition]]. Use this form when you have no need to lazy load jobs or other definitions. This is the typical use case.

  2. A dict of the form:

    {
        'jobs': Dict[str, Callable[[], JobDefinition]],
        'schedules': Dict[str, Callable[[], ScheduleDefinition]],
        'sensors': Dict[str, Callable[[], SensorDefinition]],
    }

This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly.

  3. A RepositoryData. Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory.

Parameters:

  • name (Optional[str]) – The name of the repository. Defaults to the name of the decorated function.
  • description (Optional[str]) – A string description of the repository.
  • metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata for the repository. Not
  • top_level_resources (Optional[Mapping[str, ResourceDefinition]]) – A dict of top-level

Example:

import os

from dagster import (
    Field, Int, RepositoryData, RunRequest, ScheduleDefinition,
    job, op, repository, sensor,
)

######################################################################
# A simple repository using the first form of the decorated function
######################################################################

@op(config_schema={'n': Field(Int)})
def return_n(context):
    return context.op_config['n']

@job
def simple_job():
    return_n()

@job
def some_job():
    ...

@sensor(job=some_job)
def some_sensor():
    if foo():
        yield RunRequest(
            run_key=...,
            run_config={
                'ops': {'return_n': {'config': {'n': bar()}}}
            }
        )

@job
def my_job():
    ...

my_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", job=my_job)

@repository
def simple_repository():
    return [simple_job, some_sensor, my_schedule]

######################################################################
# A simple repository using the first form of the decorated function
# and custom metadata that will be displayed in the UI
######################################################################

...

@repository(
    name='my_repo',
    metadata={
        'team': 'Team A',
        'repository_version': '1.2.3',
        'environment': 'production',
    })
def simple_repository():
    return [simple_job, some_sensor, my_schedule]

######################################################################
# A lazy-loaded repository
######################################################################

def make_expensive_job():
    @job
    def expensive_job():
        for i in range(10000):
            return_n.alias(f'return_n_{i}')()

    return expensive_job

def make_expensive_schedule():
    @job
    def other_expensive_job():
        for i in range(11000):
            return_n.alias(f'my_return_n_{i}')()

    return ScheduleDefinition(cron_schedule="0 0 * * *", job=other_expensive_job)

@repository
def lazy_loaded_repository():
    return {
        'jobs': {'expensive_job': make_expensive_job},
        'schedules': {'expensive_schedule': make_expensive_schedule}
    }


######################################################################
# A complex repository that lazily constructs jobs from a directory
# of files in a bespoke YAML format
######################################################################

class ComplexRepositoryData(RepositoryData):
    def __init__(self, yaml_directory):
        self._yaml_directory = yaml_directory

    def get_all_jobs(self):
        return [
            self._construct_job_def_from_yaml_file(
                self._yaml_file_for_job_name(file_name)
            )
            for file_name in os.listdir(self._yaml_directory)
        ]

    ...

@repository
def complex_repository():
    return ComplexRepositoryData('some_directory')

class dagster.RepositoryDefinition

Define a repository that contains a group of definitions.

Users should typically not create objects of this class directly. Instead, use the @repository() decorator.

Parameters:

  • name (str) – The name of the repository.
  • repository_data (RepositoryData) – Contains the definitions making up the repository.
  • description (Optional[str]) – A string description of the repository.
  • metadata (Optional[MetadataMapping]) – Arbitrary metadata for the repository. Not

get_all_jobs

Return all jobs in the repository as a list.

Note that this will construct any job in the lazily evaluated dictionary that has not yet been constructed.

Returns: All jobs in the repository.

Return type: List[JobDefinition]

get_asset_value_loader

Returns an object that can load the contents of assets as Python objects.

Invokes load_input on the IOManager associated with the assets. Avoids spinning up resources separately for each asset.

Usage:

with my_repo.get_asset_value_loader() as loader:
    asset1 = loader.load_asset_value("asset1")
    asset2 = loader.load_asset_value("asset2")

get_job

Get a job by name.

If this job is present in the lazily evaluated dictionary passed to the constructor but has not yet been constructed, only this job is constructed, and it is cached for future calls.

Parameters: name (str) – Name of the job to retrieve.

Returns: The job definition corresponding to the given name.

Return type: JobDefinition

get_schedule_def

Get a schedule definition by name.

Parameters: name (str) – The name of the schedule.

Returns: The schedule definition.

Return type: ScheduleDefinition

get_sensor_def

Get a sensor definition by name.

Parameters: name (str) – The name of the sensor.

Returns: The sensor definition.

Return type: SensorDefinition

has_job

Check if a job with a given name is present in the repository.

Parameters: name (str) – The name of the job.

Returns: bool

has_schedule_def

Check if a schedule with a given name is present in the repository.

Returns: bool

has_sensor_def

Check if a sensor with a given name is present in the repository.

Returns: bool

load_asset_value

Load the contents of an asset as a Python object.

Invokes load_input on the IOManager associated with the asset.

If you want to load the values of multiple assets, it’s more efficient to use get_asset_value_loader(), which avoids spinning up resources separately for each asset.

Parameters:

  • asset_key (Union[AssetKey, Sequence[str], str]) – The key of the asset to load.
  • python_type (Optional[Type]) – The python type to load the asset as. This is what will
  • partition_key (Optional[str]) – The partition of the asset to load.
  • metadata (Optional[Dict[str, Any]]) – Input metadata to pass to the IOManager
  • resource_config (Optional[Any]) – A dictionary of resource configurations to be passed

Returns: The contents of an asset as a Python object.

property asset_checks_defs_by_key

The asset checks defined in the repository.

Type: Mapping[AssetCheckKey, AssetChecksDefinition]

property assets_defs_by_key

The asset definitions defined in the repository.

Type: Mapping[AssetKey, AssetsDefinition]

property description

A human-readable description of the repository.

Type: Optional[str]

property job_names

Names of all jobs in the repository.

Type: List[str]

property metadata

Arbitrary metadata for the repository.

Type: Optional[MetadataMapping]

property name

The name of the repository.

Type: str

property schedule_defs

All schedules in the repository.

Type: List[ScheduleDefinition]

property sensor_defs

All sensors in the repository.

Type: Sequence[SensorDefinition]

property source_assets_by_key

The source assets defined in the repository.

Type: Mapping[AssetKey, SourceAsset]

class dagster.RepositoryData

Users should usually rely on the @repository decorator to create new repositories, which will in turn call the static constructors on this class. However, users may subclass RepositoryData for fine-grained control over access to and lazy creation of repository members.

abstract get_all_jobs

Return all jobs in the repository as a list.

Returns: All jobs in the repository.

Return type: List[JobDefinition]

get_all_schedules

Return all schedules in the repository as a list.

Returns: All schedules in the repository.

Return type: List[ScheduleDefinition]

get_all_sensors

Return all sensors in the repository as a list.

Return type: Sequence[SensorDefinition]

get_asset_checks_defs_by_key

Get the asset check definitions for the repository.

Return type: Mapping[AssetCheckKey, AssetChecksDefinition]

get_assets_defs_by_key

Get the asset definitions for the repository.

Return type: Mapping[AssetKey, AssetsDefinition]

get_job

Get a job by name.

Parameters: job_name (str) – Name of the job to retrieve.

Returns: The job definition corresponding to the given name.

Return type: JobDefinition

get_job_names

Get the names of all jobs in the repository.

Returns: List[str]

get_schedule

Get a schedule by name.

Parameters: schedule_name (str) – Name of the schedule to retrieve.

Returns: The schedule definition corresponding to the given name.

Return type: ScheduleDefinition

get_schedule_names

Get the names of all schedules in the repository.

Returns: List[str]

get_sensor

Get a sensor by name.

Parameters: sensor_name (str) – Name of the sensor to retrieve.

Returns: The sensor definition corresponding to the given name.

Return type: SensorDefinition

get_sensor_names

Get the names of all sensors in the repository.

Return type: Sequence[str]

get_source_assets_by_key

Get the source assets for the repository.

Return type: Mapping[AssetKey, SourceAsset]

has_job

Check if a job with a given name is present in the repository.

Parameters: job_name (str) – The name of the job.

Returns: bool

has_schedule

Check if a schedule with a given name is present in the repository.

has_sensor

Check if a sensor with a given name is present in the repository.