This article is the second of a series of two. If you had not read the first one, you can do it here. In this post we are going to finish the implementation of the checkout-service and the stock-service and wrap up the proof of concept.

Revisiting the flow

In the previous article we finished with the gateway, and while doing so we defined the proto files required for both gateway and checkout-service. Now, we can tackle the mentioned one and stock-service.

checkout-user-journey

Back to the utils library

Since both checkout and stock are going to use RabbitMQ, there are two functions that can be added in order to simplify the development.

Connecting to RabbitMQ

The function below will connect a service to a RabbitMQ instance and declaring, if it does not exist, a default exchange that it is going to be used. Place this function under a new file named amqp.go

package utils

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func ConnectAmqp(user, pass, host, port string) (*amqp.Channel, func() error) {
	address := fmt.Sprintf("amqp://%s:%[email protected]%s:%s/", user, pass, host, port)

	connection, err := amqp.Dial(address)
	if err != nil {
		log.Fatal(err)
	}

	channel, err := connection.Channel()
	if err != nil {
		log.Fatal(err)
	}

	err = channel.ExchangeDeclare("exchange", "direct", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	return channel, connection.Close
}

Run go get github.com/streadway/amqp and go mod tidy to ensure the dependencies behave as expected.

Propagating the trace

When working with HTTP and gRPC calls, the trace can be propagated using go’s built-in context, but for RabbitMQ this is not as straightforward. A RabbitMQ message contains both payload and headers, and in this case the headers are an excellent candidate to solve the propagation.

The library streadway/amqp defines the headers as a map[string]interface{}, so we are going to define our own type that is compliance with that:

type AmqpHeadersCarrier map[string]interface{}

The open-telemetry library then will handle the injection and extraction of the trace using the map, but in order to do that our defined type needs to implement the TextMapCarrier interface. This means that the functions Get, Set, and Keys have to be defined.

Create the carriers.go file and add the piece of code below:

package utils

import (
	"context"

	"go.opentelemetry.io/otel"
)

type AmqpHeadersCarrier map[string]interface{}

func (a AmqpHeadersCarrier) Get(key string) string {
	v, ok := a[key]
	if !ok {
		return ""
	}
	return v.(string)
}

func (a AmqpHeadersCarrier) Set(key string, value string) {
	a[key] = value
}

func (a AmqpHeadersCarrier) Keys() []string {
	i := 0
	r := make([]string, len(a))

	for k, _ := range a {
		r[i] = k
		i++
	}

	return r
}

// InjectAMQPHeaders injects the tracing from the context into the header map
func InjectAMQPHeaders(ctx context.Context) map[string]interface{} {
	h := make(AmqpHeadersCarrier)
	otel.GetTextMapPropagator().Inject(ctx, h)
	return h
}

// ExtractAMQPHeaders extracts the tracing from the header and puts it into the context
func ExtractAMQPHeaders(ctx context.Context, headers map[string]interface{}) context.Context {
	return otel.GetTextMapPropagator().Extract(ctx, AmqpHeadersCarrier(headers))
}

Commit and push these changes so the other services can reference these utilities.

👉 Commit reference 👈

Checkout service

This component has two responsibilities. First, is going to contain the gRPC server logic, in other words, is going to receive the call from the gateway. Second, is going to publish a RabbitMQ message named checkout.processed that other services may listen to.

In order to do so, these environment variables are needed:

  • JAEGER_ADDRESS
  • JAEGER_PORT
  • GRPC_ADDRESS
  • RABBITMQ_USER
  • RABBITMQ_PASS
  • RABBITMQ_HOST
  • RABBITMQ_PORT

Create a folder named checkout and inside initialize go mod and add the dependencies for proto, utils and otelgrpc package (that as stated on the previous article it handles the tracing propagation between gRPC calls):

go mod init github.com/pmorelli92/open-telemetry-go/checkout
go get github.com/pmorelli92/open-telemetry-go/proto
go get github.com/pmorelli92/open-telemetry-go/utils
go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

Then, create the main.go file with the following content:

package main

import (
	pb "github.com/pmorelli92/open-telemetry-go/proto"
	"github.com/pmorelli92/open-telemetry-go/utils"
	"github.com/streadway/amqp"
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
	"log"
	"net"
)

func main() {
	jaegerAddress := utils.EnvString("JAEGER_ADDRESS", "localhost")
	jaegerPort := utils.EnvString("JAEGER_PORT", "6831")
	grpcAddress := utils.EnvString("GRPC_ADDRESS", "localhost:8080")
	amqpUser := utils.EnvString("RABBITMQ_USER", "guest")
	amqpPass := utils.EnvString("RABBITMQ_PASS", "guest")
	amqpHost := utils.EnvString("RABBITMQ_HOST", "localhost")
	amqpPort := utils.EnvString("RABBITMQ_PORT", "5672")

	err := utils.SetGlobalTracer("checkout", jaegerAddress, jaegerPort)
	if err != nil {
		log.Fatalf("failed to create tracer: %v", err)
	}

	channel, closeConn := utils.ConnectAmqp(amqpUser, amqpPass, amqpHost, amqpPort)
	defer closeConn()

	lis, err := net.Listen("tcp", grpcAddress)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()))

	pb.RegisterCheckoutServer(s, &server{channel: channel})

	log.Printf("GRPC server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

type server struct {
	pb.UnimplementedCheckoutServer
	channel *amqp.Channel
}

So far, the code is doing most of the things needed:

  • Setting up the environment variables and the global tracer.
  • Connecting to RabbitMQ.
  • Serving the gRPC server on the provided url.

However, this file is lacking the function that specifies how the gRPC checkout function works. This function as stated before, should publish the amqp message:

func (s *server) DoCheckout(ctx context.Context, rq *pb.CheckoutRequest) (*pb.CheckoutResponse, error) {
	messageName := "checkout.processed"

	// Create a new span (child of the trace id) to inform the publishing of the message
	tr := otel.Tracer("amqp")
	amqpContext, messageSpan := tr.Start(ctx, fmt.Sprintf("AMQP - publish - %s", messageName))
	defer messageSpan.End()

	// Inject the context in the headers
	headers := utils.InjectAMQPHeaders(amqpContext)
	msg := amqp.Publishing{Headers: headers}
	err := s.channel.Publish("exchange", messageName, false, false, msg)
	if err != nil {
		log.Fatal(err)
	}

	return &pb.CheckoutResponse{TotalAmount: 1234}, nil
}

If our service would publish a lot of messages, this logic for getting a tracer and creating a child span could be isolated on another way, for example using the decorator’s pre-processor pattern.

This will do just fine, but tracing allow us to log different things as events for example the response:

response := &pb.CheckoutResponse{TotalAmount: 1234}

// Example on how to log specific events for a span
span := trace.SpanFromContext(ctx)
span.AddEvent(fmt.Sprintf("response: %v", response))

return response, nil

Code is then finished, now add a Dockerfile:

FROM golang:alpine3.14 AS compiler
RUN apk --update --no-cache add git
WORKDIR /checkout

ADD go.mod go.sum ./
RUN go mod download

ADD . .
RUN CGO_ENABLED=0 go build -o /bin/goapp ./main.go

FROM scratch
COPY --from=compiler /bin/goapp /checkout
ENTRYPOINT ["/checkout"]

Execute a go mod tidy and you can commit and push.

👉 Commit reference 👈

Modifying the docker-compose

On the docker-compose file, now add a RabbitMQ instance, and the checkout service:

rabbitmq:
image: "rabbitmq:3-management"
ports:
	- "15672:15672"
	- "5672:5672"
checkout:
build:
	context: checkout
restart: on-failure
ports:
	- "8080:8080"
environment:
	- JAEGER_ADDRESS=jaeger
	- JAEGER_PORT=6831
	- GRPC_ADDRESS=checkout:8080
	- RABBITMQ_USER=guest
	- RABBITMQ_PASS=guest
	- RABBITMQ_HOST=rabbitmq
	- RABBITMQ_PORT=5672
depends_on:
	- jaeger
	- rabbitmq

👉 Commit reference 👈

Trying things out

You should be able to start all the components executing from the root of the repository: docker-compose up --build. After the services are up and running you can try calling the gateway with cURL:

curl -v -X POST http://localhost:8081/api/checkout

< HTTP/1.1 202 Accepted
< Date: Sun, 19 Dec 2021 13:53:04 GMT
< Content-Length: 0

The response is 202 as the checkout-service is up and running, and therefore able to receive the call from the gateway. A trace should be created with 4 spans:

  • HTTP call.
  • gRPC client call.
  • gRPC server handling (and logging the response).
  • AMQP publishing of the message.

Go to http://localhost:16686, on the UI select the service gateway and click Find Traces. You should be able to see a trace, click on it to see more details:

tracing-details

In the previous article, the spans were decorated with a red error logo as the result of the flow was failed (due to the checkout-service not being up). Now there are no warnings or errors indicating that anymore, and we can clearly see the event that was added in the code.

Stock service

This component has only one responsibility that is listening to the checkout.processed message. In order to do so, these environment variables are needed:

  • JAEGER_ADDRESS
  • JAEGER_PORT
  • RABBITMQ_USER
  • RABBITMQ_PASS
  • RABBITMQ_HOST
  • RABBITMQ_PORT

Create a folder named stock and inside initialize go mod and add the dependency to for utils package:

go mod init github.com/pmorelli92/open-telemetry-go/stock
go get github.com/pmorelli92/open-telemetry-go/utils

Then, create the main.go file with the following content:

package main

import (
	"context"
	"github.com/pmorelli92/open-telemetry-go/utils"
	"github.com/streadway/amqp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"log"
	"sync"
	"time"
)

func main() {
	jaegerAddress := utils.EnvString("JAEGER_ADDRESS", "localhost")
	jaegerPort := utils.EnvString("JAEGER_PORT", "6831")
	amqpUser := utils.EnvString("RABBITMQ_USER", "guest")
	amqpPass := utils.EnvString("RABBITMQ_PASS", "guest")
	amqpHost := utils.EnvString("RABBITMQ_HOST", "localhost")
	amqpPort := utils.EnvString("RABBITMQ_PORT", "5672")

	err := utils.SetGlobalTracer("stock", jaegerAddress, jaegerPort)
	if err != nil {
		log.Fatalf("failed to create tracer: %v", err)
	}

	channel, closeConn := utils.ConnectAmqp(amqpUser, amqpPass, amqpHost, amqpPort)
	defer closeConn()

	// Create queue and binding
	_, err = channel.QueueDeclare("stock-queue", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	err = channel.QueueBind("stock-queue", "checkout.processed", "exchange", false, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Start consuming
	go ConsumeFromAMQP(channel)
	log.Println("AMQP listening")

	// Block termination
	wg := sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()
}

func ConsumeFromAMQP(channel *amqp.Channel) {

}

This piece of code is very similar to the checkout-service shown before. This means: parsing environment variables, setting up the tracer and creating an AMQP connection.

The only difference here is that the stock-service is creating a queue (if it does not yet exists) and making this queue receive the checkout.processed message. Then, there is a go routine being executed that will:

  • Receive the amqp message.
  • Extract the trace and add a span.
  • Ack the message.
func ConsumeFromAMQP(channel *amqp.Channel) {
	// Start the consumption
	deliveries, err := channel.Consume("stock-queue", "some-tag", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	for {
		select {
		// For each message
		case d := <-deliveries:

			// Extract headers
			ctx := utils.ExtractAMQPHeaders(context.Background(), d.Headers)

			// Create a new span
			tr := otel.Tracer("amqp")
			ctx, messageSpan := tr.Start(ctx, "AMQP - consume - checkout.processed")

			// Cannot use defer inside a for loop
			time.Sleep(1 * time.Millisecond)
			messageSpan.End()

			// ACK the message
			err = d.Ack(false)
			if err != nil {
				log.Fatal(err)
			}
		}
	}
}

Code is then finished, now add a Dockerfile:

FROM golang:alpine3.14 AS compiler
RUN apk --update --no-cache add git
WORKDIR /stock

ADD go.mod go.sum ./
RUN go mod download

ADD . .
RUN CGO_ENABLED=0 go build -o /bin/goapp ./main.go

FROM scratch
COPY --from=compiler /bin/goapp /stock
ENTRYPOINT ["/stock"]

Execute a go mod tidy and you can commit and push.

👉 Commit reference 👈

Updating the docker-compose

Last modification on this file, add the stock-service:

stock:
build:
	context: stock
restart: on-failure
environment:
	- JAEGER_ADDRESS=jaeger
	- JAEGER_PORT=6831
	- RABBITMQ_USER=guest
	- RABBITMQ_PASS=guest
	- RABBITMQ_HOST=rabbitmq
	- RABBITMQ_PORT=5672
depends_on:
	- jaeger
	- rabbitmq

👉 Commit reference 👈

Trying things out, again

From the root of the repository, run: docker-compose up --build. Invoke the gateway again with cURL:

curl -v -X POST http://localhost:8081/api/checkout

< HTTP/1.1 202 Accepted
< Date: Sun, 19 Dec 2021 13:53:04 GMT
< Content-Length: 0

Go to http://localhost:16686, and find the new trace, it should look like:

tracing-details

Now, we can see that there is one more span stating that the stock-service consumed the message that was published by the checkout-service.

Visualizing the journey

On the Jaeger menu, go to System Architecture and then click on DAG.

tracing-details

Awesome right? Let’s generate more traffic by re-executing the cURL command a couple of times:

tracing-details

Now, this is a very simple proof of concept, but imagine a real environment, with hundreds of services, having the ability to see who is talking to who, and being able to inspect traces knowing how much time each service took is pretty cool, right?

Summary

These two articles were code-heavy but I believe that the impact of having a tracing solution is more visible when several components are interacting with each other and using different protocols.

Jaeger, being an open source solution, is a great addition that can help us observe how different components are related, and what flows are they involved into.

Having spans in place, and adding events to them when necessary, can also be a very good replacement for a log solution. The gateway can add the trace id as a response header with span.SpanContext().TraceID() and therefore frontend can show this to the end user.

Finally, users can report an error with a traceable ID on a support channel. Knowing the trace, a developer can use Jaeger UI to see what happened in that user journey and be able to diagnose what happened.