Utilities
- dagster.file_relative_path
Get a path relative to the currently executing Python file.
This function is useful when one needs to load a file that is relative to the position of the current file. (Such as when you encode a configuration file path in source file and want in runnable in any current working directory)
Parameters:
- dunderfile (str) – Should always be
__file__
. - relative_path (str) – Path to get relative to the currently executing file.
Examples:
file_relative_path(__file__, 'path/relative/to/file')
- dunderfile (str) – Should always be
- dagster.config_from_files
Constructs run config from YAML files.
Parameters: config_files (List[str]) – List of paths or glob patterns for yaml files to load and parse as the run config.Returns: A run config dictionary constructed from provided YAML files.Return type: Dict[str, Any]Raises:
- FileNotFoundError – When a config file produces no results
- DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML files is invalid and has a parse
- dagster.config_from_pkg_resources
Load a run config from a package resource, using
pkg_resources.resource_string()
.Example:
config_from_pkg_resources(
pkg_resource_defs=[
('dagster_examples.airline_demo.environments', 'local_base.yaml'),
('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
],
)Parameters: pkg_resource_defs (List[(str, str)]) – List of pkg_resource modules/files to load as the run config.Returns: A run config dictionary constructed from the provided yaml stringsReturn type: Dict[Str, Any]Raises: DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
- dagster.config_from_yaml_strings
Static constructor for run configs from YAML strings.
Parameters: yaml_strings (List[str]) – List of yaml strings to parse as the run config.Returns: A run config dictionary constructed from the provided yaml stringsReturn type: Dict[Str, Any]Raises: DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
- dagster.get_dagster_logger
Creates a python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.
This can be used as a more convenient alternative to context.log in most cases. If log level is not set explicitly, defaults to DEBUG.
Parameters: name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.Returns: A logger whose output will be captured by Dagster.Return type:
logging.Logger
Example:from dagster import get_dagster_logger, op
@op
def hello_op():
log = get_dagster_logger()
for i in range(5):
# do something
log.info(f"Did \{i+1} things!")
- dagster.make_email_on_run_failure_sensor
Create a job failure sensor that sends email via the SMTP protocol.
Parameters:
- email_from (str) – The sender email address to send the message from.
- email_password (str) – The password of the sender.
- email_to (List[str]) – The receipt email addresses to send the message to.
- email_body_fn (Optional(Callable[[RunFailureSensorContextRunFailureSensorContext], str])) – Function which
- email_subject_fn (Optional(Callable[[RunFailureSensorContextRunFailureSensorContext], str])) – Function which
- smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.
- smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.
- smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.
- smtp_user (Optional[str]) – The SMTP user for authenticatication in the SMTP server. Defaults to the value of email_from.
- name – (Optional[str]): The name of the sensor. Defaults to “email_on_job_failure”.
- webserver_base_url – (Optional[str]): The base url of your dagster-webserver instance. Specify this to allow
- monitored_jobs (Optional[List[Union[JobDefinitionJobDefinition, GraphDefinitionGraphDefinition, JobDefinitionJobDefinition, RepositorySelectorRepositorySelector, JobSelectorJobSelector]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will
- monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
- job_selection (Optional[List[Union[JobDefinitionJobDefinition, GraphDefinitionGraphDefinition, JobDefinitionJobDefinition, RepositorySelectorRepositorySelector, JobSelectorJobSelector]]]) – deprecatedmonitored_jobs instead.) (deprecated in favor of monitored_jobs) The jobs that will be monitored by this failure
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- monitor_all_repositories (bool) – deprecatedmonitor_all_code_locations instead.) If set to True, the sensor will monitor all runs in the
Examples:
email_on_run_failure = make_email_on_run_failure_sensor(
email_from="no-reply@example.com",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["xxx@example.com"],
)
@repository
def my_repo():
return [my_job + email_on_run_failure]def my_message_fn(context: RunFailureSensorContext) -> str:
return (
f"Job \{context.dagster_run.job_name} failed!"
f"Error: \{context.failure_event.message}"
)
email_on_run_failure = make_email_on_run_failure_sensor(
email_from="no-reply@example.com",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["xxx@example.com"],
email_body_fn=my_message_fn,
email_subject_fn=lambda _: "Dagster Alert",
webserver_base_url="http://mycoolsite.com",
)
- class dagster._utils.forked_pdb.ForkedPdb
A pdb subclass that may be used from a forked multiprocessing child.
Examples:
from dagster._utils.forked_pdb import ForkedPdb
@solid
def complex_solid(_):
# some complicated stuff
ForkedPdb().set_trace()
# some other complicated stuffYou can initiate pipeline execution via the webserver and use the pdb debugger to examine/step through execution at the breakpoint.