Skip to main content

Utilities

dagster.file_relative_path

Get a path relative to the currently executing Python file.

This function is useful when one needs to load a file that is relative to the position of the current file. (Such as when you encode a configuration file path in source file and want in runnable in any current working directory)

Parameters:

  • dunderfile (str) – Should always be __file__.
  • relative_path (str) – Path to get relative to the currently executing file.

Examples:

file_relative_path(__file__, 'path/relative/to/file')
dagster.config_from_files

Constructs run config from YAML files.

Parameters: config_files (List[str]) – List of paths or glob patterns for yaml files to load and parse as the run config.Returns: A run config dictionary constructed from provided YAML files.Return type: Dict[str, Any]Raises:

  • FileNotFoundError – When a config file produces no results
  • DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML files is invalid and has a parse
dagster.config_from_pkg_resources

Load a run config from a package resource, using pkg_resources.resource_string().

Example:

config_from_pkg_resources(
pkg_resource_defs=[
('dagster_examples.airline_demo.environments', 'local_base.yaml'),
('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
],
)

Parameters: pkg_resource_defs (List[(str, str)]) – List of pkg_resource modules/files to load as the run config.Returns: A run config dictionary constructed from the provided yaml stringsReturn type: Dict[Str, Any]Raises: DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.

dagster.config_from_yaml_strings

Static constructor for run configs from YAML strings.

Parameters: yaml_strings (List[str]) – List of yaml strings to parse as the run config.Returns: A run config dictionary constructed from the provided yaml stringsReturn type: Dict[Str, Any]Raises: DagsterInvariantViolationErrorDagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.

dagster.get_dagster_logger

Creates a python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.

This can be used as a more convenient alternative to context.log in most cases. If log level is not set explicitly, defaults to DEBUG.

Parameters: name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.Returns: A logger whose output will be captured by Dagster.Return type: logging.Logger Example:

from dagster import get_dagster_logger, op

@op
def hello_op():
log = get_dagster_logger()
for i in range(5):
# do something
log.info(f"Did \{i+1} things!")
class dagster.ExperimentalWarning
dagster.make_email_on_run_failure_sensor

Create a job failure sensor that sends email via the SMTP protocol.

Parameters:

  • email_from (str) – The sender email address to send the message from.
  • email_password (str) – The password of the sender.
  • email_to (List[str]) – The receipt email addresses to send the message to.
  • email_body_fn (Optional(Callable[[RunFailureSensorContextRunFailureSensorContext], str])) – Function which
  • email_subject_fn (Optional(Callable[[RunFailureSensorContextRunFailureSensorContext], str])) – Function which
  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.
  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.
  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.
  • smtp_user (Optional[str]) – The SMTP user for authenticatication in the SMTP server. Defaults to the value of email_from.
  • name – (Optional[str]): The name of the sensor. Defaults to “email_on_job_failure”.
  • webserver_base_url – (Optional[str]): The base url of your dagster-webserver instance. Specify this to allow
  • monitored_jobs (Optional[List[Union[JobDefinitionJobDefinition, GraphDefinitionGraphDefinition, JobDefinitionJobDefinition, RepositorySelectorRepositorySelector, JobSelectorJobSelector]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will
  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
  • job_selection (Optional[List[Union[JobDefinitionJobDefinition, GraphDefinitionGraphDefinition, JobDefinitionJobDefinition, RepositorySelectorRepositorySelector, JobSelectorJobSelector]]]) – deprecatedmonitored_jobs instead.) (deprecated in favor of monitored_jobs) The jobs that will be monitored by this failure
  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
  • monitor_all_repositories (bool) – deprecatedmonitor_all_code_locations instead.) If set to True, the sensor will monitor all runs in the

Examples:

email_on_run_failure = make_email_on_run_failure_sensor(
email_from="no-reply@example.com",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["xxx@example.com"],
)

@repository
def my_repo():
return [my_job + email_on_run_failure]
def my_message_fn(context: RunFailureSensorContext) -> str:
return (
f"Job \{context.dagster_run.job_name} failed!"
f"Error: \{context.failure_event.message}"
)

email_on_run_failure = make_email_on_run_failure_sensor(
email_from="no-reply@example.com",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["xxx@example.com"],
email_body_fn=my_message_fn,
email_subject_fn=lambda _: "Dagster Alert",
webserver_base_url="http://mycoolsite.com",
)
class dagster._utils.forked_pdb.ForkedPdb

A pdb subclass that may be used from a forked multiprocessing child.

Examples:

from dagster._utils.forked_pdb import ForkedPdb

@solid
def complex_solid(_):
# some complicated stuff

ForkedPdb().set_trace()

# some other complicated stuff

You can initiate pipeline execution via the webserver and use the pdb debugger to examine/step through execution at the breakpoint.