Google Cloud Pub/Sub message schemas are a feature that promises to bring some order to the potential chaos lurking within a highly event-driven architecture. In this post we’re going to investigate using Protocol Buffers as Pub/Sub schemas together with Go and Terraform.
Originally written in June 2021, I never got around to publishing this post. I’ve had a quick glance at it, and the content seems decent enough, but keep in mind that some things may have changed since then…
Released as a preview on March 1, 2021, some of the message schema features may still have limited support and may see breaking changes in the future, so consider yourself warned before using them in production systems!
Schemas in general allow you to define a fixed contract between producers and consumers, and Pub/Sub message schemas help you enforce this contract by not allowing any data to be published to a topic that does not follow the schema. At the time of writing we have two alternatives for defining schemas:
- Apache Avro
- Protocol Buffers
While I’m a big fan of Avro, we’re going to use Protocol Buffers, or protobuf for short, as they work really well with Go (both are Google technologies after all).
Basic Schema
Let’s define a super simple schema for a message where we send a greeting to a user.
For simplicity’s sake we’re using an int64 for the user ID (probably not a good idea for a production system), and a lone string for the greeting. To make things a bit more complex we’re also going to include a timestamp:
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message UserGreeting {
  int64 user_id = 1;
  string greeting = 2;
  google.protobuf.Timestamp timestamp = 3;
}
The syntax should be quite familiar to anyone who knows Go, but they are not identical. We start by saying that this schema is defined using the latest version of Protocol Buffers. As there is no native primitive for timestamps, we’re also importing one of Google’s well-known types; you can think of them almost like a standard library of sorts.
Save the schema to a file (e.g. greeting.proto), and then we need to install the protobuf compiler before we can actually generate the Go code we’re after.
Compiler
In order to compile a protobuf schema into code, you need to install the Protocol Buffer compiler. The best instructions for this can be found on the gRPC website, and if you’re using Homebrew on a Mac it’s as simple as a one-liner:
brew install protobuf
If you’re on a Debian-based Linux system, it’s equally simple:
apt install protobuf-compiler
If you’re on any other system, simply follow the instructions on the gRPC website.
Before you can use the compiler to generate Go code, you must also install the Go plugin for it:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
This will download and install the plugin into your $GOBIN folder (typically $GOPATH/bin). You must also ensure that it is on your $PATH, otherwise the compiler will not find it.
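If it is not, a line like the following in your shell profile should do the trick (a sketch assuming the default $GOPATH location):

export PATH="$PATH:$(go env GOPATH)/bin"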
Once you have installed everything, try it out by running:
protoc --go_out=. greeting.proto
You will see a warning, but never mind that for now; you should also see a new Go file named greeting.pb.go (or similar, depending on what you named your file).
Pub/Sub Schema
Let’s turn our local schema file into something that can be used by Pub/Sub, and let’s use Terraform to do so:
resource "google_pubsub_schema" "greeting" {
name = "greeting"
type = "PROTOCOL_BUFFER"
definition = <<-EOF
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message UserGreeting {
int64 user_id = 1;
string greeting = 2;
google.protobuf.Timestamp timestamp = 3;
}
EOF
}
This resource definition should be fairly straightforward: we’re simply wrapping our previously defined schema in some Terraform code to upload it to Google Cloud. You can find more parameters in the provider documentation.
However, if you try to include this in your own code and apply it, you’ll find that it will fail with a couple of errors. First, it won’t accept the import statement, and as a natural follow-up to that it won’t know what a google.protobuf.Timestamp is.
This is the first restriction worth remembering: there are some limitations when defining Pub/Sub schemas, and you are not able to reuse external definitions.
So, since we cannot import the timestamp type, and protobuf has no built-in primitive for it, we’ll have to simplify. Two quick and dirty options that come to mind are either to use a string and deal with manual formatting validation, or to switch to a Unix epoch using an int64.
In this example I’ll go for the latter:
resource "google_pubsub_schema" "greeting" {
name = "greeting"
type = "PROTOCOL_BUFFER"
definition = <<-EOF
syntax = "proto3";
message UserGreeting {
int64 user_id = 1;
string greeting = 2;
int64 timestamp = 3;
}
EOF
}
This resource should be possible to create, although of course I have left out all of the boilerplate (creating a project, enabling the Pub/Sub service, and so on).
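If you manage that boilerplate with Terraform as well, enabling the Pub/Sub API is a one-resource affair; a minimal sketch, assuming the project is already configured on the provider:

resource "google_project_service" "pubsub" {
  service = "pubsub.googleapis.com"
}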
Pub/Sub Topic
Just defining the schema does not make any difference; we must also use it when creating a topic:
resource "google_pubsub_topic" "greetings" {
name = "greetings"
schema_settings {
schema = google_pubsub_schema.greeting.id
encoding = "BINARY"
}
}
This is really basic stuff, and almost identical to any topic you’ve already created. The only difference is that we’re referencing the schema using the schema_settings block (more documentation available here). The cool thing here is that this is really all you need to do in order to use the schema. If you try to publish a message to this topic, and the data field does not match the schema, Pub/Sub will refuse to accept the message and the call will fail. That way it enforces your contract, and makes sure that you won’t get any malformed data on your topic.
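You can try the enforcement from the command line as well; a quick sketch, assuming gcloud is authenticated against the right project — publishing data that is not a valid protobuf should be rejected with an InvalidArgument error:

gcloud pubsub topics publish greetings --message="not a protobuf"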
In order to publish a message that matches the schema, we simply use the code that we generated using the protoc compiler. But before we go down that route, let’s look at a way of reusing the schema definition.
Sharing the schema
If you have a mature infrastructure you may already have some kind of schema registry in place, but if you’re looking into this feature for the first time, chances are that you do not. One of the benefits of defining a schema is being able to share it in a controlled manner, ensuring that you have a single source of truth. A schema registry is one way to accomplish this, but simply using a Git repository is also possible, and that is something that fits very well into the Go ecosystem.
For this scenario we’re going to have three different pieces that we want to fit together:
- A Cloud Function that subscribes to the Pub/Sub topic.
- A tiny CLI application that publishes to the topic.
- The protobuf schema definition that will be shared by the other components.
All components will reside in their own (hypothetical) Git repositories, and we’ll use GitHub as an upstream repository to tie it all together. Let’s start from the bottom and define the schema library first.
Schema Library
The hypothetical schema library will be hosted on GitHub at github.com/hedlund/schemas, and we will initiate it as a Go module so that we can import it into the other applications:

go mod init github.com/hedlund/schemas
In this repository we create a folder called greetings, in that folder a file called greeting.proto, and in that file we put our schema, but with a little twist:
syntax = "proto3";

option go_package = "github.com/hedlund/schemas/greetings";

message UserGreeting {
  int64 user_id = 1;
  string greeting = 2;
  int64 timestamp = 3;
}
As you can see we’ve added an additional directive, go_package, that will inform the protoc Go generator of which package the resulting Go file belongs to. And the package name itself matches the file structure of the repository, as any Go module would. This will fix the warning we got when we generated the first file.
When generating the resulting Go file, you want to change the command slightly as well:
protoc --proto_path=greetings --go_out=greetings --go_opt=paths=source_relative greeting.proto
This command is designed to be run from the root of the repository, and instructs protoc that our schema exists in the greetings folder and that we want the resulting Go code in the same folder. The go_opt flag also makes sure that the generated code follows the source structure, which also works with nested folders.
The result should be a new file, greetings/greeting.pb.go, which contains everything you need to read and write Protocol Buffers matching your schema. In this example I’m keeping things simple by having the schema and the generated code in the same folder, but you might want to do something else, like keeping schemas in one folder structure and the generated code in another. You may also be working in a polyglot organisation, so you may want to add more directives for other languages as well.
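As a hypothetical illustration (the Java package name is made up for the example), such a directive would sit right next to the go_package option:

option java_package = "com.hedlund.schemas.greetings";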
To make this Go module work properly, we should add the few dependencies we have for the generated code. You can either add them manually by go get’ing both github.com/golang/protobuf (the Go V1 protobuf API) and google.golang.org/protobuf (the V2 API).
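Spelled out, that’s simply:

go get github.com/golang/protobuf
go get google.golang.org/protobuf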
Or you can be lazy like me and simply run:
go mod tidy
There is one final little touch I want to add to this repository before we add all the files and push it to GitHub. In the greetings folder, create a file called outputs.tf and add:
output "user_greeting" {
value = file("${path.module}/greeting.proto")
}
This is a little hack that will help us when using Terraform.
Cloud Function
The next piece we will implement is the Cloud Function that will listen to the topic. This is also where I would typically add the Terraform code that creates the topic itself, so let’s make a few changes to the Terraform code from earlier:
module "schemas" {
source = "git@github.com:hedlund/schemas.git//greetings?ref=master"
}
resource "google_pubsub_schema" "greeting_v1" {
name = "greeting-v1"
type = "PROTOCOL_BUFFER"
definition = module.schemas.user_greeting
}
resource "google_pubsub_topic" "greetings_v1" {
name = "greetings-v1"
schema_settings {
schema = google_pubsub_schema.greeting_v1.id
encoding = "BINARY"
}
}
There are a number of things going on here. First off, I added a module to import the schema definition directly from the library as hosted on GitHub. This is why we added the outputs.tf file: to make it dead simple to read the schema definition. I’m using SSH to reference the repository, but you can also use HTTPS if you prefer.
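For reference, an HTTPS variant could use Terraform’s GitHub source shorthand, which should resolve to the same module:

module "schemas" {
  source = "github.com/hedlund/schemas//greetings?ref=master"
}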
You can also see that I’ve added a v1 suffix to both the schema and the topic. The reason is that you are not allowed to change the schema for a topic once you have created it. This makes sense, as the schema is the contract for the topic, and breaches of contract obviously should be avoided. You are also not allowed to change the content of the schema itself, so adding a simple version suffix will make it cleaner to add new versions in the future.
The only way you can change the schema for a Pub/Sub topic is to delete the topic and create it anew. This also means that you will lose any subscriptions to the original topic, so versioning your topics avoids that whole mess.
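To make the pattern concrete, here is a hypothetical sketch of what a contract change could look like: a v2 schema and topic created alongside v1 (the extra locale field is made up for the example), leaving the old topic untouched until its consumers have migrated:

resource "google_pubsub_schema" "greeting_v2" {
  name       = "greeting-v2"
  type       = "PROTOCOL_BUFFER"
  definition = <<-EOF
    syntax = "proto3";

    message UserGreeting {
      int64 user_id = 1;
      string greeting = 2;
      int64 timestamp = 3;
      string locale = 4;
    }
  EOF
}

resource "google_pubsub_topic" "greetings_v2" {
  name = "greetings-v2"

  schema_settings {
    schema   = google_pubsub_schema.greeting_v2.id
    encoding = "BINARY"
  }
}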
I’m not going to go into details about the Cloud Function itself, as it’s a topic we’ve covered before. We create a new repository, initiate it as a new Go module, and finally add our schema repository as a dependency:
go get github.com/hedlund/schemas
The code for our Cloud Function is super-simple, as I’m defining a Pub/Sub triggered function, which handles some of the deserialization for you:
package function

import (
    "context"
    "log"

    "github.com/hedlund/schemas/greetings"
    "google.golang.org/protobuf/proto"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
    Data []byte `json:"data"`
}

// ReceiveGreeting consumes a Pub/Sub message containing a UserGreeting.
func ReceiveGreeting(ctx context.Context, m PubSubMessage) error {
    var greet greetings.UserGreeting
    if err := proto.Unmarshal(m.Data, &greet); err != nil {
        return err
    }
    log.Printf("%d received %s!", greet.UserId, greet.Greeting)
    return nil
}
As you can see, unmarshalling a protobuf message is just as easy as working with JSON.
If you prefer using an HTTP triggered function, it is almost as easy. The only thing you have to remember is that the main content of the Pub/Sub message is still sent as JSON, and that your protobuf message is base64 encoded within the data field of the payload.
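A minimal sketch of what that could look like, assuming the standard Pub/Sub push payload format (the function and type names here are made up); note that encoding/json conveniently handles the base64 decoding when unmarshalling into a []byte:

package function

import (
    "encoding/json"
    "log"
    "net/http"

    "github.com/hedlund/schemas/greetings"
    "google.golang.org/protobuf/proto"
)

// pushRequest mirrors the JSON envelope of a Pub/Sub push delivery.
type pushRequest struct {
    Message struct {
        Data []byte `json:"data"` // base64 in the JSON, decoded for us
    } `json:"message"`
}

// ReceiveGreetingHTTP handles a Pub/Sub push request over HTTP.
func ReceiveGreetingHTTP(w http.ResponseWriter, r *http.Request) {
    var req pushRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    var greet greetings.UserGreeting
    if err := proto.Unmarshal(req.Message.Data, &greet); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    log.Printf("%d received %s!", greet.UserId, greet.Greeting)
    w.WriteHeader(http.StatusNoContent)
}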
Publishing
The final piece of our setup is the publisher, a small CLI application that will encode and send a message to the Pub/Sub topic. Again, we create a repository, initiate it as a Go module, and install the github.com/hedlund/schemas dependency. The application itself can be written as a single file:
package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    "github.com/hedlund/schemas/greetings"
    "google.golang.org/protobuf/proto"
)

func main() {
    ctx := context.Background()

    // Construct the message
    greet := &greetings.UserGreeting{
        UserId:    1234,
        Greeting:  "Hello, world!",
        Timestamp: time.Now().Unix(),
    }

    // Marshal it into a byte slice
    data, err := proto.Marshal(greet)
    if err != nil {
        log.Fatal(err)
    }

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Publish the message
    topic := client.Topic("greetings-v1")
    res := topic.Publish(ctx, &pubsub.Message{
        Data: data,
    })

    // Wait for the message to be published
    if _, err := res.Get(ctx); err != nil {
        log.Fatal(err)
    }
}
With the comments, the code should be pretty self-explanatory. We’re reusing the same library as before to define the message, and using the protobuf library to marshal it into a byte slice (again, very similar to working with JSON). Most of the remaining code is just getting a Pub/Sub client up and going and firing off the message.
Wrapping up!
I hope you’ve learned something about working with schemas and Pub/Sub, and perhaps gotten a few ideas of how easy it is to tie everything together with a few hacks.