Ensign for Data Engineers #
We love data engineers — it’s how a lot of us got our starts in tech. One of the main reasons we made Ensign is to make it easier for you to put your data in motion. We know that a clumsy ETL routine can quickly turn a data lake into a data landfill.
In this example we’ll see how to move data around with Ensign. We’ll be using Watermill-Ensign and Watermill to call a Weather API and insert weather data into a PostgreSQL database. If you haven’t used Watermill yet, you’re in for a treat! Check out this introductory post that covers the basics.
Just want the code? Check out this repo for the full example.
ETL Design #
The architecture for this weather ingestor is composed of three components:
- An Ensign publisher that calls the Weather API and publishes the weather data to a topic.
- An Ensign subscriber that listens on this topic and runs a check against the PostgreSQL database to see if this is a new record. The weather data doesn’t change that often, so it is possible to receive a duplicate record. If the record is new, we’ll put the record into a second topic.
- A sql publisher that inserts the records from the second topic into the database.
The Ensign subscriber and the sql publisher are chained together using the router and handler functionality described in this post.
Prerequisites #
This tutorial assumes that the following steps have been completed:
- You have installed watermill, ensign, watermill-ensign, and watermill-sql.
- You have received an Ensign Client ID and Client Secret. Refer to the getting started guide on how to obtain the key.
- You have received an API key from the Weather API website (it’s free!).
- You have Docker installed and running on your machine.
Project Setup #
First, let’s create a root directory called weather_data for the application.
mkdir weather_data
We will then create two subfolders, one for the component that calls the Weather API to get the latest weather data and the other for the component that receives the data and inserts it into the database.
cd weather_data
mkdir producer
mkdir consumer
Create the Ensign Publisher #
Creating a publisher is very straightforward. You will need to have environment variables set up for the Ensign Client ID and Client Secret from your API Key. (Need a new key?)
publisher, err := ensign.NewPublisher(
	ensign.PublisherConfig{
		EnsignConfig: &ensign.Options{
			ClientID:     os.Getenv("ENSIGN_CLIENT_ID"),
			ClientSecret: os.Getenv("ENSIGN_CLIENT_SECRET"),
		},
		Marshaler: ensign.EventMarshaler{},
	},
	logger,
)
Call the Weather API #
Before we call the Weather API, we need to create the following structs:
First, a high level struct to represent the updates that come back from the Weather API.
type Response struct {
	Current Current `json:"current,omitempty"`
}
Next, a more detailed struct to help us parse all of the components of the Weather API’s response. Some of this will depend on how much detail you need to ingest for your downstream data users. (Will the data scientists on your team complain if you forget to ingest the full text description provided by the response?)
type Current struct {
	LastUpdated string            `json:"last_updated,omitempty"`
	TempF       float64           `json:"temp_f,omitempty"`
	Condition   *CurrentCondition `json:"condition,omitempty"`
	WindMph     float64           `json:"wind_mph,omitempty"`
	WindDir     string            `json:"wind_dir,omitempty"`
	PrecipIn    float64           `json:"precip_in,omitempty"`
	Humidity    int32             `json:"humidity,omitempty"`
	FeelslikeF  float64           `json:"feelslike_f,omitempty"`
	VisMiles    float64           `json:"vis_miles,omitempty"`
}

type CurrentCondition struct {
	Text string `json:"text,omitempty"`
}
Finally, a struct to represent whatever structure makes the most sense for the weather data in your organization (e.g. with your company’s database schemas or use cases in mind):
type ApiWeatherInfo struct {
	LastUpdated   string
	Temperature   float64
	FeelsLike     float64
	Humidity      int32
	Condition     string
	WindMph       float64
	WindDirection string
	Visibility    float64
	Precipitation float64
}
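Before wiring these structs into a live API call, it’s worth confirming the JSON tags line up with the payload shape. Here’s a minimal, self-contained sketch; the trimmed-down structs and the parseResponse helper are illustrative only, and the sample values are made up:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trimmed copies of the structs above, enough to exercise the JSON tags
type CurrentCondition struct {
	Text string `json:"text,omitempty"`
}

type Current struct {
	LastUpdated string            `json:"last_updated,omitempty"`
	TempF       float64           `json:"temp_f,omitempty"`
	Condition   *CurrentCondition `json:"condition,omitempty"`
}

type Response struct {
	Current Current `json:"current,omitempty"`
}

// parseResponse is an illustrative helper, not part of the tutorial code
func parseResponse(body []byte) (Response, error) {
	var r Response
	err := json.Unmarshal(body, &r)
	return r, err
}

func main() {
	// a hand-written sample payload; real responses carry many more fields,
	// which json.Unmarshal simply ignores
	sample := []byte(`{"current":{"last_updated":"2023-01-15 10:30","temp_f":41.0,"condition":{"text":"Partly cloudy"}}}`)
	r, err := parseResponse(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Current.TempF, r.Current.Condition.Text) // 41 Partly cloudy
}
```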
Here is the code to create the request object:
req, err := http.NewRequest("GET", "http://api.weatherapi.com/v1/current.json?", nil)
Next, we will define the query parameters and add them to req. Note that you will need to create an environment variable called WAPIKEY that is set to the API key you received from the Weather API.
q := req.URL.Query()
q.Add("key", os.Getenv("WAPIKEY"))
q.Add("q", "Washington DC")
req.URL.RawQuery = q.Encode()
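To see exactly what URL this produces (Encode sorts parameters alphabetically and percent-encodes them), here is a self-contained sketch with a dummy key; buildWeatherRequest is an illustrative wrapper, not part of the tutorial code:

```go
package main

import (
	"fmt"
	"net/http"
)

// buildWeatherRequest wraps the request construction shown above;
// the key passed in here is a dummy value, not a real credential
func buildWeatherRequest(key, location string) (*http.Request, error) {
	req, err := http.NewRequest("GET", "http://api.weatherapi.com/v1/current.json?", nil)
	if err != nil {
		return nil, err
	}
	q := req.URL.Query()
	q.Add("key", key)
	q.Add("q", location)
	req.URL.RawQuery = q.Encode() // sorts parameters and encodes the space as "+"
	return req, nil
}

func main() {
	req, _ := buildWeatherRequest("DUMMY", "Washington DC")
	fmt.Println(req.URL.String())
	// http://api.weatherapi.com/v1/current.json?key=DUMMY&q=Washington+DC
}
```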
Let’s create the http client to call the Weather API, parse the response object, and create an ApiWeatherInfo object.
// create the client object
client := &http.Client{}
// retrieve the response
resp, err := client.Do(req)
defer resp.Body.Close()
// read the body of the response
body, _ := ioutil.ReadAll(resp.Body)
// unmarshal the body into a Response object
var response Response
err = json.Unmarshal(body, &response)
// Convert the Response object into an ApiWeatherInfo object
current := response.Current
currentWeatherInfo := ApiWeatherInfo{
	LastUpdated:   current.LastUpdated,
	Temperature:   current.TempF,
	FeelsLike:     current.FeelslikeF,
	Humidity:      current.Humidity,
	Condition:     current.Condition.Text,
	WindMph:       current.WindMph,
	WindDirection: current.WindDir,
	Visibility:    current.VisMiles,
	Precipitation: current.PrecipIn,
}
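One thing to watch: Condition is a pointer and may be nil if the API omits the condition block, in which case the direct field access above would panic. One way to guard against that is to factor the mapping into a helper; toApiWeatherInfo is a hypothetical name and the structs are trimmed for brevity:

```go
package main

import "fmt"

// trimmed copies of the structs defined earlier in this tutorial
type CurrentCondition struct{ Text string }

type Current struct {
	LastUpdated string
	TempF       float64
	FeelslikeF  float64
	Condition   *CurrentCondition
}

type ApiWeatherInfo struct {
	LastUpdated string
	Temperature float64
	FeelsLike   float64
	Condition   string
}

// toApiWeatherInfo centralizes the field-by-field mapping; guarding the
// Condition pointer avoids a nil dereference when the API omits it
func toApiWeatherInfo(current Current) ApiWeatherInfo {
	info := ApiWeatherInfo{
		LastUpdated: current.LastUpdated,
		Temperature: current.TempF,
		FeelsLike:   current.FeelslikeF,
	}
	if current.Condition != nil {
		info.Condition = current.Condition.Text
	}
	return info
}

func main() {
	c := Current{LastUpdated: "2023-01-15 10:30", TempF: 41, Condition: &CurrentCondition{Text: "Sunny"}}
	fmt.Printf("%+v\n", toApiWeatherInfo(c))
}
```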
Here is the complete function with some additional error handling.
Publish the Data to a Topic #
We’ll create a helper method publishWeatherData that takes in a publisher and a channel that is used as a signal to stop publishing. First, we create a ticker that fires every five minutes (the weather doesn’t change that often). On each tick, we call the GetCurrentWeather function that we constructed previously to retrieve weather data, serialize it, construct a Watermill message, and publish the message to the current_weather topic.
func publishWeatherData(publisher message.Publisher, closeCh chan struct{}) {
	//weather doesn't change that often - call the Weather API every 5 minutes
	ticker := time.NewTicker(5 * time.Minute)
	for {
		select {
		//if a signal has been sent through closeCh, publisher will stop publishing
		case <-closeCh:
			ticker.Stop()
			return
		case <-ticker.C:
		}
		//call the API to get the weather data
		weatherData, err := GetCurrentWeather()
		if err != nil {
			fmt.Println("Issue retrieving weather data: ", err)
			continue
		}
		//serialize the weather data
		payload, err := json.Marshal(weatherData)
		if err != nil {
			fmt.Println("Could not marshal weatherData: ", err)
			continue
		}
		//construct a watermill message
		msg := message.NewMessage(watermill.NewUUID(), payload)
		// Use a middleware to set the correlation ID, it's useful for debugging
		middleware.SetCorrelationID(watermill.NewShortUUID(), msg)
		//publish the message to the "current_weather" topic
		err = publisher.Publish("current_weather", msg)
		if err != nil {
			fmt.Println("cannot publish message: ", err)
			continue
		}
	}
}
Start the First Stream #
Next we want to create a long-running process that will continue pinging the Weather API, parsing the response, and publishing it for downstream consumption.
In our main() function, we will first create a logger using watermill.NewStdLogger. Then we create the publisher and the closeCh channel that will be used to signal the publisher to stop publishing. We pass the publisher and closeCh to the publishWeatherData function and run it in a goroutine.
We then create a channel c that receives an os.Interrupt signal (e.g. when we want to stop the process, or when something goes wrong). Once the signal arrives, main() closes the closeCh channel, which notifies the publisher to stop publishing.
func main() {
	// add a logger
	logger := watermill.NewStdLogger(false, false)
	logger.Info("Starting the producer", watermill.LogFields{})
	//create the publisher
	publisher, err := ensign.NewPublisher(
		ensign.PublisherConfig{
			EnsignConfig: &ensign.Options{
				ClientID:     os.Getenv("ENSIGN_CLIENT_ID"),
				ClientSecret: os.Getenv("ENSIGN_CLIENT_SECRET"),
			},
			Marshaler: ensign.EventMarshaler{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
	defer publisher.Close()
	//used to signal the publisher to stop publishing
	closeCh := make(chan struct{})
	go publishWeatherData(publisher, closeCh)
	// wait for SIGINT - this will end processing
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
	// signal for the publisher to stop publishing
	close(closeCh)
	logger.Info("All messages published", nil)
}
Create the Ensign Subscriber #
Next we’ll create a subscriber in much the same way as we created our publisher. Our subscriber will be in charge of listening for incoming weather messages from the publisher, and checking to see if the incoming data is actually new.
subscriber, err := ensign.NewSubscriber(
	ensign.SubscriberConfig{
		EnsignConfig: &ensign.Options{
			ClientID:     os.Getenv("ENSIGN_CLIENT_ID"),
			ClientSecret: os.Getenv("ENSIGN_CLIENT_SECRET"),
		},
		Unmarshaler: ensign.EventMarshaler{},
	},
	logger,
)
Connect to the Database #
Let’s write a quick function createPostgresConnection that will allow us to connect to our database. You will need to create the following environment variables to connect to your local PostgreSQL database: POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_DB. You can use any values of your choosing for these variables. For convenience, in this example, we’ll use a docker PostgreSQL container. The function is below.
func createPostgresConnection() *stdSQL.DB {
	host := "weather_db"
	port := 5432
	user := os.Getenv("POSTGRES_USER")
	password := os.Getenv("POSTGRES_PASSWORD")
	dbname := os.Getenv("POSTGRES_DB")
	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		host, port, user, password, dbname,
	)
	db, err := stdSQL.Open("postgres", dsn)
	if err != nil {
		panic(err)
	}
	err = db.Ping()
	if err != nil {
		panic(err)
	}
	createQuery := `CREATE TABLE IF NOT EXISTS weather_info (
		id SERIAL NOT NULL PRIMARY KEY,
		last_updated VARCHAR(50) NOT NULL,
		temperature DECIMAL,
		feels_like DECIMAL,
		humidity INTEGER,
		condition VARCHAR(36),
		wind_mph DECIMAL,
		wind_direction VARCHAR(36),
		visibility DECIMAL,
		precipitation DECIMAL,
		created_at VARCHAR(100) NOT NULL
	);`
	_, err = db.ExecContext(context.Background(), createQuery)
	if err != nil {
		panic(err)
	}
	log.Println("created table weather_info")
	return db
}
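The connection string built here is a libpq-style keyword/value DSN; note that sslmode=disable is fine for a local dev container but not for production. A tiny sketch of the format (buildDSN is an illustrative helper, not part of the tutorial code):

```go
package main

import "fmt"

// buildDSN assembles a libpq-style keyword/value connection string,
// the same format createPostgresConnection builds above
func buildDSN(host string, port int, user, password, dbname string) string {
	return fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		host, port, user, password, dbname,
	)
}

func main() {
	fmt.Println(buildDSN("weather_db", 5432, "postgres", "secret", "weather"))
}
```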
Note that above, the host is defined as weather_db, which will match the name of the database container. More on that later in the tutorial!
We will also define a WeatherInfo struct that contains the fields of the weather_info table. It is similar to the ApiWeatherInfo struct, with the exception of CreatedAt, an additional field in the table.
type WeatherInfo struct {
	LastUpdated   string
	Temperature   float64
	FeelsLike     float64
	Humidity      int32
	Condition     string
	WindMph       float64
	WindDirection string
	Visibility    float64
	Precipitation float64
	CreatedAt     string
}
Does the Record Exist? #
Next, we’ll create a function that will check the database to see if the record already exists there. Before we create the function, we need a dbHandler struct that wraps the *stdSQL.DB connection used to query the database.
type dbHandler struct {
	db *stdSQL.DB
}
We then write our checkRecordExists function, which gets executed when a message arrives on the current_weather topic.
The first step is to unmarshal the message into an ApiWeatherInfo object. Next we execute a query to see if a record with that LastUpdated value exists. If it does not, we create a new WeatherInfo object, adding the current timestamp for the CreatedAt field. We then encode this object with gob and create and return a new watermill message, which gets published to the weather_info topic. If LastUpdated already exists, we simply note that the record exists and return nil.
func (d dbHandler) checkRecordExists(msg *message.Message) ([]*message.Message, error) {
	weatherInfo := ApiWeatherInfo{}
	err := json.Unmarshal(msg.Payload, &weatherInfo)
	if err != nil {
		return nil, err
	}
	log.Printf("received weather info: %+v", weatherInfo)
	var count int
	query := "SELECT count(*) FROM weather_info WHERE last_updated = $1"
	err = d.db.QueryRow(query, weatherInfo.LastUpdated).Scan(&count)
	switch {
	case err != nil:
		return nil, err
	default:
		if count > 0 {
			log.Println("Found existing record in the database")
			// not returning an error here because this is not an issue
			return nil, nil
		}
		newWeatherInfo := WeatherInfo{
			LastUpdated:   weatherInfo.LastUpdated,
			Temperature:   weatherInfo.Temperature,
			FeelsLike:     weatherInfo.FeelsLike,
			Humidity:      weatherInfo.Humidity,
			Condition:     weatherInfo.Condition,
			WindMph:       weatherInfo.WindMph,
			WindDirection: weatherInfo.WindDirection,
			Visibility:    weatherInfo.Visibility,
			Precipitation: weatherInfo.Precipitation,
			CreatedAt:     time.Now().String(),
		}
		log.Println(newWeatherInfo)
		var payload bytes.Buffer
		encoder := gob.NewEncoder(&payload)
		err := encoder.Encode(newWeatherInfo)
		if err != nil {
			panic(err)
		}
		newMessage := message.NewMessage(watermill.NewULID(), payload.Bytes())
		return []*message.Message{newMessage}, nil
	}
}
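The gob payload created here is exactly what the SQL publisher will decode later, so it’s worth seeing the round trip in isolation. A minimal sketch with a trimmed WeatherInfo; encodeWeatherInfo and decodeWeatherInfo are illustrative helper names, not part of the repo:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// WeatherInfo as defined earlier in the tutorial, trimmed to a few fields
type WeatherInfo struct {
	LastUpdated string
	Temperature float64
	CreatedAt   string
}

// encodeWeatherInfo mirrors checkRecordExists: the struct becomes the message payload
func encodeWeatherInfo(w WeatherInfo) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(w); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeWeatherInfo mirrors what the SQL adapter's InsertQuery does later
func decodeWeatherInfo(payload []byte) (WeatherInfo, error) {
	var w WeatherInfo
	err := gob.NewDecoder(bytes.NewBuffer(payload)).Decode(&w)
	return w, err
}

func main() {
	in := WeatherInfo{LastUpdated: "2023-01-15 10:30", Temperature: 41, CreatedAt: "now"}
	payload, err := encodeWeatherInfo(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeWeatherInfo(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(out == in) // true: the struct survives the round trip
}
```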
Prepping the Database #
Now we need to create a SQL publisher. A SQL publisher is a Watermill implementation of a SQL based pub/sub mechanism whereby you can use publishers to insert or upsert records and you can use subscribers to retrieve records. For more details, refer to this post on the Watermill site.
The SQL publisher is created as follows. Note that we set AutoInitializeSchema to false because we’ve already created the table. The postgresSchemaAdapter is an extension of Watermill’s SchemaAdapter, which provides the schema-dependent queries and arguments.
pub, err := sql.NewPublisher(
	db,
	sql.PublisherConfig{
		SchemaAdapter:        postgresSchemaAdapter{},
		AutoInitializeSchema: false,
	},
	logger,
)
We also need a SchemaInitializingQueries function, which Watermill would normally use to create a table based on the topic name. Since we’ve already created the table, it simply returns an empty list.
func (p postgresSchemaAdapter) SchemaInitializingQueries(topic string) []string {
	return []string{}
}
Inserting New Data #
Next we will create an InsertQuery function that decodes the list of messages and builds an insertQuery SQL statement to be executed. The topic name is the same as the table name, and it is used in the insertQuery statement. The function extracts the fields of each WeatherInfo object into the list of args, and returns the SQL statement along with the arguments.
func (p postgresSchemaAdapter) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) {
	// placeholder numbers must keep incrementing across rows:
	// ($1,...,$10) for the first message, ($11,...,$20) for the second, etc.
	groups := make([]string, len(msgs))
	for i := range msgs {
		ph := make([]string, 10)
		for j := range ph {
			ph[j] = fmt.Sprintf("$%d", i*10+j+1)
		}
		groups[i] = "(" + strings.Join(ph, ",") + ")"
	}
	insertQuery := fmt.Sprintf(
		`INSERT INTO %s (last_updated, temperature, feels_like, humidity, condition, wind_mph, wind_direction, visibility, precipitation, created_at) VALUES %s`,
		topic,
		strings.Join(groups, ","),
	)
	var args []interface{}
	for _, msg := range msgs {
		weatherInfo := WeatherInfo{}
		decoder := gob.NewDecoder(bytes.NewBuffer(msg.Payload))
		err := decoder.Decode(&weatherInfo)
		if err != nil {
			return "", nil, err
		}
		args = append(
			args,
			weatherInfo.LastUpdated,
			weatherInfo.Temperature,
			weatherInfo.FeelsLike,
			weatherInfo.Humidity,
			weatherInfo.Condition,
			weatherInfo.WindMph,
			weatherInfo.WindDirection,
			weatherInfo.Visibility,
			weatherInfo.Precipitation,
			weatherInfo.CreatedAt,
		)
	}
	return insertQuery, args, nil
}
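The placeholder numbering in a multi-row VALUES clause is easy to get wrong: PostgreSQL placeholders must keep incrementing across rows, or the number of bound arguments won’t match the statement. The pattern can be exercised in isolation; valuesClause is an illustrative helper, not part of the repo:

```go
package main

import (
	"fmt"
	"strings"
)

// valuesClause builds one placeholder group per row, with numbers that keep
// incrementing across rows, e.g. ($1,$2,$3),($4,$5,$6) for two rows of three columns
func valuesClause(rows, cols int) string {
	groups := make([]string, rows)
	for i := 0; i < rows; i++ {
		ph := make([]string, cols)
		for j := 0; j < cols; j++ {
			ph[j] = fmt.Sprintf("$%d", i*cols+j+1)
		}
		groups[i] = "(" + strings.Join(ph, ",") + ")"
	}
	return strings.Join(groups, ",")
}

func main() {
	fmt.Println(valuesClause(1, 3)) // ($1,$2,$3)
	fmt.Println(valuesClause(2, 3)) // ($1,$2,$3),($4,$5,$6)
}
```

In this pipeline each handler call yields a single message, so a single group usually suffices, but the incrementing form stays correct for larger batches.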
Here’s the part where you’d probably set up a subscriber stream for those data scientists who are teaching ChatGPT to be more conversant about the weather (or something like that). You’d want to create custom SelectQuery and UnmarshalMessage functions, but since we are not using a SQL subscriber in this tutorial, we’ll just provide stubs.
func (p postgresSchemaAdapter) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) {
	// No need to implement this method, as the PostgreSQL subscriber is not used in this example.
	return "", nil
}

func (p postgresSchemaAdapter) UnmarshalMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) {
	return 0, nil, errors.New("not implemented")
}
Start the Second Stream #
Now we need to put all those last pieces together so that we’re storing data back to PostgreSQL.
Here we will use the router functionality described here. We could have had the Ensign subscriber do the entire work of checking the database and inserting new records, without creating a publisher at all. However, by decoupling the checking and inserting functions, we enable Ensign to scale them up and down independently, which could save some serious $$$ depending on how much throughput you’re dealing with.
Here, we instantiate a new router and add a SignalsHandler plugin that will shut down the router when it receives a SIGTERM signal. We also add the Recoverer middleware, which handles any panics raised by the handler.
Next, we create the PostgreSQL connection (which also creates the weather_info table), the Ensign subscriber, and the SQL publisher.
We then add a handler to the router called weather_info_inserter, passing in the subscriber topic, subscriber, publisher topic, publisher, and the handler function that will be executed when a new message appears on the subscriber topic.
Finally, we will run the router.
// topic names used by the pipeline: the subscriber topic matches the
// producer's topic, and the publisher topic matches the table name
var (
	weather_api_topic    = "current_weather"
	weather_insert_topic = "weather_info"
)

func main() {
	logger := watermill.NewStdLogger(false, false)
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}
	//SignalsHandler will gracefully shutdown Router when SIGTERM is received
	router.AddPlugin(plugin.SignalsHandler)
	//The Recoverer middleware handles panics from handlers
	router.AddMiddleware(middleware.Recoverer)
	postgresDB := createPostgresConnection()
	log.Println("added postgres connection and created weather_info table")
	subscriber := createSubscriber(logger)
	publisher := createPublisher(postgresDB)
	router.AddHandler(
		"weather_info_inserter",
		weather_api_topic, //subscriber topic
		subscriber,
		weather_insert_topic, //publisher topic
		publisher,
		dbHandler{db: postgresDB}.checkRecordExists,
	)
	if err = router.Run(context.Background()); err != nil {
		panic(err)
	}
}
Composing our Docker Container #
Almost done! In this section we’ll create a docker-compose file in the weather_data directory to run our application.
The docker-compose file contains three services. The first is the producer, which requires the WAPIKEY environment variable used to call the Weather API, plus ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET to use Ensign. The second is the consumer, which needs the POSTGRES_USER, POSTGRES_DB, and POSTGRES_PASSWORD environment variables to connect to the database, as well as ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET. The third is the postgres database, which runs the official Docker image and requires the same Postgres environment variables as the consumer. Notice that its container name is weather_db, which matches the host name the consumer application uses to connect to the database.
version: '3'
services:
  producer:
    image: golang:1.19
    restart: unless-stopped
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    working_dir: /app/producer/
    command: go run main.go
    environment:
      WAPIKEY: ${WAPIKEY}
      ENSIGN_CLIENT_ID: ${ENSIGN_CLIENT_ID}
      ENSIGN_CLIENT_SECRET: ${ENSIGN_CLIENT_SECRET}
  consumer:
    image: golang:1.19
    restart: unless-stopped
    depends_on:
      - postgres
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    working_dir: /app/consumer/
    command: go run main.go db.go
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      ENSIGN_CLIENT_ID: ${ENSIGN_CLIENT_ID}
      ENSIGN_CLIENT_SECRET: ${ENSIGN_CLIENT_SECRET}
  postgres:
    image: postgres:12
    restart: unless-stopped
    ports:
      - 5432:5432
    container_name: weather_db
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
Let’s Gooooooooo #
We made it to the end! Once you have all of the code in place, run the following commands on the terminal in the producer
and consumer
directories:
go mod init
go mod tidy
This will create the go.mod
and the go.sum
files in both directories. Next, move up to the weather_data
directory and run the following command:
docker-compose up
You will see all the applications running and messages printing to the screen.
On a separate terminal window, run the following command to view the contents of the weather_info
table:
docker-compose exec postgres psql -U $POSTGRES_USER -d $POSTGRES_DB -c 'select * from weather_info;'
Next Steps #
Hopefully running this example gives you a general idea of how to build an event-driven application using Watermill and Ensign. You can modify this example slightly to have the Ensign consumer do the entire work of checking and inserting new weather records into the database (replace the handler with a NoPublisherHandler), but remember that loose coupling is the name of the game in event-driven architectures! You can also challenge yourself by creating a consumer that takes the records produced by the publisher and updates a front-end application with the latest weather data…
Let us know (info@rotational.io) what you end up making with Ensign!