Partitions Definitions
- class dagster.PartitionsDefinition
Defines a set of partitions, which can be attached to a software-defined asset or job.
Abstract class with implementations for different kinds of partitions.
- abstract get_partition_keys
Returns a list of strings representing the partition keys of the PartitionsDefinition.
Parameters:
- current_time (Optional[datetime]) – A datetime object representing the current time, only
- dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore
Returns: Sequence[str]
- class dagster.HourlyPartitionsDefinition
A set of hourly partitions.
The first partition in the set will start on the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can
-
end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions.
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d. Note that if a non-UTC
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...
HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
- get_cron_schedule
The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.
This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.
- property day_offset
For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.
For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.
For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).
Type: int
- property hour_offset
Number of hours past 00:00 to “split” partitions. Defaults to 0.
For example, returns 1 if each partition starts at 01:00.
Type: int
- property minute_offset
Number of minutes past the hour to “split” partitions. Defaults to 0.
For example, returns 15 if each partition starts at 15 minutes past the hour.
Type: int
- property schedule_type
An enum representing the partition cadence (hourly, daily, weekly, or monthly).
Type: Optional[ScheduleType]
-
- class dagster.DailyPartitionsDefinition
A set of daily partitions.
The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can
-
end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions.
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
DailyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...
DailyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
- get_cron_schedule
The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.
This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.
- property day_offset
For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.
For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.
For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).
Type: int
- property hour_offset
Number of hours past 00:00 to “split” partitions. Defaults to 0.
For example, returns 1 if each partition starts at 01:00.
Type: int
- property minute_offset
Number of minutes past the hour to “split” partitions. Defaults to 0.
For example, returns 15 if each partition starts at 15 minutes past the hour.
Type: int
- property schedule_type
An enum representing the partition cadence (hourly, daily, weekly, or monthly).
Type: Optional[ScheduleType]
-
- class dagster.WeeklyPartitionsDefinition
Defines a set of weekly partitions.
The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will
-
end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions.
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
WeeklyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...
WeeklyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
- get_cron_schedule
The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.
This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.
- property day_offset
For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.
For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.
For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).
Type: int
- property hour_offset
Number of hours past 00:00 to “split” partitions. Defaults to 0.
For example, returns 1 if each partition starts at 01:00.
Type: int
- property minute_offset
Number of minutes past the hour to “split” partitions. Defaults to 0.
For example, returns 15 if each partition starts at 15 minutes past the hour.
Type: int
- property schedule_type
An enum representing the partition cadence (hourly, daily, weekly, or monthly).
Type: Optional[ScheduleType]
-
- class dagster.MonthlyPartitionsDefinition
A set of monthly partitions.
The first partition in the set will start at the soonest first of the month after start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be
-
end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions.
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
day_offset (int) – Day of the month to “split” the partition. Defaults to 1.
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
MonthlyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...
MonthlyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
- get_cron_schedule
The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.
This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.
- property day_offset
For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.
For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.
For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).
Type: int
- property hour_offset
Number of hours past 00:00 to “split” partitions. Defaults to 0.
For example, returns 1 if each partition starts at 01:00.
Type: int
- property minute_offset
Number of minutes past the hour to “split” partitions. Defaults to 0.
For example, returns 15 if each partition starts at 15 minutes past the hour.
Type: int
- property schedule_type
An enum representing the partition cadence (hourly, daily, weekly, or monthly).
Type: Optional[ScheduleType]
-
- class dagster.TimeWindowPartitionsDefinition
A set of partitions where each partition corresponds to a time window.
The provided cron_schedule determines the bounds of the time windows. E.g. a cron_schedule of “0 0 * * *” will result in daily partitions that start at midnight and end at midnight of the following day.
The string partition_key associated with each partition corresponds to the start of the partition’s time window.
The first partition in the set will start on at the first cron_schedule tick that is equal to or after the given start datetime. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number.
We recommended limiting partition counts for each asset to 25,000 partitions or fewer.
Parameters:
- cron_schedule (str) – Determines the bounds of the time windows.
- start (datetime) – The first partition in the set will start on at the first cron_schedule
- timezone (Optional[str]) – The timezone in which each time should exist.
- end (datetime) – The last partition (excluding) in the set.
- fmt (str) – The date format to use for partition_keys. Note that if a non-UTC timezone is
- end_offset (int) – Extends the partition set by a number of partitions equal to the value
- get_cron_schedule
The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.
This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.
- property day_offset
For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.
For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.
For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).
Type: int
- property hour_offset
Number of hours past 00:00 to “split” partitions. Defaults to 0.
For example, returns 1 if each partition starts at 01:00.
Type: int
- property minute_offset
Number of minutes past the hour to “split” partitions. Defaults to 0.
For example, returns 15 if each partition starts at 15 minutes past the hour.
Type: int
- property schedule_type
An enum representing the partition cadence (hourly, daily, weekly, or monthly).
Type: Optional[ScheduleType]
- class dagster.TimeWindow
An interval that is closed at the start and open at the end.
- start
A datetime that marks the start of the window.
Type: datetime
- end
A datetime that marks the end of the window.
Type: datetime
- class dagster.StaticPartitionsDefinition
A statically-defined set of partitions.
We recommended limiting partition counts for each asset to 25,000 partitions or fewer.
Example:
from dagster import StaticPartitionsDefinition, asset
oceans_partitions_def = StaticPartitionsDefinition(
["arctic", "atlantic", "indian", "pacific", "southern"]
)
@asset(partitions_def=oceans_partitions_defs)
def ml_model_for_each_ocean():
...- get_partition_keys
Returns a list of strings representing the partition keys of the PartitionsDefinition.
Parameters:
- current_time (Optional[datetime]) – A datetime object representing the current time, only
- dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore
Returns: Sequence[str]
- class dagster.MultiPartitionsDefinition
Takes the cross-product of partitions from two partitions definitions.
For example, with a static partitions definition where the partitions are [“a”, “b”, “c”] and a daily partitions definition, this partitions definition will have the following partitions:
2020-01-01|a 2020-01-01|b 2020-01-01|c 2020-01-02|a 2020-01-02|b …
We recommended limiting partition counts for each asset to 25,000 partitions or fewer.
Parameters: partitions_defs (Mapping[str, PartitionsDefinitionPartitionsDefinition]) – A mapping of dimension name to partitions definition. The total set of partitions will be the cross-product of the partitions from each PartitionsDefinition.
- partitions_defs
A sequence of PartitionDimensionDefinition objects, each of which contains a dimension name and a PartitionsDefinition. The total set of partitions will be the cross-product of the partitions from each PartitionsDefinition. This sequence is ordered by dimension name, to ensure consistent ordering of the partitions.
Type: Sequence[PartitionDimensionDefinition]
- get_partition_keys
Returns a list of MultiPartitionKeys representing the partition keys of the PartitionsDefinition.
Parameters:
- current_time (Optional[datetime]) – A datetime object representing the current time, only
- dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore
Returns: Sequence[MultiPartitionKey]
- class dagster.MultiPartitionKey
A multi-dimensional partition key stores the partition key for each dimension. Subclasses the string class to keep partition key type as a string.
Contains additional methods to access the partition key for each dimension. Creates a string representation of the partition key for each dimension, separated by a pipe (|). Orders the dimensions by name, to ensure consistent string representation.
- class dagster.DynamicPartitionsDefinition
A partitions definition whose partition keys can be dynamically added and removed.
This is useful for cases where the set of partitions is not known at definition time, but is instead determined at runtime.
Partitions can be added and removed using instance.add_dynamic_partitions and instance.delete_dynamic_partition methods.
We recommended limiting partition counts for each asset to 25,000 partitions or fewer.
Parameters:
- name (Optional[str]) – The name of the partitions definition.
- partition_fn (Optional[Callable[[Optional[datetime]], Union[Sequence[Partition], Sequence[str]]]]) – deprecated
Examples:
fruits = DynamicPartitionsDefinition(name="fruits")
@sensor(job=my_job)
def my_sensor(context):
return SensorResult(
run_requests=[RunRequest(partition_key="apple")],
dynamic_partitions_requests=[fruits.build_add_request(["apple"])]
)- get_partition_keys
Returns a list of strings representing the partition keys of the PartitionsDefinition.
Parameters:
- current_time (Optional[datetime]) – A datetime object representing the current time, only
- dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore
Returns: Sequence[str]
- class dagster.PartitionKeyRange
Defines a range of partitions.
- start
The starting partition key in the range (inclusive).
Type: str
- end
The ending partition key in the range (inclusive).
Type: str
Examples:
partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])
partition_key_range = PartitionKeyRange(start="a", end="c") # Represents ["a", "b", "c"]
Partitioned Schedules
- dagster.build_schedule_from_partitioned_job
Creates a schedule from a job that targets time window-partitioned or statically-partitioned assets. The job can also be multi-partitioned, as long as one of the partition dimensions is time-partitioned.
The schedule executes at the cadence specified by the time partitioning of the job or assets.
Example:######################################
# Job that targets partitioned assets
######################################
from dagster import (
DailyPartitionsDefinition,
asset,
build_schedule_from_partitioned_job,
define_asset_job,
Definitions,
)
@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def asset1():
...
asset1_job = define_asset_job("asset1_job", selection=[asset1])
# The created schedule will fire daily
asset1_job_schedule = build_schedule_from_partitioned_job(asset1_job)
defs = Definitions(assets=[asset1], schedules=[asset1_job_schedule])
################
# Non-asset job
################
from dagster import DailyPartitionsDefinition, build_schedule_from_partitioned_job, jog
@job(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def do_stuff_partitioned():
...
# The created schedule will fire daily
do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
do_stuff_partitioned,
)
defs = Definitions(schedules=[do_stuff_partitioned_schedule])
Partition Mapping
- class dagster.PartitionMapping
Defines a correspondence between the partitions in an asset and the partitions in an asset that it depends on.
Overriding PartitionMapping outside of Dagster is not supported. The abstract methods of this class may change at any time.
- abstract get_downstream_partitions_for_partitions
Returns the subset of partition keys in the downstream asset that use the data in the given partition key subset of the upstream asset.
Parameters:
- upstream_partitions_subset (Union[PartitionKeyRangePartitionKeyRange, PartitionsSubset]) – The
- downstream_partitions_def (PartitionsDefinitionPartitionsDefinition) – The partitions definition for the
- abstract get_upstream_mapped_partitions_result_for_partitions
Returns a UpstreamPartitionsResult object containing the partition keys the downstream partitions subset was mapped to in the upstream partitions definition.
Valid upstream partitions will be included in UpstreamPartitionsResult.partitions_subset. Invalid upstream partitions will be included in UpstreamPartitionsResult.required_but_nonexistent_partition_keys.
For example, if an upstream asset is time-partitioned and starts in June 2023, and the downstream asset is time-partitioned and starts in May 2023, this function would return a UpstreamPartitionsResult(PartitionsSubset(“2023-06-01”), required_but_nonexistent_partition_keys=[“2023-05-01”]) when downstream_partitions_subset contains 2023-05-01 and 2023-06-01.
- class dagster.TimeWindowPartitionMapping
The default mapping between two TimeWindowPartitionsDefinitions.
A partition in the downstream partitions definition is mapped to all partitions in the upstream asset whose time windows overlap it.
This means that, if the upstream and downstream partitions definitions share the same time period, then this mapping is essentially the identity partition mapping - plus conversion of datetime formats.
If the upstream time period is coarser than the downstream time period, then each partition in the downstream asset will map to a single (larger) upstream partition. E.g. if the downstream is hourly and the upstream is daily, then each hourly partition in the downstream will map to the daily partition in the upstream that contains that hour.
If the upstream time period is finer than the downstream time period, then each partition in the downstream asset will map to multiple upstream partitions. E.g. if the downstream is daily and the upstream is hourly, then each daily partition in the downstream asset will map to the 24 hourly partitions in the upstream that occur on that day.
- start_offset
If not 0, then the starts of the upstream windows are shifted by this offset relative to the starts of the downstream windows. For example, if start_offset=-1 and end_offset=0, then the downstream partition “2022-07-04” would map to the upstream partitions “2022-07-03” and “2022-07-04”. If the upstream and downstream PartitionsDefinitions are different, then the offset is in the units of the downstream. Defaults to 0.
Type: int
- end_offset
If not 0, then the ends of the upstream windows are shifted by this offset relative to the ends of the downstream windows. For example, if start_offset=0 and end_offset=1, then the downstream partition “2022-07-04” would map to the upstream partitions “2022-07-04” and “2022-07-05”. If the upstream and downstream PartitionsDefinitions are different, then the offset is in the units of the downstream. Defaults to 0.
Type: int
- allow_nonexistent_upstream_partitions
Defaults to false. If true, does not raise an error when mapped upstream partitions fall outside the start-end time window of the partitions def. For example, if the upstream partitions def starts on “2023-01-01” but the downstream starts on “2022-01-01”, setting this bool to true would return no partition keys when get_upstream_partitions_for_partitions is called with “2022-06-01”. When set to false, would raise an error.
Type: bool
Examples:
from dagster import DailyPartitionsDefinition, TimeWindowPartitionMapping, AssetIn, asset
partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")
@asset(partitions_def=partitions_def)
def asset1():
...
@asset(
partitions_def=partitions_def,
ins=\{
"asset1": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
)
}
)
def asset2(asset1):
...
- class dagster.IdentityPartitionMapping
Expects that the upstream and downstream assets are partitioned in the same way, and maps partitions in the downstream asset to the same partition in the upstream asset.
- class dagster.AllPartitionMapping
Maps every partition in the downstream asset to every partition in the upstream asset.
Commonly used in the case when the downstream asset is not partitioned, in which the entire downstream asset depends on all partitions of the usptream asset.
- class dagster.LastPartitionMapping
Maps all dependencies to the last partition in the upstream asset.
Commonly used in the case when the downstream asset is not partitioned, in which the entire downstream asset depends on the last partition of the upstream asset.
- class dagster.StaticPartitionMapping
Define an explicit correspondence between two StaticPartitionsDefinitions.
Parameters: downstream_partition_keys_by_upstream_partition_key (Dict[str, str | Collection[str]]) – The single or multi-valued correspondence from upstream keys to downstream keys.
- class dagster.SpecificPartitionsPartitionMapping
Maps to a specific subset of partitions in the upstream asset.
Example:
from dagster import SpecificPartitionsPartitionMapping, StaticPartitionsDefinition, asset
@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
def upstream():
...
@asset(
ins=\{
"upstream": AssetIn(partition_mapping=SpecificPartitionsPartitionMapping(["a"]))
}
)
def a_downstream(upstream):
...
- class dagster.MultiToSingleDimensionPartitionMapping
- experimental
This API may break in future versions, even between dot releases.
Defines a correspondence between an single-dimensional partitions definition and a MultiPartitionsDefinition. The single-dimensional partitions definition must be a dimension of the MultiPartitionsDefinition.
This class handles the case where the upstream asset is multipartitioned and the downstream asset is single dimensional, and vice versa.
For a partition key X, this partition mapping assumes that any multi-partition key with X in the selected dimension is a dependency.
Parameters: partition_dimension_name (Optional[str]) – The name of the partition dimension in the MultiPartitionsDefinition that matches the single-dimension partitions definition.
- class dagster.MultiPartitionMapping
- experimental
This API may break in future versions, even between dot releases.
Defines a correspondence between two MultiPartitionsDefinitions.
Accepts a mapping of upstream dimension name to downstream DimensionPartitionMapping, representing the explicit correspondence between the upstream and downstream MultiPartitions dimensions and the partition mapping used to calculate the downstream partitions.
Examples:
weekly_abc = MultiPartitionsDefinition(
\{
"abc": StaticPartitionsDefinition(["a", "b", "c"]),
"weekly": WeeklyPartitionsDefinition("2023-01-01"),
}
)
daily_123 = MultiPartitionsDefinition(
\{
"123": StaticPartitionsDefinition(["1", "2", "3"]),
"daily": DailyPartitionsDefinition("2023-01-01"),
}
)
MultiPartitionMapping(
\{
"abc": DimensionPartitionMapping(
dimension_name="123",
partition_mapping=StaticPartitionMapping(\{"a": "1", "b": "2", "c": "3"}),
),
"weekly": DimensionPartitionMapping(
dimension_name="daily",
partition_mapping=TimeWindowPartitionMapping(),
)
}
)For upstream or downstream dimensions not explicitly defined in the mapping, Dagster will assume an AllPartitionsMapping, meaning that all upstream partitions in those dimensions will be mapped to all downstream partitions in those dimensions.
Examples:
weekly_abc = MultiPartitionsDefinition(
\{
"abc": StaticPartitionsDefinition(["a", "b", "c"]),
"daily": DailyPartitionsDefinition("2023-01-01"),
}
)
daily_123 = MultiPartitionsDefinition(
\{
"123": StaticPartitionsDefinition(["1", "2", "3"]),
"daily": DailyPartitionsDefinition("2023-01-01"),
}
)
MultiPartitionMapping(
\{
"daily": DimensionPartitionMapping(
dimension_name="daily",
partition_mapping=IdentityPartitionMapping(),
)
}
)
# Will map `daily_123` partition key \{"123": "1", "daily": "2023-01-01"} to the upstream:
# \{"abc": "a", "daily": "2023-01-01"}
# \{"abc": "b", "daily": "2023-01-01"}
# \{"abc": "c", "daily": "2023-01-01"}Parameters: downstream_mappings_by_upstream_dimension (Mapping[str, DimensionPartitionMapping]) – A mapping that defines an explicit correspondence between one dimension of the upstream MultiPartitionsDefinition and one dimension of the downstream MultiPartitionsDefinition. Maps a string representing upstream dimension name to downstream DimensionPartitionMapping, containing the downstream dimension name and partition mapping.
Backfill Policy (Experimental)
- class dagster.BackfillPolicy
- experimental
This API may break in future versions, even between dot releases.
A BackfillPolicy specifies how Dagster should attempt to backfill a partitioned asset.
There are two main kinds of backfill policies: single-run and multi-run.
An asset with a single-run backfill policy will take a single run to backfill all of its partitions at once.
An asset with a multi-run backfill policy will take multiple runs to backfill all of its partitions. Each run will backfill a subset of the partitions. The number of partitions to backfill in each run is controlled by the max_partitions_per_run parameter.
For example:
- If an asset has 100 partitions, and the max_partitions_per_run is set to 10, then it will
- If an asset has 100 partitions, and the max_partitions_per_run is set to 11, then it will
Constructing an BackfillPolicy directly is not recommended as the API is subject to change. BackfillPolicy.single_run() and BackfillPolicy.multi_run(max_partitions_per_run=x) are the recommended APIs.
- static multi_run
Creates a BackfillPolicy that executes the entire backfill in multiple runs. Each run will backfill [max_partitions_per_run] number of partitions.
Parameters: max_partitions_per_run (Optional[int]) – The maximum number of partitions in each run of the multiple runs. Defaults to 1.
- static single_run
Creates a BackfillPolicy that executes the entire backfill in a single run.
Partitioned Config
- class dagster.PartitionedConfig
Defines a way of configuring a job where the job can be run on one of a discrete set of partitions, and each partition corresponds to run configuration for the job.
Setting PartitionedConfig as the config for a job allows you to launch backfills for that job and view the run history across partitions.
- get_partition_keys
Returns a list of partition keys, representing the full set of partitions that config can be applied to.
Parameters: current_time (Optional[datetime]) – A datetime object representing the current time. Only applicable to time-based partitions definitions.Returns: Sequence[str]
- property partitions_def
The partitions definition associated with this PartitionedConfig.
Type: T_PartitionsDefinition
- property run_config_for_partition_fn
- deprecated
This API will be removed in version 2.0. Use
run_config_for_partition_key_fn
instead..A function that accepts a partition and returns a dictionary representing the config to attach to runs for that partition. Deprecated as of 1.3.3.
Type: Optional[Callable[[Partition], Mapping[str, Any]]]
- property run_config_for_partition_key_fn
A function that accepts a partition key and returns a dictionary representing the config to attach to runs for that partition.
Type: Optional[Callable[[str], Union[RunConfig, Mapping[str, Any]]]]
- property tags_for_partition_fn
- deprecated
This API will be removed in version 2.0. Use
tags_for_partition_key_fn
instead..A function that accepts a partition and returns a dictionary of tags to attach to runs for that partition. Deprecated as of 1.3.3.
Type: Optional[Callable[[Partition], Mapping[str, str]]]
- property tags_for_partition_key_fn
A function that accepts a partition key and returns a dictionary of tags to attach to runs for that partition.
Type: Optional[Callable[[str], Mapping[str, str]]]
- dagster.static_partitioned_config
Creates a static partitioned config for a job.
The provided partition_keys is a static list of strings identifying the set of partitions. The list of partitions is static, so while the run config returned by the decorated function may change over time, the list of valid partition keys does not.
This has performance advantages over dynamic_partitioned_config in terms of loading different partition views in the Dagster UI.
The decorated function takes in a partition key and returns a valid run config for a particular target job.
Parameters:
- partition_keys (Sequence[str]) – A list of valid partition keys, which serve as the range of
- tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – deprecated
- tags_for_partition_key_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that
Returns: PartitionedConfig
- dagster.dynamic_partitioned_config
Creates a dynamic partitioned config for a job.
The provided partition_fn returns a list of strings identifying the set of partitions, given an optional datetime argument (representing the current time). The list of partitions returned may change over time.
The decorated function takes in a partition key and returns a valid run config for a particular target job.
Parameters:
- partition_fn (Callable[[datetime.datetime], Sequence[str]]) – A function that generates a
- tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – deprecated
Returns: PartitionedConfig
- dagster.hourly_partitioned_config
Defines run config over a set of hourly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
-
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that
@hourly_partitioned_config(start_date=datetime(2022, 03, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...
@hourly_partitioned_config(start_date=datetime(2022, 03, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
-
- dagster.daily_partitioned_config
Defines run config over a set of daily partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
-
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that
@daily_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...
@daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
-
- dagster.weekly_partitioned_config
Defines run config over a set of weekly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
-
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that
@weekly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...
@weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
-
- dagster.monthly_partitioned_config
Defines run config over a set of monthly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
Parameters:
-
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be
-
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults
-
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
-
day_offset (int) – Day of the month to “split” the partition. Defaults to 1.
-
timezone (Optional[str]) – The timezone in which each date should exist.
-
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
-
end_offset (int) – Extends the partition set by a number of partitions equal to the value
-
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that
@monthly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...
@monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
-