Testing partitioned config and jobs
In this article, we'll cover a few ways to test your partitioned config and jobs.
Note: This article assumes familiarity with partitioned assets.
Testing partitioned config
Invoking a `PartitionedConfig` object directly invokes the decorated function.
If you want to check whether the generated run config is valid for the config of a job, you can use the `validate_run_config` function.
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime
@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition.

    The start of the partition, formatted as YYYY-MM-DD, is passed to the
    ``process_data_for_date`` op as its ``date`` config value. ``_end`` is
    accepted but not used.
    """
    date_str = start.strftime("%Y-%m-%d")
    op_config = {"config": {"date": date_str}}
    return {"ops": {"process_data_for_date": op_config}}
def test_my_partitioned_config():
    """Check the decorated function's output and its validity for the job."""
    partition_start = datetime(2020, 1, 3)
    partition_end = datetime(2020, 1, 4)
    expected = {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # Calling the partitioned config object directly runs the decorated
    # function, so the raw run config can be compared against a literal.
    run_config = my_partitioned_config(partition_start, partition_end)
    assert run_config == expected

    # The generated run config must also be valid configuration for the
    # partitioned_op_job job.
    assert validate_run_config(partitioned_op_job, run_config)
If you want to test that a `PartitionedConfig` creates the partitions you expect, use its `get_partition_keys` or `get_run_config_for_partition_key` methods:
from dagster import Config, OpExecutionContext
@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition with a 15-minute offset.

    Both bounds of the partition window are formatted as YYYY-MM-DD-HH:MM and
    handed to the ``process_data`` op as its ``start`` and ``end`` config.
    """
    timestamp_format = "%Y-%m-%d-%H:%M"
    window = {
        "start": start.strftime(timestamp_format),
        "end": _end.strftime(timestamp_format),
    }
    return {"ops": {"process_data": {"config": window}}}
# Config schema taken by the process_data op.
class ProcessDataConfig(Config):
    # Start of the window to process, as a string.
    start: str
    # End of the window to process, as a string.
    end: str
# Op that logs the window of data it was configured to process.
@op
def process_data(context: OpExecutionContext, config: ProcessDataConfig):
    # Pull both window bounds off the config before formatting the message.
    s, e = config.start, config.end
    context.log.info(f"processing data for {s} - {e}")
# Job whose run config is generated per partition by my_offset_partitioned_config.
@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    # The job consists of the single process_data op.
    process_data()
def test_my_offset_partitioned_config():
    """Check the partition keys and run config of my_offset_partitioned_config."""
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for
    # do_more_stuff_partitioned, the job that uses this partitioned config
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }
Testing partitioned jobs
To run a partitioned job in-process on a particular partition, supply a value for the `partition_key` argument of `dagster.JobDefinition.execute_in_process`:
def test_partitioned_op_job():
    """Run partitioned_op_job in-process for one partition and check it succeeds."""
    # partition_key selects which partition the in-process run executes.
    result = partitioned_op_job.execute_in_process(partition_key="2020-01-01")
    assert result.success