Connecting to APIs
When building a data pipeline, you'll likely need to connect to several external APIs, each with its own specific configuration and behavior. This guide demonstrates how to standardize your API connections and customize their configuration using Dagster resources.
Prerequisites
To run the example code in this article, you will need to install the requests library:
```shell
pip install requests
```
Step 1: Write a resource that connects to an API
This example fetches the sunrise time for a given location from a REST API.
Using ConfigurableResource, define a Dagster resource with a method that returns the sunrise time for a location. In the first version of this resource, the location is hard-coded to San Francisco International Airport.
```python
import requests

import dagster as dg


# Use ConfigurableResource to define the resource
class SunResource(dg.ConfigurableResource):
    @property
    def query_string(self) -> str:
        latitude = "37.615223"
        longitude = "-122.389977"
        time_zone = "America/Los_Angeles"
        return f"https://api.sunrise-sunset.org/json?lat={latitude}&lng={longitude}&date=today&tzid={time_zone}"

    def sunrise(self) -> str:
        # Call the API and return the sunrise field from the JSON response
        data = requests.get(self.query_string, timeout=5).json()
        return data["results"]["sunrise"]
```
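The sunrise method does two jobs. It builds the request URL, and it pulls one field out of the JSON response. The following is a minimal sketch that splits these jobs into pure helper functions, so you can test them without calling the API. It uses urllib.parse.urlencode, which percent-encodes values such as the "/" in the time zone. The helper names are hypothetical. The check for a "status" field equal to "OK" on success is an assumption about the API's response format and is not part of the original example.

```python
from urllib.parse import urlencode

# Base endpoint used throughout this guide.
SUN_API_URL = "https://api.sunrise-sunset.org/json"


def build_sun_query(latitude: str, longitude: str, time_zone: str) -> str:
    """Build the request URL, percent-encoding each query value (hypothetical helper)."""
    params = {
        "lat": latitude,
        "lng": longitude,
        "date": "today",
        "tzid": time_zone,
    }
    return f"{SUN_API_URL}?{urlencode(params)}"


def extract_sunrise(data: dict) -> str:
    """Return the sunrise time from a decoded JSON response (hypothetical helper).

    Assumption: a successful response has "status": "OK" and a
    "results" object containing a "sunrise" key.
    """
    status = data.get("status")
    if status != "OK":
        raise ValueError(f"Sunrise API returned status {status!r}")
    return data["results"]["sunrise"]


print(build_sun_query("37.615223", "-122.389977", "America/Los_Angeles"))
```

Inside the resource, sunrise() could pass the URL from build_sun_query to requests.get(..., timeout=5) and hand the .json() result to extract_sunrise. Alternatively, requests.get accepts a params dictionary and performs similar encoding itself.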
Step 2: Use the resource in an asset
To use the resource, provide it as a parameter to an asset and include it in the Definitions object:
```python
import requests

import dagster as dg


class SunResource(dg.ConfigurableResource):
    @property
    def query_string(self) -> str:
        latitude = "37.615223"
        longitude = "-122.389977"
        time_zone = "America/Los_Angeles"
        return f"https://api.sunrise-sunset.org/json?lat={latitude}&lng={longitude}&date=today&tzid={time_zone}"

    def sunrise(self) -> str:
        data = requests.get(self.query_string, timeout=5).json()
        return data["results"]["sunrise"]


# Provide the resource to the asset
@dg.asset
def sfo_sunrise(context: dg.AssetExecutionContext, sun_resource: SunResource) -> None:
    sunrise = sun_resource.sunrise()
    context.log.info(f"Sunrise in San Francisco is at {sunrise}.")


# Include the resource in the Definitions object
defs = dg.Definitions(assets=[sfo_sunrise], resources={"sun_resource": SunResource()})
```
When you materialize sfo_sunrise, Dagster will provide an initialized SunResource to the sun_resource parameter.
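Dagster passes the resource to the asset by matching the sun_resource parameter name against the "sun_resource" key in the resources dictionary. The following toy sketch illustrates the idea of injecting dependencies by parameter name with inspect.signature. It shows the concept only and does not reflect how Dagster is implemented. The run_with_resources helper and the FakeSunResource class are hypothetical. Unlike Dagster, this toy passes only resources, so the sketched function has no context parameter.

```python
import inspect
from typing import Any, Callable


class FakeSunResource:
    """Hypothetical stand-in for SunResource that returns a fixed time."""

    def sunrise(self) -> str:
        return "7:00:00 AM"


def run_with_resources(fn: Callable[..., Any], resources: dict[str, Any]) -> Any:
    """Call fn, passing the resource whose key matches each parameter name."""
    params = inspect.signature(fn).parameters
    missing = [name for name in params if name not in resources]
    if missing:
        raise KeyError(f"No resource provided for parameters: {missing}")
    kwargs = {name: resources[name] for name in params}
    return fn(**kwargs)


def sfo_sunrise(sun_resource: FakeSunResource) -> str:
    # Plain-function version of the asset: it only relies on sunrise().
    return f"Sunrise in San Francisco is at {sun_resource.sunrise()}."


message = run_with_resources(sfo_sunrise, {"sun_resource": FakeSunResource()})
print(message)
```

The function touches sun_resource only through its sunrise() method. Swapping in a fake like this is therefore a convenient way to exercise the asset's logic without network access.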
Step 3: Configure the resource
Many APIs accept parameters that customize each request. The following example updates the resource with configuration fields so that the query location can be set:
```python
import requests

import dagster as dg


class SunResource(dg.ConfigurableResource):
    # Define the configuration and
    # remove previously hard-coded parameters
    latitude: str
    longitude: str
    time_zone: str

    # Update the query string to use the configuration
    @property
    def query_string(self) -> str:
        return f"https://api.sunrise-sunset.org/json?lat={self.latitude}&lng={self.longitude}&date=today&tzid={self.time_zone}"

    def sunrise(self) -> str:
        data = requests.get(self.query_string, timeout=5).json()
        return data["results"]["sunrise"]


@dg.asset
def sfo_sunrise(context: dg.AssetExecutionContext, sun_resource: SunResource) -> None:
    sunrise = sun_resource.sunrise()
    context.log.info(f"Sunrise in San Francisco is at {sunrise}.")


defs = dg.Definitions(
    assets=[sfo_sunrise],
    # Define configuration values
    resources={
        "sun_resource": SunResource(
            latitude="37.615223",
            longitude="-122.389977",
            time_zone="America/Los_Angeles",
        )
    },
)
```
The configurable resource can be provided to an asset exactly as before. When the resource is initialized, you can pass values for each of the configuration options.
When you materialize sfo_sunrise, Dagster will provide a SunResource initialized with the configuration values to the sun_resource parameter.
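ConfigurableResource declares configuration as typed class attributes, and the methods read those attributes through self. The frozen dataclass below is a framework-free sketch of the same pattern. It declares the three fields, builds the URL from them, and adds range checks in __post_init__. The SunConfig name and the validation rules are assumptions added for illustration. The guide's resource does not validate its inputs.

```python
from dataclasses import dataclass
from urllib.parse import urlencode


@dataclass(frozen=True)
class SunConfig:
    """Hypothetical framework-free stand-in for the configurable SunResource."""

    latitude: str
    longitude: str
    time_zone: str

    def __post_init__(self) -> None:
        # Assumed validation: reject coordinates outside the valid ranges early.
        if not -90.0 <= float(self.latitude) <= 90.0:
            raise ValueError(f"latitude out of range: {self.latitude}")
        if not -180.0 <= float(self.longitude) <= 180.0:
            raise ValueError(f"longitude out of range: {self.longitude}")

    @property
    def query_string(self) -> str:
        # Every value comes from the configured fields, not hard-coded locals.
        params = {
            "lat": self.latitude,
            "lng": self.longitude,
            "date": "today",
            "tzid": self.time_zone,
        }
        return "https://api.sunrise-sunset.org/json?" + urlencode(params)


sfo = SunConfig(latitude="37.615223", longitude="-122.389977", time_zone="America/Los_Angeles")
print(sfo.query_string)
```

ConfigurableResource is built on Pydantic, so comparable checks could likely be attached to the resource itself. Verify the exact mechanism against the Dagster and Pydantic documentation before relying on it.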
Step 4: Source configuration using environment variables
Resources can also be configured with environment variables. You can use Dagster's built-in EnvVar class to source configuration values from environment variables at materialization time.
In this example, there's a new home_sunrise asset. Rather than hard-coding the location of your home, you can set it in environment variables and configure the SunResource by reading those values:
```python
import requests

import dagster as dg


class SunResource(dg.ConfigurableResource):
    latitude: str
    longitude: str
    time_zone: str

    @property
    def query_string(self) -> str:
        return f"https://api.sunrise-sunset.org/json?lat={self.latitude}&lng={self.longitude}&date=today&tzid={self.time_zone}"

    def sunrise(self) -> str:
        data = requests.get(self.query_string, timeout=5).json()
        return data["results"]["sunrise"]


# Define the home_sunrise asset and use the sun_resource
@dg.asset
def home_sunrise(context: dg.AssetExecutionContext, sun_resource: SunResource) -> None:
    sunrise = sun_resource.sunrise()
    context.log.info(f"Sunrise at home is at {sunrise}.")


defs = dg.Definitions(
    assets=[home_sunrise],
    # Update the configuration to use environment variables
    resources={
        "sun_resource": SunResource(
            latitude=dg.EnvVar("HOME_LATITUDE"),
            longitude=dg.EnvVar("HOME_LONGITUDE"),
            time_zone=dg.EnvVar("HOME_TIMEZONE"),
        )
    },
)
```
When you materialize home_sunrise, Dagster will read the values set for the HOME_LATITUDE, HOME_LONGITUDE, and HOME_TIMEZONE environment variables and initialize a SunResource with those values. The initialized SunResource will be provided to the sun_resource parameter.
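With EnvVar, the configuration records only the name of each variable, and the value is looked up later, when the resource is initialized for a run. The sketch below imitates that deferred lookup with a hypothetical DeferredEnv class built on os.environ. It illustrates only the timing and is not Dagster's EnvVar implementation. The coordinate values are illustrative examples.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DeferredEnv:
    """Hypothetical reference to an environment variable, read on demand."""

    name: str

    def resolve(self) -> str:
        try:
            return os.environ[self.name]
        except KeyError:
            raise KeyError(f"Environment variable {self.name} is not set") from None


# Defining the references does not read any environment variables yet.
home_config = {
    "latitude": DeferredEnv("HOME_LATITUDE"),
    "longitude": DeferredEnv("HOME_LONGITUDE"),
    "time_zone": DeferredEnv("HOME_TIMEZONE"),
}

# Values set after the references exist are still picked up at resolve time.
os.environ["HOME_LATITUDE"] = "40.7128"
os.environ["HOME_LONGITUDE"] = "-74.0060"
os.environ["HOME_TIMEZONE"] = "America/New_York"

resolved = {key: ref.resolve() for key, ref in home_config.items()}
print(resolved)
```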
You can also fetch environment variables using Python's os module (for example, os.getenv). Dagster treats the two approaches differently, such as when the values are fetched and how they are displayed in the UI. Refer to the Environment variables guide for more information.
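By contrast, a plain os-based lookup reads the value as soon as the line runs. If the lookup sits in the module that builds Definitions, that is typically when the module is loaded, not when home_sunrise is materialized. The hypothetical load_home_location helper below reads all three variables eagerly and reports every missing variable at once. It is a stdlib-only sketch, and the example values are illustrative. See the Environment variables guide for how Dagster handles and displays values fetched this way.

```python
import os
from typing import Mapping

REQUIRED_VARS = ("HOME_LATITUDE", "HOME_LONGITUDE", "HOME_TIMEZONE")


def load_home_location(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Read the home location eagerly, failing if any variable is missing."""
    missing = [name for name in REQUIRED_VARS if name not in environ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "latitude": environ["HOME_LATITUDE"],
        "longitude": environ["HOME_LONGITUDE"],
        "time_zone": environ["HOME_TIMEZONE"],
    }


# Passing a plain dict makes the helper easy to exercise without touching os.environ.
example_env = {
    "HOME_LATITUDE": "51.5072",
    "HOME_LONGITUDE": "-0.1276",
    "HOME_TIMEZONE": "Europe/London",
}
print(load_home_location(example_env))
```

The returned dictionary could be unpacked into the resource, for example SunResource(**load_home_location()). That is the os-based alternative mentioned above, with the values fixed at load time.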