Go - Step by step guide for implementing tracing on a microservices architecture (2/2)
Update 2023: The repository with the source code is still getting some traction on GitHub, so I decided to update the dependencies, as some of them were deprecated and the tracer exporter now uses a different port.
If you just want to jump into the repository with all the code, 👉 check it out here 👈
This article is the second of a series of two. If you have not read the first one, you can do it here.
In this post we are going to finish the implementation of the checkout-service and the stock-service, and wrap up the proof of concept.
Revisiting the flow
In the previous article we finished the gateway, and while doing so we defined the proto files required for both the gateway and the checkout-service. Now we can tackle the latter, together with the stock-service.
Back to the utils library
Since both checkout and stock are going to use RabbitMQ, there are a few helper functions that can be added to simplify the development.
Connecting to RabbitMQ
The function below connects a service to a RabbitMQ instance and declares, if it does not already exist, the default exchange that is going to be used. Place this function in a new file named amqp.go:
package utils

import (
	"fmt"
	"log"

	"github.com/rabbitmq/amqp091-go"
)

// ConnectAmqp dials RabbitMQ, opens a channel and declares the default
// exchange. It returns the channel and a function that closes the connection.
func ConnectAmqp(user, pass, host, port string) (*amqp091.Channel, func() error) {
	address := fmt.Sprintf("amqp://%s:%s@%s:%s/", user, pass, host, port)

	connection, err := amqp091.Dial(address)
	if err != nil {
		log.Fatal(err)
	}

	channel, err := connection.Channel()
	if err != nil {
		log.Fatal(err)
	}

	err = channel.ExchangeDeclare("exchange", "direct", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	return channel, connection.Close
}
Run go get github.com/rabbitmq/amqp091-go and go mod tidy to ensure the dependencies behave as expected.
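For reference, using the helper from a service looks like this (the credentials are the local defaults used later in this article):

channel, closeConn := utils.ConnectAmqp("guest", "guest", "localhost", "5672")
defer func() {
	_ = closeConn()
}()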
Propagating the trace
When working with HTTP and gRPC calls, the trace can be propagated using Go's built-in context, but for RabbitMQ this is not as straightforward. A RabbitMQ message contains both payload and headers, and in this case the headers are an excellent candidate for solving the propagation.
The library rabbitmq/amqp091-go defines the headers as a map[string]interface{}, so we are going to define our own type that is compliant with that:
type AmqpHeadersCarrier map[string]interface{}
The open-telemetry library will then handle the injection and extraction of the trace using the map, but in order to do that our defined type needs to implement the TextMapCarrier interface. This means that the functions Get, Set, and Keys have to be defined.
Create the carriers.go file and add the piece of code below:
package utils

import (
	"context"

	"go.opentelemetry.io/otel"
)

type AmqpHeadersCarrier map[string]interface{}

func (a AmqpHeadersCarrier) Get(key string) string {
	v, ok := a[key]
	if !ok {
		return ""
	}
	return v.(string)
}

func (a AmqpHeadersCarrier) Set(key string, value string) {
	a[key] = value
}

func (a AmqpHeadersCarrier) Keys() []string {
	i := 0
	r := make([]string, len(a))
	for k := range a {
		r[i] = k
		i++
	}
	return r
}

// InjectAMQPHeaders injects the tracing from the context into the header map
func InjectAMQPHeaders(ctx context.Context) map[string]interface{} {
	h := make(AmqpHeadersCarrier)
	otel.GetTextMapPropagator().Inject(ctx, h)
	return h
}

// ExtractAMQPHeaders extracts the tracing from the header and puts it into the context
func ExtractAMQPHeaders(ctx context.Context, headers map[string]interface{}) context.Context {
	return otel.GetTextMapPropagator().Extract(ctx, AmqpHeadersCarrier(headers))
}
Commit and push these changes so the other services can reference these utilities.
Checkout service
This component has two responsibilities. First, it is going to contain the gRPC server logic; in other words, it is going to receive the call from the gateway. Second, it is going to publish a RabbitMQ message named checkout.processed that other services may listen to.
In order to do so, these environment variables are needed:
- JAEGER_ENDPOINT
- GRPC_ADDRESS
- RABBITMQ_USER
- RABBITMQ_PASS
- RABBITMQ_HOST
- RABBITMQ_PORT
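These are read with the EnvString helper from the utils library built in the first article; as a reminder, it presumably boils down to a lookup with a fallback, along these lines:

func EnvString(key, fallback string) string {
	// Return the environment variable if set, otherwise the fallback value
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}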
Create a folder named checkout, initialize go mod inside it, and add the dependencies for the proto, utils and otelgrpc packages (the latter, as stated in the previous article, handles the tracing propagation between gRPC calls):
go mod init github.com/pmorelli92/open-telemetry-go/checkout
go get github.com/pmorelli92/open-telemetry-go/proto
go get github.com/pmorelli92/open-telemetry-go/utils
go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc
Then, create the main.go file with the following content:
package main

import (
	"context"
	"fmt"
	"log"
	"net"

	pb "github.com/pmorelli92/open-telemetry-go/proto"
	"github.com/pmorelli92/open-telemetry-go/utils"
	"github.com/rabbitmq/amqp091-go"
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
)

func main() {
	jaegerEndpoint := utils.EnvString("JAEGER_ENDPOINT", "localhost:4318")
	grpcAddress := utils.EnvString("GRPC_ADDRESS", "localhost:8080")
	amqpUser := utils.EnvString("RABBITMQ_USER", "guest")
	amqpPass := utils.EnvString("RABBITMQ_PASS", "guest")
	amqpHost := utils.EnvString("RABBITMQ_HOST", "localhost")
	amqpPort := utils.EnvString("RABBITMQ_PORT", "5672")

	err := utils.SetGlobalTracer(context.Background(), "checkout", jaegerEndpoint)
	if err != nil {
		log.Fatalf("failed to create tracer: %v", err)
	}

	channel, closeConn := utils.ConnectAmqp(amqpUser, amqpPass, amqpHost, amqpPort)
	defer func() {
		_ = closeConn()
	}()

	lis, err := net.Listen("tcp", grpcAddress)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer(
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()))

	pb.RegisterCheckoutServer(s, &server{channel: channel})

	log.Printf("GRPC server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

type server struct {
	pb.UnimplementedCheckoutServer
	channel *amqp091.Channel
}
So far, the code is doing most of the things needed:
- Setting up the environment variables and the global tracer (see the reminder sketch after this list).
- Connecting to RabbitMQ.
- Serving the gRPC server on the provided address.
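SetGlobalTracer comes from the utils library built in the first article; after the 2023 update it points at the OTLP HTTP port (4318) instead of the old agent port. The exact implementation lives in the repository, but a minimal sketch, assuming the otlptracehttp exporter, could look like this:

package utils

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// SetGlobalTracer wires an OTLP HTTP exporter (Jaeger listens on 4318)
// into a global tracer provider and propagator. Sketch only; option
// names are assumptions, the real code is in the repository.
func SetGlobalTracer(ctx context.Context, serviceName, endpoint string) error {
	exporter, err := otlptracehttp.New(ctx,
		otlptracehttp.WithEndpoint(endpoint),
		otlptracehttp.WithInsecure())
	if err != nil {
		return err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName))))

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return nil
}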
However, this file is still lacking the function that specifies how the gRPC checkout call works. As stated before, this function should publish the amqp message:
func (s *server) DoCheckout(ctx context.Context, rq *pb.CheckoutRequest) (*pb.CheckoutResponse, error) {
	messageName := "checkout.processed"

	// Create a new span (child of the trace id) to inform the publishing of the message
	tr := otel.Tracer("amqp")
	amqpContext, messageSpan := tr.Start(ctx, fmt.Sprintf("AMQP - publish - %s", messageName))
	defer messageSpan.End()

	// Inject the context in the headers
	headers := utils.InjectAMQPHeaders(amqpContext)
	msg := amqp091.Publishing{Headers: headers}

	err := s.channel.PublishWithContext(ctx, "exchange", messageName, false, false, msg)
	if err != nil {
		log.Fatal(err)
	}

	return &pb.CheckoutResponse{TotalAmount: 1234}, nil
}
If our service published a lot of messages, this logic for getting a tracer and creating a child span could be isolated elsewhere, for example behind a decorator or a small helper.
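A minimal sketch of such a helper (the name and placement are assumptions; it merely centralizes the two lines shown above):

package utils

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// StartAMQPPublishSpan wraps the tracer lookup and child-span creation
// so publishers don't repeat them. Callers must End() the returned span.
func StartAMQPPublishSpan(ctx context.Context, messageName string) (context.Context, trace.Span) {
	tr := otel.Tracer("amqp")
	return tr.Start(ctx, fmt.Sprintf("AMQP - publish - %s", messageName))
}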
The function above will do just fine, but tracing also allows us to log different things as span events, for example the response:
	response := &pb.CheckoutResponse{TotalAmount: 1234}

	// Example on how to log specific events for a span
	span := trace.SpanFromContext(ctx)
	span.AddEvent(fmt.Sprintf("response: %v", response))

	return response, nil
The code is now finished; next, add a Dockerfile:
FROM golang:alpine3.18 AS compiler
RUN apk --update --no-cache add git
WORKDIR /checkout
ADD go.mod go.sum ./
RUN go mod download
ADD . .
RUN CGO_ENABLED=0 go build -o /bin/goapp ./main.go
FROM scratch
COPY --from=compiler /bin/goapp /checkout
ENTRYPOINT ["/checkout"]
Execute go mod tidy, and then you can commit and push.
Modifying the docker-compose
In the docker-compose file, add a RabbitMQ instance and the checkout service:
rabbitmq:
  image: "rabbitmq:3-management"
  ports:
    - "15672:15672"
    - "5672:5672"

checkout:
  build:
    context: checkout
  restart: on-failure
  ports:
    - "8080:8080"
  environment:
    - JAEGER_ENDPOINT=jaeger:4318
    - GRPC_ADDRESS=checkout:8080
    - RABBITMQ_USER=guest
    - RABBITMQ_PASS=guest
    - RABBITMQ_HOST=rabbitmq
    - RABBITMQ_PORT=5672
  depends_on:
    - jaeger
    - rabbitmq
Trying things out
You should be able to start all the components by executing docker-compose up --build from the root of the repository. After the services are up and running, you can try calling the gateway with cURL:
curl -v -X POST http://localhost:8081/api/checkout
< HTTP/1.1 202 Accepted
< Date: Sun, 19 Dec 2021 13:53:04 GMT
< Content-Length: 0
The response is 202 because the checkout-service is now up and running, and therefore able to receive the call from the gateway. A trace should be created with 4 spans:
- HTTP call.
- gRPC client call.
- gRPC server handling (and logging the response).
- AMQP publishing of the message.
Go to http://localhost:16686, select the gateway service in the UI and click Find Traces. You should be able to see a trace; click on it to see more details:
In the previous article, the spans were decorated with a red error icon because the flow failed (due to the checkout-service not being up). Now there are no warnings or errors anymore, and we can clearly see the event that was added in the code.
Stock service
This component has only one responsibility: listening to the checkout.processed message. In order to do so, these environment variables are needed:
- JAEGER_ENDPOINT
- RABBITMQ_USER
- RABBITMQ_PASS
- RABBITMQ_HOST
- RABBITMQ_PORT
Create a folder named stock, initialize go mod inside it, and add the dependency for the utils package:
go mod init github.com/pmorelli92/open-telemetry-go/stock
go get github.com/pmorelli92/open-telemetry-go/utils
Then, create the main.go file with the following content:
package main

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/pmorelli92/open-telemetry-go/utils"
	"github.com/rabbitmq/amqp091-go"
	"go.opentelemetry.io/otel"
)

func main() {
	jaegerEndpoint := utils.EnvString("JAEGER_ENDPOINT", "localhost:4318")
	amqpUser := utils.EnvString("RABBITMQ_USER", "guest")
	amqpPass := utils.EnvString("RABBITMQ_PASS", "guest")
	amqpHost := utils.EnvString("RABBITMQ_HOST", "localhost")
	amqpPort := utils.EnvString("RABBITMQ_PORT", "5672")

	err := utils.SetGlobalTracer(context.Background(), "stock", jaegerEndpoint)
	if err != nil {
		log.Fatalf("failed to create tracer: %v", err)
	}

	channel, closeConn := utils.ConnectAmqp(amqpUser, amqpPass, amqpHost, amqpPort)
	defer func() {
		_ = closeConn()
	}()

	// Create queue and binding
	_, err = channel.QueueDeclare("stock-queue", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	err = channel.QueueBind("stock-queue", "checkout.processed", "exchange", false, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Start consuming
	go ConsumeFromAMQP(channel)
	log.Println("AMQP listening")

	// Block termination
	wg := sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()
}

func ConsumeFromAMQP(channel *amqp091.Channel) {
}
This piece of code is very similar to the checkout-service shown before: parsing environment variables, setting up the tracer and creating an AMQP connection.
The only difference here is that the stock-service is creating a queue (if it does not yet exist) and binding it to the checkout.processed message. Then, there is a goroutine being executed that will:
- Receive the amqp message.
- Extract the trace and add a span.
- Ack the message.
func ConsumeFromAMQP(channel *amqp091.Channel) {
	// Start the consumption
	deliveries, err := channel.Consume("stock-queue", "some-tag", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	for {
		// For each message
		d := <-deliveries

		// Extract headers
		ctx := utils.ExtractAMQPHeaders(context.Background(), d.Headers)

		// Create a new span
		tr := otel.Tracer("amqp")
		_, messageSpan := tr.Start(ctx, "AMQP - consume - checkout.processed")

		// Cannot use defer inside a for loop
		time.Sleep(1 * time.Millisecond)
		messageSpan.End()

		// ACK the message
		err = d.Ack(false)
		if err != nil {
			log.Fatal(err)
		}
	}
}
The code is now finished; next, add a Dockerfile:
FROM golang:alpine3.18 AS compiler
RUN apk --update --no-cache add git
WORKDIR /stock
ADD go.mod go.sum ./
RUN go mod download
ADD . .
RUN CGO_ENABLED=0 go build -o /bin/goapp ./main.go
FROM scratch
COPY --from=compiler /bin/goapp /stock
ENTRYPOINT ["/stock"]
Execute go mod tidy, and then you can commit and push.
Updating the docker-compose
As the last modification to this file, add the stock-service:
stock:
  build:
    context: stock
  restart: on-failure
  environment:
    - JAEGER_ENDPOINT=jaeger:4318
    - RABBITMQ_USER=guest
    - RABBITMQ_PASS=guest
    - RABBITMQ_HOST=rabbitmq
    - RABBITMQ_PORT=5672
  depends_on:
    - jaeger
    - rabbitmq
Trying things out, again
From the root of the repository, run docker-compose up --build and invoke the gateway again with cURL:
curl -v -X POST http://localhost:8081/api/checkout
< HTTP/1.1 202 Accepted
< Date: Sun, 19 Dec 2021 13:53:04 GMT
< Content-Length: 0
Go to http://localhost:16686 and find the new trace; it should look like this:
Now we can see that there is one more span, stating that the stock-service consumed the message that was published by the checkout-service.
Visualizing the journey
On the Jaeger menu, go to System Architecture and then click on DAG.
Awesome, right? Let's generate more traffic by re-executing the cURL command a couple of times:
Now, this is a very simple proof of concept. But imagine a real environment with hundreds of services: having the ability to see who is talking to whom, and being able to inspect traces to know how much time each service took, is pretty cool, right?
Summary
These two articles were code-heavy, but I believe the impact of having a tracing solution is most visible when several components interact with each other using different protocols.
Jaeger, being an open source solution, is a great addition that can help us observe how different components are related and which flows they are involved in.
Having spans in place, and adding events to them when necessary, can also be a very good replacement for a log solution. The gateway can add the trace id as a response header using span.SpanContext().TraceID(), so the frontend can show it to the end user.
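For illustration, a hypothetical middleware on the gateway could surface the trace id like this (the header name and wiring are assumptions, not part of the repository):

package main

import (
	"net/http"

	"go.opentelemetry.io/otel/trace"
)

// traceIDMiddleware writes the current trace id into a response header
// so a frontend can display it to the end user.
func traceIDMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		span := trace.SpanFromContext(r.Context())
		if span.SpanContext().HasTraceID() {
			w.Header().Set("X-Trace-Id", span.SpanContext().TraceID().String())
		}
		next.ServeHTTP(w, r)
	})
}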
Finally, users can report an error together with its trace id on a support channel. Knowing the trace, a developer can use the Jaeger UI to see what happened in that user journey and diagnose the issue.