Shell (dagster-shell)
The Dagster shell library provides utilities and op factories for executing inline shell scripts or script files.
APIs
- dagster_shell.create_shell_command_op
Deprecated: This API will be removed in version 0.25. Use PipesSubprocessClient instead.
This function is a factory that constructs ops to execute a shell command.
Note that you can only use create_shell_command_op if you know the command you'd like to execute at job construction time. If you'd like to construct shell commands dynamically during job execution and pass them between ops, you should use shell_op instead.
The resulting op can take a single start argument that is a Nothing dependency, which allows you to run ops before the shell op.
Examples:
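```python
from dagster import graph
from dagster_shell import create_shell_command_op

@graph
def my_graph():
    a = create_shell_command_op('echo "hello, world!"', name="a")
    a()
```
To run another op first, pass its output to the start argument:
```python
from dagster import graph, op
from dagster_shell import create_shell_command_op

def do_some_work():
    # Hypothetical placeholder for work that must finish before the shell op.
    pass

@op
def run_before_shell_op():
    do_some_work()

@graph
def my_graph():
    my_echo_op = create_shell_command_op("echo hello world!", name="echo_op")
    # `start` is a Nothing dependency: it orders execution but passes no data.
    my_echo_op(start=run_before_shell_op())
```
Parameters: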
- shell_command (str) – The shell command that the constructed op will execute.
- name (str) – The name of the constructed op.
- description (Optional[str]) – Human-readable description of this op.
- required_resource_keys (Optional[Set[str]]) – Set of resource handles required by this op.
- tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the op. Frameworks may
Raises: Failure – Raised when the shell command returns a non-zero exit code.
Returns: The constructed op definition.
Return type: OpDefinition
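The key property of this factory is that the command is fixed when the op is constructed, and a non-zero exit code is treated as a failure. The sketch below illustrates that idea with the standard library only; it is an analogy, not dagster_shell's implementation. make_shell_command_runner and ShellCommandFailure are hypothetical names, and ShellCommandFailure merely stands in for Dagster's Failure.

```python
import subprocess
from typing import Callable


class ShellCommandFailure(Exception):
    """Hypothetical stand-in for Dagster's Failure in this stdlib-only sketch."""


def make_shell_command_runner(shell_command: str, name: str) -> Callable[[], str]:
    """Build a named callable that always runs the same, fixed command.

    The command is captured when the runner is constructed, not supplied
    when it is called, which mirrors the construction-time restriction.
    """

    def run() -> str:
        # Run through bash, merging stderr into stdout.
        result = subprocess.run(
            ["bash", "-c", shell_command],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        # Treat any non-zero exit code as a failure.
        if result.returncode != 0:
            raise ShellCommandFailure(
                f"{name} exited with code {result.returncode}: {result.stdout}"
            )
        return result.stdout

    run.__name__ = name
    return run


echo_runner = make_shell_command_runner('echo "hello, world!"', name="a")
print(echo_runner(), end="")  # prints: hello, world!
```

Because the command is baked in at construction time, the runner cannot receive a command computed by an earlier step; that is the case shell_op covers.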
- dagster_shell.create_shell_script_op
Deprecated: This API will be removed in version 0.25. Use PipesSubprocessClient instead.
This function is a factory that constructs an op that executes a shell command read from a script file.
Any kwargs passed to this function are passed along to the underlying @op decorator. However, overriding config or output_defs is not supported.
If you'd like to configure the shell op with different config fields, consider wrapping it in a @graph.
If no ins are passed, the resulting op can take a single start argument that is a Nothing dependency, which allows you to run ops before the shell op.
Examples:
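```python
from dagster import file_relative_path, graph
from dagster_shell import create_shell_script_op

@graph
def my_graph():
    a = create_shell_script_op(file_relative_path(__file__, "hello_world.sh"), name="a")
    a()
```
To run another op first, pass its output to the start argument:
```python
from dagster import file_relative_path, graph, op
from dagster_shell import create_shell_script_op

def do_some_work():
    # Hypothetical placeholder for work that must finish before the shell op.
    pass

@op
def run_before_shell_op():
    do_some_work()

@graph
def my_graph():
    my_echo_op = create_shell_script_op(file_relative_path(__file__, "hello_world.sh"), name="echo_op")
    # `start` is a Nothing dependency: it orders execution but passes no data.
    my_echo_op(start=run_before_shell_op())
```
Parameters: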
- shell_script_path (str) – The script file to execute.
- name (Optional[str]) – The name of this op. Defaults to “create_shell_script_op”.
- ins (Optional[Mapping[str, In]]) – Ins for the op. Defaults to
Raises: Failure – Raised when the shell command returns a non-zero exit code.
Returns: The constructed op definition.
Return type: OpDefinition
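The kwargs rule for this factory amounts to forwarding every keyword argument except a small reserved set. A minimal sketch of that pattern, assuming a hypothetical build_script_op_spec helper that returns the arguments a decorator would receive instead of actually applying @op:

```python
from typing import Any, Dict

# Keys this hypothetical helper refuses to forward, mirroring the documented
# restriction that overriding `config` or `output_defs` is not supported.
_RESERVED_KWARGS = frozenset({"config", "output_defs"})


def build_script_op_spec(
    shell_script_path: str,
    name: str = "create_shell_script_op",
    **kwargs: Any,
) -> Dict[str, Any]:
    """Collect the arguments a hypothetical op decorator would receive."""
    reserved = _RESERVED_KWARGS.intersection(kwargs)
    if reserved:
        raise TypeError(f"Overriding {sorted(reserved)} is not supported")
    # Every other keyword argument is passed through unchanged.
    return {"name": name, "script": shell_script_path, **kwargs}


spec = build_script_op_spec("hello_world.sh", description="Say hello")
```

Rejecting reserved keys up front gives a clear error instead of silently letting a caller replace configuration the factory manages itself.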
- dagster_shell.shell_op
This op executes a shell command that it receives as input. It is suitable for cases where the command to execute is generated dynamically by upstream ops. If you know the command to execute at job construction time, consider create_shell_command_op instead.
Parameters:
- shell_command – The shell command to be executed.
- config (ShellOpConfig) – A ShellOpConfig object specifying configuration options
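Examples:
```python
from dagster import graph, op
from dagster_shell import shell_op

@op
def create_shell_command():
    return "echo hello world!"

@graph
def echo_graph():
    # The upstream op's output becomes the command that shell_op runs.
    shell_op(create_shell_command())
```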
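What distinguishes shell_op is that the command arrives as data from an upstream step. A stdlib-only sketch of that hand-off: create_shell_command mirrors the example above, while run_shell_command is a hypothetical stand-in for shell_op that uses subprocess.run with bash -c rather than the library's own execution path.

```python
import subprocess


def create_shell_command() -> str:
    # Upstream step: the command is only known at run time.
    return "echo hello world!"


def run_shell_command(shell_command: str) -> str:
    # Hypothetical downstream step: executes whatever command it receives.
    completed = subprocess.run(
        ["bash", "-c", shell_command],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout


result = run_shell_command(create_shell_command())
print(result, end="")  # prints: hello world!
```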
- dagster_shell.execute_shell_command
This function is a utility for executing shell commands from within a Dagster op (or from Python in general). It can be used to execute shell commands on op input data or on any data generated within a generic Python op.
Internally, it executes a shell script specified by the argument shell_command. The script is first written to a temporary file and then invoked via subprocess.Popen(['bash', shell_script_path], ...).
In the Popen invocation, stdout=PIPE, stderr=STDOUT is used, and the combined stdout/stderr output is retrieved.
Examples:
```python
from dagster import OpExecutionContext, op
from dagster_shell import execute_shell_command

@op
def my_shell_op(context: OpExecutionContext, data: str):
    temp_file = "/tmp/data.txt"
    # Write the op input to a file so the shell command can read it.
    with open(temp_file, "w", encoding="utf-8") as temp_file_writer:
        temp_file_writer.write(data)
    execute_shell_command(f"cat {temp_file}", output_logging="STREAM", log=context.log)
```
Parameters:
- shell_command (str) – The shell command to execute.
- output_logging (str) – The logging mode to use. Supports STREAM, BUFFER, and NONE.
- log (Union[logging.Logger, DagsterLogManager]) – Any logger which responds to .info()
- cwd (str, optional) – Working directory for the shell command to use. Defaults to the
- env (Dict[str, str], optional) – Environment dictionary to pass to subprocess.Popen.
- log_shell_command (bool, optional) – Whether to log the shell command before executing it.
Returns: A tuple where the first element is the combined stdout/stderr output of running the shell command and the second element is the return code.
Return type: Tuple[str, int]
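The write-to-a-temporary-file-then-Popen mechanism described for execute_shell_command can be sketched with the standard library. This is a simplified illustration of the documented steps, not dagster_shell's source: run_via_temp_script is a hypothetical name, and logging is omitted. It returns the same (output, return code) shape.

```python
import os
import subprocess
import tempfile
from typing import Dict, Optional, Tuple


def run_via_temp_script(
    shell_command: str,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
) -> Tuple[str, int]:
    """Hypothetical sketch of the documented mechanism (no logging)."""
    # 1. Write the command to a temporary script file.
    with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as script:
        script.write(shell_command)
        script_path = script.name
    try:
        # 2. Invoke it with bash, merging stderr into stdout.
        proc = subprocess.Popen(
            ["bash", script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=cwd,
            env=env,
        )
        raw, _ = proc.communicate()
        # 3. Return the combined output and the return code.
        return raw.decode("utf-8"), proc.returncode
    finally:
        os.remove(script_path)


output, code = run_via_temp_script("echo out; echo err >&2; exit 2")
```

Because stderr is redirected into the same pipe as stdout, both streams come back as one string in the order the script wrote them, and a non-zero exit is reported through the return code rather than raised.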
- dagster_shell.execute_shell_script
Execute a shell script file specified by the argument shell_script_path. The script is invoked via subprocess.Popen(['bash', shell_script_path], ...).
In the Popen invocation, stdout=PIPE, stderr=STDOUT is used, and the combined stdout/stderr output is retrieved.
Examples:
```python
from dagster import OpExecutionContext, op
from dagster_shell import execute_shell_script

@op
def my_shell_op(context: OpExecutionContext, data: str):
    temp_file = "/tmp/echo_data.sh"
    # Generate a one-line script from the op input, then run it.
    with open(temp_file, "w", encoding="utf-8") as temp_file_writer:
        temp_file_writer.write(f"echo {data}")
    execute_shell_script(temp_file, output_logging="STREAM", log=context.log)
```
Parameters:
- shell_script_path (str) – The shell script to execute.
- output_logging (str) – The logging mode to use. Supports STREAM, BUFFER, and NONE.
- log (Union[logging.Logger, DagsterLogManager]) – Any logger which responds to .info()
- cwd (str, optional) – Working directory for the shell command to use. Defaults to the
- env (Dict[str, str], optional) – Environment dictionary to pass to subprocess.Popen.
- log_shell_command (bool, optional) – Whether to log the shell command before executing it.
Raises: Exception – When an invalid output_logging is selected. This is unreachable from op-based invocation, since the config system checks output_logging against the config enum.
Returns: A tuple where the first element is the combined stdout/stderr output of running the shell command and the second element is the return code.
Return type: Tuple[str, int]
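The three output_logging modes can be sketched as follows, assuming STREAM logs each line as it is read, BUFFER logs the whole output once the script finishes, and NONE logs nothing. Those semantics are an assumption inferred from the mode names, not taken from the library source. run_script_with_logging and ListHandler are hypothetical names; the invalid-mode check mirrors the Raises entry above.

```python
import logging
import os
import subprocess
import tempfile
from typing import List, Tuple


def run_script_with_logging(
    shell_script_path: str, output_logging: str, log: logging.Logger
) -> Tuple[str, int]:
    """Hypothetical sketch of STREAM / BUFFER / NONE handling."""
    if output_logging not in ("STREAM", "BUFFER", "NONE"):
        raise Exception(f"Unrecognized output_logging {output_logging}")

    proc = subprocess.Popen(
        ["bash", shell_script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    assert proc.stdout is not None
    lines: List[str] = []
    for line in proc.stdout:  # read output line by line as it is produced
        lines.append(line)
        if output_logging == "STREAM":
            log.info(line.rstrip("\n"))  # assumed: one log call per line
    proc.wait()
    output = "".join(lines)
    if output_logging == "BUFFER":
        log.info(output)  # assumed: one log call with the full output
    return output, proc.returncode


class ListHandler(logging.Handler):
    """Collects log messages in memory so the modes can be compared."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as f:
    f.write("echo one\necho two\n")
    script_path = f.name

handler = ListHandler()
logger = logging.getLogger("shell_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

stream_out, stream_code = run_script_with_logging(script_path, "STREAM", logger)
stream_messages = list(handler.messages)
handler.messages.clear()
buffer_out, buffer_code = run_script_with_logging(script_path, "BUFFER", logger)
buffer_messages = list(handler.messages)
os.remove(script_path)
```

Streaming gives earlier feedback for long-running scripts, while buffering produces a single log entry; both return the same combined output.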