Slack (dagster-slack)

This library provides an integration with Slack, to support posting messages in your company’s Slack workspace.

Presently, it provides a thin wrapper around the Slack client API method chat.postMessage.

To use this integration, you’ll first need to create a Slack App for it.

  1. Create App: Go to https://api.slack.com/apps and click “Create New App”.

  2. Install App: After creating an app, on the left-hand side of the app configuration, click “Bot Users”, and then create a bot user. Then, click “Install App” on the left-hand side, and finally “Install App to Workspace”.

  3. Bot Token: Once finished, this will create a new bot token for your bot/workspace.

Copy this bot token and put it somewhere safe; see Safely Storing Credentials for more on this topic.
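
One common way to keep the token out of source code is to read it from an environment variable at startup. The following is a minimal sketch and not part of dagster-slack: the helper name `load_slack_token` is hypothetical, the variable name `MY_SLACK_TOKEN` is borrowed from the examples in this document, and the `xoxb-` prefix check assumes a standard Slack bot token.

```python
import os


def load_slack_token(env_var: str = "MY_SLACK_TOKEN") -> str:
    """Read a Slack bot token from the environment (hypothetical helper)."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    # Assumption: bot tokens issued to Slack apps start with "xoxb-".
    if not token.startswith("xoxb-"):
        # The token itself is deliberately left out of the error message.
        raise ValueError(f"{env_var} does not look like a Slack bot token")
    return token
```

Failing early with a clear error tends to be easier to debug than a rejected API call later in a run.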

dagster_slack.SlackResource ResourceDefinition

This resource is for connecting to Slack.

By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.

Examples:

from dagster import Definitions, EnvVar, job, op
from dagster_slack import SlackResource


@op
def slack_op(slack: SlackResource):
    slack.get_client().chat_postMessage(channel='#noise', text=':wave: hey there!')

@job
def slack_job():
    slack_op()

defs = Definitions(
    jobs=[slack_job],
    resources={
        "slack": SlackResource(token=EnvVar("MY_SLACK_TOKEN")),
    },
)

dagster_slack.make_slack_on_run_failure_sensor

Create a sensor on job failures that will message the given Slack channel.

Parameters:

  • channel (str) – The channel to send the message to (e.g. “#my_channel”)
  • slack_token (str) – The Slack token.
  • text_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function which takes in the RunFailureSensorContext and outputs the message you want to send.
  • blocks_fn (Callable[[RunFailureSensorContext], List[Dict]]) – Function which takes in the RunFailureSensorContext and outputs the message blocks you want to send.
  • name (Optional[str]) – The name of the sensor. Defaults to “slack_on_run_failure”.
  • dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead) The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed job run.
  • minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – The jobs in the current repository that will be monitored by this failure sensor.
  • job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – (deprecated: use monitored_jobs instead)
  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the Dagster deployment.
  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.
  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the failed job run.
  • monitor_all_repositories (bool) – (deprecated: use monitor_all_code_locations instead) If set to True, the sensor will monitor all runs in the Dagster deployment.
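
A blocks_fn returns a list of Slack Block Kit dictionaries rather than a plain string. The sketch below shows one possible shape under stated assumptions: the function name `my_blocks_fn` and the stand-in context are hypothetical, the context is duck-typed so the snippet runs without Dagster installed, and the two attributes it reads (`dagster_run.job_name` and `failure_event.message`) are the ones used in the examples in this section.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def my_blocks_fn(context: Any) -> List[Dict[str, Any]]:
    """Build Slack Block Kit section blocks from a run-failure context.

    In a real sensor, `context` would be a RunFailureSensorContext.
    """
    job_name = context.dagster_run.job_name
    error = context.failure_event.message
    return [
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*Job {job_name} failed*"},
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"Error: {error}"},
        },
    ]


# Stand-in context with hypothetical values, for trying the function locally.
fake_context = SimpleNamespace(
    dagster_run=SimpleNamespace(job_name="my_job"),
    failure_event=SimpleNamespace(message="Step failed"),
)
blocks = my_blocks_fn(fake_context)
```

A function like this would be passed as `blocks_fn=my_blocks_fn` when calling make_slack_on_run_failure_sensor.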

Examples:

import os

from dagster import RunFailureSensorContext, repository
from dagster_slack import make_slack_on_run_failure_sensor

slack_on_run_failure = make_slack_on_run_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN")
)

# my_job is assumed to be defined elsewhere.
@repository
def my_repo():
    return [my_job, slack_on_run_failure]

def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.dagster_run.job_name} failed! "
        f"Error: {context.failure_event.message}"
    )

slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    text_fn=my_message_fn,
    webserver_base_url="http://mycoolsite.com",
)

dagster_slack.slack_on_failure HookDefinition

Create a hook on step failure events that will message the given Slack channel.

Parameters:

  • channel (str) – The channel to send the message to (e.g. “#my_channel”)
  • message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the HookContext and outputs the message you want to send.
  • dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead) The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.
  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.
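
A message_fn can include more than the op name. The sketch below is an illustration under stated assumptions: the function name `my_failure_message_fn` and the stand-in context are hypothetical, the context is duck-typed so the snippet runs without Dagster installed, and it assumes the hook context exposes `op.name`, `job_name`, and `op_exception`.

```python
from types import SimpleNamespace
from typing import Any


def my_failure_message_fn(context: Any) -> str:
    """Format a failure message from a hook context (duck-typed sketch)."""
    lines = [f"Op {context.op.name} failed in job {context.job_name}."]
    # Assumption: op_exception may be None when no exception was captured.
    if context.op_exception is not None:
        lines.append(f"Exception: {context.op_exception!r}")
    return "\n".join(lines)


# Stand-in context with hypothetical values, for trying the function locally.
fake_context = SimpleNamespace(
    op=SimpleNamespace(name="an_op"),
    job_name="my_job",
    op_exception=ValueError("bad input"),
)
message = my_failure_message_fn(fake_context)
```

A function like this would be passed as the message_fn argument, for example `slack_on_failure("#foo", my_failure_message_fn)`.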

Examples:

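from dagster import HookContext, job, op
from dagster_slack import slack_on_failure

@slack_on_failure("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} failed!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    # with_hooks returns a copy of the op with the hook attached; invoke it to add it to the job.
    an_op.with_hooks(hook_defs={slack_on_failure("#foo", my_message_fn)})()

dagster_slack.slack_on_success HookDefinition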

Create a hook on step success events that will message the given Slack channel.

Parameters:

  • channel (str) – The channel to send the message to (e.g. “#my_channel”)
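  • message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the HookContext and outputs the message you want to send.
  • dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead) The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.
  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.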

Examples:

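from dagster import HookContext, job, op
from dagster_slack import slack_on_success

@slack_on_success("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} worked!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    # with_hooks returns a copy of the op with the hook attached; invoke it to add it to the job.
    an_op.with_hooks(hook_defs={slack_on_success("#foo", my_message_fn)})()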

Legacy

dagster_slack.slack_resource ResourceDefinition

This resource is for connecting to Slack.

The resource object is a slack_sdk.WebClient.

By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.

Examples:

import os

from dagster import job, op
from dagster_slack import slack_resource


@op(required_resource_keys={'slack'})
def slack_op(context):
    context.resources.slack.chat_postMessage(channel='#noise', text=':wave: hey there!')

@job(resource_defs={'slack': slack_resource})
def slack_job():
    slack_op()

slack_job.execute_in_process(
    run_config={'resources': {'slack': {'config': {'token': os.getenv('SLACK_TOKEN')}}}}
)