Slack (dagster-slack)
This library provides an integration with Slack, to support posting messages in your company’s Slack workspace.
Presently, it provides a thin wrapper on the Slack client API chat.postMessage.
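For orientation, the sketch below shows roughly what a chat.postMessage call looks like at the HTTP level. It is not how this library is implemented (the integration goes through the Slack client); it only builds the request with the standard library and does not send it. The helper name build_post_message_request and the placeholder token are assumptions made for illustration.

```python
import json
import urllib.request

# Slack Web API endpoint for the chat.postMessage method.
SLACK_POST_MESSAGE_URL = "https://slack.com/api/chat.postMessage"


def build_post_message_request(token: str, channel: str, text: str) -> urllib.request.Request:
    """Build (but do not send) an HTTP request for chat.postMessage.

    Hypothetical helper for illustration only; not part of dagster-slack.
    """
    payload = json.dumps({"channel": channel, "text": text}).encode("utf-8")
    return urllib.request.Request(
        SLACK_POST_MESSAGE_URL,
        data=payload,
        headers={
            # The bot token is passed as a bearer token.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json; charset=utf-8",
        },
        method="POST",
    )


# "xoxb-placeholder" is a stand-in value, not a real token.
request = build_post_message_request("xoxb-placeholder", "#noise", ":wave: hey there!")
# Sending is deliberately left out; urllib.request.urlopen(request) would perform the call.
```

In practice the resource described below handles this for you, so the sketch is mainly useful for understanding which pieces (token, channel, text) the wrapper needs.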
To use this integration, you’ll first need to create a Slack App for it.
- Create App: Go to https://api.slack.com/apps and click “Create New App”.
- Install App: After creating an app, click “Bot Users” on the left-hand side of the app configuration and create a bot user. Then click “Install App” on the left-hand side, and finally “Install App to Workspace”.
- Bot Token: Once finished, this will create a new bot token for your bot/workspace. Copy this bot token and put it somewhere safe; see Safely Storing Credentials for more on this topic.
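One common way to keep the bot token out of source code is to read it from an environment variable and fail early when it is missing. The following is a minimal sketch of that idea; the helper load_slack_token is hypothetical, and MY_SLACK_TOKEN is simply the variable name used in the examples on this page.

```python
import os


def load_slack_token(env_var: str = "MY_SLACK_TOKEN") -> str:
    """Return the Slack bot token stored in the given environment variable.

    Hypothetical helper for illustration; raises if the variable is unset or blank.
    """
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set or is empty")
    return token
```

Calling load_slack_token() at startup surfaces a missing token immediately, rather than at the first attempt to post a message.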
- dagster_slack.SlackResource ResourceDefinition
This resource is for connecting to Slack.
By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.
Examples:
from dagster import Definitions, EnvVar, job, op
from dagster_slack import SlackResource

@op
def slack_op(slack: SlackResource):
    # get_client() returns the underlying Slack client
    slack.get_client().chat_postMessage(channel='#noise', text=':wave: hey there!')

@job
def slack_job():
    slack_op()

defs = Definitions(
    jobs=[slack_job],
    resources={
        # The token is read from the MY_SLACK_TOKEN environment variable
        "slack": SlackResource(token=EnvVar("MY_SLACK_TOKEN")),
    },
)
- dagster_slack.make_slack_on_run_failure_sensor
Create a sensor on job failures that will message the given Slack channel.
Parameters:
- channel (str) – The channel to send the message to (e.g. “#my_channel”)
- slack_token (str) – The slack token.
- text_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function which
- blocks_fn (Callable[[RunFailureSensorContext], List[Dict]]) – Function which takes in
- name (Optional[str]) – The name of the sensor. Defaults to “slack_on_run_failure”.
- dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead.) The base url of your Dagit instance. Specify this to allow
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – The jobs in the
- job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – (deprecated: use monitored_jobs instead.)
- monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow
- monitor_all_repositories (bool) – (deprecated: use monitor_all_code_locations instead.) If set to True, the sensor will monitor all runs in the
Examples:
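import os
from dagster import RunFailureSensorContext, repository
from dagster_slack import make_slack_on_run_failure_sensor

# Default message: pass the channel and the token.
slack_on_run_failure = make_slack_on_run_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN")
)

@repository
def my_repo():
    # my_job is assumed to be defined elsewhere
    return [my_job, slack_on_run_failure]

# Custom message: supply a text_fn that formats the failure.
def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.dagster_run.job_name} failed! "
        f"Error: {context.failure_event.message}"
    )

slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    text_fn=my_message_fn,
    webserver_base_url="http://mycoolsite.com",
)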
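The blocks_fn parameter expects a function that takes the run failure context and returns a list of dictionaries. The sketch below assumes those dictionaries are Slack Block Kit blocks and shows one possible shape for such a function. To stay runnable without Dagster installed, it is exercised with a stand-in context built from SimpleNamespace; my_blocks_fn and fake_context are hypothetical names, and only the two context attributes already used in the example above are accessed.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def my_blocks_fn(context: Any) -> List[Dict[str, Any]]:
    """Build Slack Block Kit blocks describing a failed run (illustrative sketch)."""
    return [
        {
            # Header blocks only accept plain_text.
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"Job {context.dagster_run.job_name} failed",
            },
        },
        {
            # Section blocks may use Slack's mrkdwn formatting.
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Error:* {context.failure_event.message}",
            },
        },
    ]


# Stand-in for the real context object, used here only to try the function out.
fake_context = SimpleNamespace(
    dagster_run=SimpleNamespace(job_name="my_job"),
    failure_event=SimpleNamespace(message="Something went wrong"),
)
blocks = my_blocks_fn(fake_context)
```

A function like this would be passed as blocks_fn=my_blocks_fn to make_slack_on_run_failure_sensor, alongside the channel and token.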
- dagster_slack.slack_on_failure HookDefinition
Create a hook on step failure events that will message the given Slack channel.
Parameters:
- channel (str) – The channel to send the message to (e.g. “#my_channel”)
- message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the HookContext
- dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead.) The base url of your webserver instance. Specify this to allow
- webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow
Examples:
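# Apply the hook to every op in a job.
@slack_on_failure("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

# Apply the hook to a single op, with a custom message.
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} failed!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    # with_hooks returns a hooked copy of the op; call it to add it to the job
    an_op.with_hooks(hook_defs={slack_on_failure("#foo", my_message_fn)})()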
- dagster_slack.slack_on_success HookDefinition
Create a hook on step success events that will message the given Slack channel.
Parameters:
- channel (str) – The channel to send the message to (e.g. “#my_channel”)
- message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the HookContext
- dagit_base_url (Optional[str]) – (deprecated: use webserver_base_url instead.) The base url of your webserver instance. Specify this to allow
- webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow
Examples:
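# Apply the hook to every op in a job.
@slack_on_success("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

# Apply the hook to a single op, with a custom message.
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} worked!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    # with_hooks returns a hooked copy of the op; call it to add it to the job
    an_op.with_hooks(hook_defs={slack_on_success("#foo", my_message_fn)})()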
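Because slack_on_failure and slack_on_success accept a message_fn with the same shape (context in, string out), the two message functions can be produced by one small factory. This is only a sketch of that pattern: make_message_fn is a hypothetical helper, and the stand-in context below exists solely so the functions can be tried without Dagster installed.

```python
from types import SimpleNamespace
from typing import Any, Callable


def make_message_fn(outcome: str) -> Callable[[Any], str]:
    """Return a message function that reports the given outcome for an op."""

    def message_fn(context: Any) -> str:
        # context.op is the same attribute used in the examples above.
        return f"Op {context.op} {outcome}!"

    return message_fn


on_failure_message = make_message_fn("failed")
on_success_message = make_message_fn("worked")

# Stand-in for a HookContext, for illustration only.
fake_context = SimpleNamespace(op="an_op")
failure_text = on_failure_message(fake_context)
success_text = on_success_message(fake_context)
```

The resulting functions could then be passed as the message_fn argument, for example slack_on_failure("#foo", on_failure_message).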
Legacy
- dagster_slack.slack_resource ResourceDefinition
This resource is for connecting to Slack.
The resource object is a slack_sdk.WebClient.
By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.
Examples:
import os
from dagster import job, op
from dagster_slack import slack_resource

@op(required_resource_keys={'slack'})
def slack_op(context):
    context.resources.slack.chat_postMessage(channel='#noise', text=':wave: hey there!')

@job(resource_defs={'slack': slack_resource})
def slack_job():
    slack_op()

slack_job.execute_in_process(
    run_config={'resources': {'slack': {'config': {'token': os.getenv('SLACK_TOKEN')}}}}
)