OpenAI (dagster-openai)
The dagster_openai library provides utilities for using OpenAI with Dagster. A good place to start with dagster_openai is the guide.
- dagster_openai.with_usage_metadata
- experimental
This API may break in future versions, even between dot releases.
This wrapper can be used on any endpoint of the openai library <https://github.com/openai/openai-python> to log the OpenAI API usage metadata in the asset metadata.
Examples:
from dagster import (
AssetExecutionContext,
AssetKey,
AssetSelection,
AssetSpec,
Definitions,
EnvVar,
MaterializeResult,
asset,
define_asset_job,
multi_asset,
)
from dagster_openai import OpenAIResource, with_usage_metadata
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.fine_tuning.jobs.create = with_usage_metadata(
context=context, output_name="some_output_name", func=client.fine_tuning.jobs.create
)
client.fine_tuning.jobs.create(model="gpt-3.5-turbo", training_file="some_training_file")
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
@multi_asset(
specs=[
AssetSpec("my_asset1"),
AssetSpec("my_asset2"),
]
)
def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
# The materialization of `my_asset1` will include both OpenAI usage metadata
# and the metadata added when calling `MaterializeResult`.
return (
MaterializeResult(asset_key="my_asset1", metadata={"foo": "bar"}),
MaterializeResult(asset_key="my_asset2", metadata={"baz": "qux"}),
)
openai_multi_asset_job = define_asset_job(
name="openai_multi_asset_job", selection=AssetSelection.assets(openai_multi_asset)
)
defs = Definitions(
assets=[openai_asset, openai_multi_asset],
jobs=[openai_asset_job, openai_multi_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
- class dagster_openai.OpenAIResource
- experimental
This API may break in future versions, even between dot releases.
This resource is a wrapper over the openai library.
By configuring this OpenAI resource, you can interact with the OpenAI API and log its usage metadata in the asset metadata.
Examples:
import os
from dagster import AssetExecutionContext, Definitions, EnvVar, asset, define_asset_job
from dagster_openai import OpenAIResource
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
defs = Definitions(
assets=[openai_asset],
jobs=[openai_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
- get_client
Yields an openai.Client for interacting with the OpenAI API.
By default, in an asset context, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing the API usage metadata to be logged in the asset metadata.
Note that the endpoints are not and cannot be wrapped to automatically capture the API usage metadata in an op context.
Parameters: context – The context object for computing the op or asset in which get_client is called.
Examples:
from dagster import (
AssetExecutionContext,
Definitions,
EnvVar,
GraphDefinition,
OpExecutionContext,
asset,
define_asset_job,
op,
)
from dagster_openai import OpenAIResource
@op
def openai_op(context: OpExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
defs = Definitions(
assets=[openai_asset],
jobs=[openai_asset_job, openai_op_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
- get_client_for_asset
Yields an openai.Client for interacting with the OpenAI API.
When using this method, the OpenAI API usage metadata is automatically logged in the asset materializations associated with the provided asset_key.
By default, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing the API usage metadata to be logged in the asset metadata.
This method can only be called when working with assets, i.e. the provided context must be of type AssetExecutionContext.
Parameters:
- context – The context object for computing the asset in which get_client_for_asset is called.
- asset_key – The asset_key of the asset for which a materialization should include the metadata.
Examples:
from dagster import (
AssetExecutionContext,
AssetKey,
AssetSpec,
Definitions,
EnvVar,
MaterializeResult,
asset,
define_asset_job,
multi_asset,
)
from dagster_openai import OpenAIResource
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client_for_asset(context, context.asset_key) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
@multi_asset(specs=[AssetSpec("my_asset1"), AssetSpec("my_asset2")], compute_kind="OpenAI")
def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
return (
MaterializeResult(asset_key="my_asset1", metadata={"some_key": "some_value1"}),
MaterializeResult(asset_key="my_asset2", metadata={"some_key": "some_value2"}),
)
openai_multi_asset_job = define_asset_job(
name="openai_multi_asset_job", selection="openai_multi_asset"
)
defs = Definitions(
assets=[openai_asset, openai_multi_asset],
jobs=[openai_asset_job, openai_multi_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
- context – The