Microsoft Teams (dagster-msteams)

Resource

dagster_msteams.MSTeamsResource ResourceDefinition

This resource is for connecting to Microsoft Teams.

Provides a dagster_msteams.TeamsClient which can be used to interface with the MS Teams API.

By configuring this resource, you can post messages to MS Teams from any Dagster op, asset, schedule, or sensor:

Examples:

import os

from dagster import op, job, Definitions, EnvVar
from dagster_msteams import Card, MSTeamsResource


@op
def teams_op(msteams: MSTeamsResource):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    msteams.get_client().post_message(payload=card.payload)


@job
def teams_job():
    teams_op()

defs = Definitions(
    jobs=[teams_job],
    resources={
        "msteams": MSTeamsResource(
            hook_url=EnvVar("TEAMS_WEBHOOK_URL")
        )
    }
)

Sensors

dagster_msteams.teams_on_failure HookDefinition

Create a hook on step failure events that will message the given MS Teams webhook URL.

Parameters:

  • message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the
  • dagit_base_url – deprecated (use webserver_base_url instead.) (Optional[str]): The base url of your webserver instance. Specify this
  • webserver_base_url – (Optional[str]): The base url of your webserver instance. Specify this
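
To illustrate the message_fn parameter, here is a minimal sketch of a custom message function. It reads `op.name`, `job_name`, and `run_id` from the context; the function name `failure_message` and the `SimpleNamespace` stand-in are hypothetical and exist only so the function can be tried without a real Dagster run.

```python
from types import SimpleNamespace


def failure_message(context) -> str:
    """Build hook message text from a HookContext-like object."""
    return (
        f"Op {context.op.name} in job {context.job_name} "
        f"failed (run {context.run_id})."
    )


# Stand-in for a HookContext (an assumption made for this sketch only).
fake_context = SimpleNamespace(
    op=SimpleNamespace(name="a_op"),
    job_name="my_job",
    run_id="1234",
)
print(failure_message(fake_context))
```

In a real job, a function like this would be passed as `message_fn` to the hook rather than called directly.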

Examples:

@teams_on_failure(webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} failed!"

@op
def a_op(context):
    pass

@job(...)
def my_job():
    # message_fn is the hook's first parameter; there is no channel argument.
    # The trailing () invokes the op so that it becomes part of the job.
    a_op.with_hooks(hook_defs={teams_on_failure(message_fn=my_message_fn)})()

dagster_msteams.teams_on_success HookDefinition

Create a hook on step success events that will message the given MS Teams webhook URL.

Parameters:

  • message_fn (Optional[Callable[[HookContext], str]]) – Function which takes in the
  • dagit_base_url – deprecated (use webserver_base_url instead.) (Optional[str]): The base url of your webserver instance. Specify this

Examples:

@teams_on_success(webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} succeeded!"

@op
def a_op(context):
    pass

@job(...)
def my_job():
    # message_fn is the hook's first parameter; there is no channel argument.
    # The trailing () invokes the op so that it becomes part of the job.
    a_op.with_hooks(hook_defs={teams_on_success(message_fn=my_message_fn)})()

dagster_msteams.make_teams_on_run_failure_sensor

Create a sensor on run failures that will message the given MS Teams webhook URL.

Parameters:

  • hook_url (str) – MS Teams incoming webhook URL.
  • message_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function which
  • http_proxy – (Optional[str]): Proxy for requests using http protocol.
  • https_proxy – (Optional[str]): Proxy for requests using https protocol.
  • timeout – (Optional[float]): Connection timeout in seconds. Defaults to 60.
  • verify – (Optional[bool]): Whether to verify the server's TLS certificate.
  • name – (Optional[str]): The name of the sensor. Defaults to “teams_on_run_failure”.
  • dagit_base_url – deprecated (use webserver_base_url instead.) (Optional[str]): The base url of your webserver instance. Specify this to allow
  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector]]]) – Jobs in the current repository that will be monitored by this sensor. Defaults to None,
  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
  • webserver_base_url – (Optional[str]): The base url of your webserver instance. Specify this to allow
  • monitor_all_repositories (bool) – deprecated (use monitor_all_code_locations instead.) If set to True, the sensor will monitor all runs in the

Examples:

teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)

@repository
def my_repo():
    # Return the job and the sensor as separate list items.
    return [my_job, teams_on_run_failure]

def my_message_fn(context: RunFailureSensorContext) -> str:
    return "Job {job_name} failed! Error: {error}".format(
        job_name=context.dagster_run.job_name,
        error=context.failure_event.message,
    )

teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    webserver_base_url="http://localhost:3000",
)
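
Failure messages can be long, so a message function may want to shorten them before posting. The sketch below is one possible variation on the message function shown in the examples. The names `short_failure_message` and `MAX_ERROR_CHARS`, the 200-character limit, and the `SimpleNamespace` stand-in for the sensor context are all assumptions made for illustration.

```python
from types import SimpleNamespace

MAX_ERROR_CHARS = 200  # hypothetical limit, chosen for illustration


def short_failure_message(context) -> str:
    """Build a compact message from a RunFailureSensorContext-like object."""
    error = context.failure_event.message or ""
    if len(error) > MAX_ERROR_CHARS:
        # Keep the start of the error and mark that the rest was cut.
        error = error[:MAX_ERROR_CHARS] + "..."
    return f"Job {context.dagster_run.job_name} failed! Error: {error}"


# Stand-in objects so the function can be tried without a Dagster run.
fake_context = SimpleNamespace(
    dagster_run=SimpleNamespace(job_name="my_job"),
    failure_event=SimpleNamespace(message="x" * 500),
)
print(short_failure_message(fake_context))
```

A function like this would be passed as `message_fn` to `make_teams_on_run_failure_sensor` in place of `my_message_fn`.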

Legacy

dagster_msteams.msteams_resource ResourceDefinition

This resource is for connecting to Microsoft Teams.

The resource object is a dagster_msteams.TeamsClient.

By configuring this resource, you can post messages to MS Teams from any Dagster op:

Examples:

import os

from dagster import op, job
from dagster_msteams import Card, msteams_resource


@op(required_resource_keys={"msteams"})
def teams_op(context):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    context.resources.msteams.post_message(payload=card.payload)


@job(resource_defs={"msteams": msteams_resource})
def teams_job():
    teams_op()


teams_job.execute_in_process(
    {"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}}
)