Google Cloud Pub/Sub message schemas is a feature that promises to bring some order to the potential chaos that lurks within a highly event-driven architecture. In this post we’re going to investigate using Protocol Buffers as Pub/Sub schemas together with Go and Terraform.

I originally wrote this post in June 2021, but never got around to publishing it. I’ve had a quick glance at it, and the content seems decent enough, but keep in mind that some things may have changed since then…

Released as a preview on March 01, 2021, some of the message schema features may still have limited support, and may have breaking changes in the future, so consider yourself warned before using them in production systems!

Schemas in general allow you to define a fixed contract between producers and consumers, and Pub/Sub message schemas help you enforce this contract by not allowing any data to be published to a topic unless it follows the schema. At the time of writing we have two alternatives for defining schemas: Apache Avro and Protocol Buffers.

While I’m a big fan of Avro, we’re going to use Protocol Buffers, or protobuf for short, as they work really well with Go (both are Google technologies after all).

Basic Schema

Let’s define a super simple schema for a message where we send a greeting to a user. For simplicity’s sake we’re using an int64 for the user ID (probably not a good idea for a production system), and a lone string for the greeting. To make things a bit more complex we’re also going to include a timestamp:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message UserGreeting {
  int64  user_id = 1;
  string greeting = 2;
  google.protobuf.Timestamp timestamp = 3;
}

The syntax should be quite familiar for anyone who knows Go, but the two are not identical. We start by saying that this schema is defined using the latest version of Protocol Buffers (proto3). As there is no native primitive for timestamps, we’re also importing one of Google’s well-known types; you can think of them almost like a standard library of sorts.

Save the schema to a file (e.g. greeting.proto), and then we need to install the protobuf compiler before we can actually generate the Go code we’re after.

Compiler

In order to compile a protobuf schema into code, you need to install the Protocol Buffer compiler. The best instructions for this can be found on the gRPC website, and if you’re using Homebrew on Mac it’s as simple as a one-liner:

brew install protobuf

If you’re on a Debian-based Linux system, it’s equally simple:

apt install protobuf-compiler

If you’re on any other system, simply follow the instructions on the gRPC website.

Before you can use the compiler to generate Go code, you must also install the Go language bindings:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

This will download and install the plugin into your $GOBIN folder (typically $GOPATH/bin). You must also ensure that it is on your $PATH, otherwise the compiler will not find it. Once you have installed everything, try it out by running:

protoc --go_out=. greeting.proto

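You will see a warning about a missing go_package option, but never mind that for now (newer versions of protoc-gen-go may report this as an error instead, in which case skip ahead to where we add the option). You should also see a new Go file named greeting.pb.go (or similar, depending on what you named your file).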
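If protoc complains that it cannot find the Go plugin, the usual culprit is that $GOBIN is not on your $PATH. Here is a minimal sketch of a check for that, using only the standard library. The helper names (onPath, missingTools) are my own and purely illustrative:

```go
package main

import (
	"fmt"
	"os/exec"
)

// onPath reports whether an executable with the given name can be found on $PATH.
func onPath(name string) bool {
	_, err := exec.LookPath(name)
	return err == nil
}

// missingTools returns the names for which found reports false.
// The lookup is passed in so the logic can be exercised without touching the real $PATH.
func missingTools(names []string, found func(string) bool) []string {
	var missing []string
	for _, name := range names {
		if !found(name) {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	// protoc looks for the Go plugin as an executable named protoc-gen-go on $PATH.
	missing := missingTools([]string{"protoc", "protoc-gen-go"}, onPath)
	if len(missing) > 0 {
		fmt.Printf("not found on $PATH: %v\n", missing)
		return
	}
	fmt.Println("protoc and protoc-gen-go are both on $PATH")
}
```

If protoc-gen-go shows up as missing, adding $GOBIN (or $GOPATH/bin) to your $PATH should sort it out.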

Pub/Sub Schema

Let’s turn our local schema file into something that can be used by Pub/Sub, and let’s use Terraform to do so:

resource "google_pubsub_schema" "greeting" {
  name = "greeting"
  type = "PROTOCOL_BUFFER"
  definition = <<-EOF
    syntax = "proto3";

    import "google/protobuf/timestamp.proto";

    message UserGreeting {
      int64  user_id = 1;
      string greeting = 2;
      google.protobuf.Timestamp timestamp = 3;
    }
  EOF
}

This resource definition should be fairly straightforward: we’re simply wrapping our previously defined schema in some Terraform code to upload it to Google Cloud. You can find more parameters in the provider documentation.

However, if you try to include this in your own code and apply it, you’ll find that it will fail with a couple of errors. First, it won’t accept the import statement, and as a natural follow-up to that it won’t know what a google.protobuf.Timestamp is.

This is the first restriction that is worth remembering. There are some limitations when defining Pub/Sub schemas, and you are not able to reuse external definitions.

So, since we cannot import the timestamp type, and protobufs have no built-in primitive for it, we’ll have to simplify. Two quick and dirty options that come to mind are either to use a string and deal with manual formatting validation, or switch to a unix epoch using an int64. In this example I’ll go for the latter:

resource "google_pubsub_schema" "greeting" {
  name       = "greeting"
  type       = "PROTOCOL_BUFFER"
  definition = <<-EOF
    syntax = "proto3";

    message UserGreeting {
      int64  user_id = 1;
      string greeting = 2;
      int64 timestamp = 3;
    }
  EOF
}

This resource should be possible to create, although of course I have left out all of the boilerplate (creating a project, enabling the Pub/Sub service, and so on).
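Since the timestamp is now a plain int64, converting to and from Go’s time.Time is on us. A minimal sketch of what that could look like, assuming we store whole seconds since the Unix epoch (the helper names toEpoch and fromEpoch are my own, not part of any library):

```go
package main

import (
	"fmt"
	"time"
)

// toEpoch converts a time.Time into whole seconds since the Unix epoch,
// which is what we store in the schema's int64 timestamp field.
// Anything below one second of precision is dropped.
func toEpoch(t time.Time) int64 {
	return t.Unix()
}

// fromEpoch converts the stored value back into a time.Time in UTC.
func fromEpoch(sec int64) time.Time {
	return time.Unix(sec, 0).UTC()
}

func main() {
	sec := toEpoch(time.Now())
	fmt.Println(sec, fromEpoch(sec).Format(time.RFC3339))
}
```

The trade-off compared to the well-known Timestamp type is that nothing in the schema says which unit the number is in, so producers and consumers have to agree on seconds (or milliseconds) out of band.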

Pub/Sub Topic

Just defining the schema does not make any difference; we must also use it when creating a topic:

resource "google_pubsub_topic" "greetings" {
  name = "greetings"

  schema_settings {
    schema   = google_pubsub_schema.greeting.id
    encoding = "BINARY"
  }
}

This is really basic stuff, and almost identical to any topic you’ve already created. The only difference is that we’re referencing the schema using the schema_settings block (see the provider documentation for more details). The cool thing here is that this is really all you need to do in order to use the schema. If you try to publish a message to this topic, and the data field does not match the schema, Pub/Sub will refuse to accept the message and the call will fail. That way it enforces your contract, and makes sure that you won’t get any malformed data on your topic.
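To get a feel for what BINARY encoding actually means for the data field, here is a small sketch that hand-encodes a UserGreeting in the protobuf wire format using only the standard library. This is purely for illustration (the function names are my own); in real code you would use the generated type together with proto.Marshal rather than anything like this:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendVarint appends v using the base-128 varint encoding that protobuf uses.
func appendVarint(b []byte, v uint64) []byte {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], v)
	return append(b, buf[:n]...)
}

// encodeUserGreeting hand-encodes the three fields of our schema.
// Each field starts with a tag byte: (field number << 3) | wire type.
func encodeUserGreeting(userID int64, greeting string, timestamp int64) []byte {
	var b []byte
	if userID != 0 { // proto3 leaves out fields that hold their zero value
		b = append(b, 0x08) // field 1, wire type 0 (varint)
		b = appendVarint(b, uint64(userID))
	}
	if greeting != "" {
		b = append(b, 0x12) // field 2, wire type 2 (length-delimited)
		b = appendVarint(b, uint64(len(greeting)))
		b = append(b, greeting...)
	}
	if timestamp != 0 {
		b = append(b, 0x18) // field 3, wire type 0 (varint)
		b = appendVarint(b, uint64(timestamp))
	}
	return b
}

func main() {
	fmt.Printf("% x\n", encodeUserGreeting(1234, "hi", 1))
	// prints "08 d2 09 12 02 68 69 18 01"
}
```

Note that the wire format carries field numbers and wire types, not field names, which is why the numbers in the schema matter more than the names once messages are in flight.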

In order to publish a message that matches the schema, we simply use the code that we generated using the protoc compiler. But before we go down that route, let’s look at a way of reusing the schema definition.

Sharing the schema

If you have a mature infrastructure you may already have some kind of schema registry in place, but if you’re looking into this feature for the first time, chances are that you do not. One of the benefits of defining a schema is to be able to share it in a controlled manner, and ensure that you have a single source of truth. A schema registry is one way to accomplish this, but simply using a Git repository is also possible, and that is something that fits very well into the Go ecosystem.

For this scenario we’re going to have three different pieces that we want to fit together:

  1. A Cloud Function that subscribes to the Pub/Sub topic.
  2. A tiny CLI application that publishes to the topic.
  3. The protobuf schema definition that will be shared by the other components.

All components will reside in their own (hypothetical) Git repositories, and we’ll use GitHub as an upstream repository to tie it all together. Let’s start from the bottom and define the schema library first.

Schema Library

The hypothetical schema library will be hosted on GitHub at github.com/hedlund/schemas and we will initialise it as a Go module so that we can import it into the other applications:

go mod init github.com/hedlund/schemas

In this repository, we create a folder called greetings, and in it a file called greeting.proto containing our schema, but with a little twist:

syntax = "proto3";

option go_package = "github.com/hedlund/schemas/greetings";

message UserGreeting {
  int64  user_id = 1;
  string greeting = 2;
  int64 timestamp = 3;
}

As you can see we’ve added an additional directive, go_package, that will inform the protoc Go generator of which package the resulting Go file belongs to. And the package name itself matches the file structure of the repository, as any Go module would. This will fix the warning we got when we generated the first file.

When generating the resulting Go file, you want to change the command slightly as well:

protoc --proto_path=greetings --go_out=greetings --go_opt=paths=source_relative greeting.proto

This command is designed to be run from the root of the repository, and instructs protoc that our schema exists in the greetings folder and that we want the resulting Go code in the same folder. The go_opt flag also makes sure that the generated code follows the source structure, which also works with nested folders.

The result should be a new file greetings/greeting.pb.go, which contains everything you need to read and write Protocol Buffers matching your schema. In this example, I’m keeping everything simple by having everything in the same folder, and you might want to do something else, like keeping schemas in one folder structure, and the generated code in another. You may also be working in a polyglot organisation, so you may want to add more directives for other languages as well.

To make this Go module work properly, we should add the few dependencies that the generated code has. You can either add them manually by go get’ing both github.com/golang/protobuf (the Go V1 protobuf API) and google.golang.org/protobuf (the V2 API), or you can be lazy like me and simply run:

go mod tidy

There is one final little touch I want to add to this repository, before we add all files and push it to GitHub. In the greetings folder create a file called outputs.tf and add:

output "user_greeting" {
  value = file("${path.module}/greeting.proto")
}

This is a little hack that will help us when using Terraform.

Cloud Function

The next piece we will implement is the Cloud Function that will listen to the topic. This is also where I would typically add the Terraform code that creates the topic itself, so let’s make a few changes to the Terraform code from earlier:

module "schemas" {
  source = "git@github.com:hedlund/schemas.git//greetings?ref=master"
}

resource "google_pubsub_schema" "greeting_v1" {
  name       = "greeting-v1"
  type       = "PROTOCOL_BUFFER"
  definition = module.schemas.user_greeting
}

resource "google_pubsub_topic" "greetings_v1" {
  name = "greetings-v1"

  schema_settings {
    schema   = google_pubsub_schema.greeting_v1.id
    encoding = "BINARY"
  }
}

There are a number of things going on here. First off, I added a module to import the schema definition directly from the library as hosted on GitHub. This is why we added the outputs.tf file, to make it dead simple to read the schema definition. I’m using SSH to reference the repository, but you can also use HTTPS if you prefer.

You can also see that I’ve added a v1 suffix to both the schema and the topic. The reason is that you are not allowed to change a schema for a topic once you have created it. This makes sense as the schema is the contract for the topic, and breaches of contract obviously should be avoided. You are also not allowed to change the content of the schema itself, so adding a simple version suffix will make it cleaner to add new versions in the future.

The only way you can change the schema for a Pub/Sub topic is to delete the topic and create it anew. This also means that you will lose any subscriptions to the original topic, so versioning your topics avoids that whole mess.

I’m not going to go into details about the Cloud Function itself, as it’s a topic we’ve covered before. We create a new repository, initiate it as a new Go module, and finally add our schema repository as a dependency:

go get github.com/hedlund/schemas

The code for our Cloud Function is super-simple, as I’m defining a Pub/Sub triggered function, which handles some of the deserialization for you:

package function

import (
	"context"
	"log"

	"github.com/hedlund/schemas/greetings"
	"google.golang.org/protobuf/proto"
)

type PubSubMessage struct {
	Data []byte `json:"data"`
}

func ReceiveGreeting(ctx context.Context, m PubSubMessage) error {
	var greet greetings.UserGreeting
	if err := proto.Unmarshal(m.Data, &greet); err != nil {
		return err
	}

	log.Printf("%d received %s!", greet.UserId, greet.Greeting)
	return nil
}

As you can see, unmarshalling a protobuf message is just as easy as working with JSON.

If you prefer using an HTTP-triggered function, it is almost as easy. The only thing you have to remember is that the main content of the Pub/Sub message is still sent as JSON, and that your protobuf message is base64 encoded within the data field in the payload.
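As a rough sketch of that HTTP case, this is how the JSON envelope from a push subscription could be unpacked with just the standard library. The struct only declares the fields used here, and the payload helper is my own naming. In a real function you would pass the resulting bytes on to proto.Unmarshal, exactly as in the Pub/Sub triggered example:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pushEnvelope mirrors the JSON body that a Pub/Sub push subscription sends.
// Fields that are not declared here are simply ignored by encoding/json.
type pushEnvelope struct {
	Message struct {
		Data      []byte `json:"data"` // encoding/json base64-decodes strings into []byte
		MessageID string `json:"messageId"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// payload extracts the raw (already base64-decoded) message data from a push request body.
func payload(body []byte) ([]byte, error) {
	var env pushEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decoding push envelope: %w", err)
	}
	return env.Message.Data, nil
}

func main() {
	// A made-up request body; the data field holds a small base64-encoded payload.
	body := []byte(`{"message":{"data":"CNIJEgJoaRgB","messageId":"1"},"subscription":"projects/project-id/subscriptions/example"}`)

	data, err := payload(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("% x\n", data)
}
```

Because the Data field is a []byte, encoding/json takes care of the base64 decoding for us, so there is no need to call encoding/base64 by hand.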

Publishing

The final piece of our setup is the publisher, a small CLI application that will encode and send a message to the Pub/Sub topic. Again, we create a repository, initiate it as a Go module, and install the github.com/hedlund/schemas dependency. The application itself can be written as a single file:

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/hedlund/schemas/greetings"
	"google.golang.org/protobuf/proto"
)

func main() {
	ctx := context.Background()

	// Construct the message
	greet := &greetings.UserGreeting{
		UserId:    1234,
		Greeting:  "Hello, world!",
		Timestamp: time.Now().Unix(),
	}

	// Marshal it into a byte slice
	data, err := proto.Marshal(greet)
	if err != nil {
		log.Fatal(err)
	}

	// Create a Pub/Sub client ("project-id" is a placeholder for your own project)
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		log.Fatal(err)
	}

	// Publish the message
	topic := client.Topic("greetings-v1")
	res := topic.Publish(ctx, &pubsub.Message{
		Data: data,
	})

	// Wait for the message to be published
	if _, err := res.Get(ctx); err != nil {
		log.Fatal(err)
	}
}

With the comments the code should be pretty self-explanatory. We’re reusing the same library as before to define the message, and use the protobuf library to marshal it into a byte slice (again, very similar to working with JSON). Most of the remaining code is just to get a Pub/Sub client up and going and firing off the message.

Wrapping up!

I hope you’ve learned something about working with schemas and Pub/Sub, and perhaps gotten a few ideas of how easy it is to tie everything together with a few hacks.