Skip to content

Commit

Permalink
added redis pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelperel committed May 25, 2021
1 parent 9d1eba6 commit 028cece
Show file tree
Hide file tree
Showing 16 changed files with 539 additions and 130 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
OTEL_AGENT_URL=otel-agent:4317
REDIS_URL=redis://redis:6379/0
SERVER_URL=http://server:8080/hello
File renamed without changes.
76 changes: 56 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,51 @@
# What is this?
This is a demo of Open Telemetry's distributed tracing capabilities.
In `docker-compose.yml` there are variety of services:
* `server` - a service that implements an http server
* `client` - a service that sends a few requests to the server
* `server` - a service that implements an HTTP server and publishes a message
per request via [redis' pubsub](https://redis.io/topics/pubsub)
* `worker` - a service that listens for messages on redis' pubsub and
does work when a message is published
* `jaeger` - an open source telemetry backend
* `zipkin` - an open source telemetry backend
* `otel-agent` - a service that receives traces from `server` and `client`
* `otel-collector` - a service that receives traces forwarded from `otel-agent`
and exports them to `jaeger` and `zipkin`

# Why is this interesting?
By using Open Telemetry with the agent and collector, backends are swappable
and all services handle tracing in the same way, regardless of language.

Specifically, applications send traces to the agent, which forwards them to the
collector, and the collector defines backends via exporters in yaml.

Here we use 2 exporters, `jaeger` and `zipkin`, but there are many possible
exporters including
[Azure Monitor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/azuremonitorexporter).
1. By using Open Telemetry with the collector, backends are swappable
and all services handle tracing in the same way, regardless of programming
language.

Specifically, applications send traces to the agent, which forwards them to
the collector, and the collector defines backends via exporters in yaml.

Here we use 2 exporters, `jaeger` and `zipkin`, but there are many possible
exporters including
[Azure Monitor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/azuremonitorexporter).

2. Cloud architectures often use some form of a message broker to communicate
long running operations. While HTTP is covered via docs, many messaging
systems use protocols that are not supported by the Open Telemetry SDK
(there are no helper functions that inject and extract spans for you).
One such example would be redis' pubsub wire protocol. In this repo, we show
how to add distributed tracing to any arbitrary messaging system.

3. Many popular libraries integrate with Open Telemetry with no extra work
required. One library is [go-redis](https://github.com/go-redis/redis). This
is great because if a library is not instrumented, the best you can do is
either modify the library or instrument code which calls the library (which
inherently misses internal events in the library that do not bubble up to
the surface of the exposed API).

# How to use?
`docker-compose up --build` brings up all services.
The `client` sends a few requests to `server`. The distributed traces appear in
`jaeger` and `zipkin`.

The `client` sends a few requests to `server`. The `server` publishes messages
to `redis`. The `worker` listens for messages and performs work when they are
published.

The distributed traces appear in `jaeger` and `zipkin`.

`jaeger` can be accessed at `http://localhost:16686`.

Expand All @@ -45,22 +67,36 @@ a trace, you can see the distributed spans that make up the trace:
![Spans](./docs/jaeger-span.png)

# How to navigate the code?
Start by reading the comments in `src/cmd/client/client.go`.
They describe how to create a trace that propagates to the server over
an http request.
Start by reading the comments in `cmd/client/client.go`.
They describe how to create a trace that propagates to the server via
an HTTP request.

Next, read the comments in `src/cmd/server/server.go`. They describe
how the propagated trace is used in subsequent spans.
Next, read the comments in `cmd/server/server.go`. They describe
how the propagated trace is used in children spans.

Finally, read the comments in `src/pkg/tracer_provider/tracer_provider.go`. They
describe boilerplate code that sets up a tracer provider for each application.
Next, read the comments in `pkg/message.go`. They describe how to
add headers to the message that propagate the trace context from the `server`
to the `worker`, in the same way as would be done via HTTP.

Next, read the comments in `cmd/worker/worker.go`. They describe how to
extract the trace context from messages on redis' pubsub and create child spans
with this context.

Next, read the comments in `pkg/broker.go`. They describe how the trace context
can be manually injected and extracted, when publishing and receiving messages.

Finally, read the comments in `pkg/tracer.go`. They describe boilerplate code
that sets up a tracer provider for each application.

# Development
A dev container has been provided. To use:
* Ensure the `Remote - Containers` extension is installed in VSCode
* Open the project in the container
* Install the Go extension libraries with `Go: Install/Update tools` from
the command palette
the command palette

> Note: When running any docker commands, run them from outside of the
dev container (on the host machine)

# Citations
The collector code is adapted from
Expand Down
103 changes: 61 additions & 42 deletions src/cmd/client/client.go → cmd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,84 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/michaelperel/otel-demo/pkg/tracer_provider"
"github.com/michaelperel/otel-demo/pkg"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// Annotate the tracer with the library name. In Jaeger, this shows up as a tag.
var tr = otel.Tracer("cmd/client")
var tr = otel.Tracer("client")

func makeRequest() error {
func main() {
var (
otelURL = mustGetEnvStr("OTEL_AGENT_URL")
serverURL = mustGetEnvStr("SERVER_URL")
)

// Initializing the tracer with the service name makes
// all traces easy to find by service.
shutdown := pkg.InitializeGlobalTracer(otelURL, "client")
defer shutdown()

// Wait for the server to start (obviously, in a real project, use retry
// logic rather than sleeping)
//
// Start a span. A trace is a collection of spans, where spans can have
// children spans. Information necessary to create a child span is
// returned in the first value (normally named "ctx", but we are not
// creating a child span, so we have assigned it to "_").
_, span := tr.Start(context.Background(), "wait for server")
time.Sleep(10 * time.Second)

// Adding an event is the same as logging a message in the span.
span.AddEvent("slept for 10 seconds, to wait for server to come up")
span.End()

for i := 0; i < 5; i++ {
if err := makeRequest(serverURL); err != nil {
panic(err)
}
}

// Record an error, just to see what it looks like in the backend
_, errSpan := tr.Start(context.Background(), "example error")
defer errSpan.End()

errSpan.RecordError(errors.New("example error"))

// Setting the status fails the entire span. In Jaeger, this causes the
// span to appear red.
errSpan.SetStatus(codes.Error, "fail entire span")
}

func mustGetEnvStr(k string) string {
v := os.Getenv(k)
if v == "" {
panic(fmt.Sprintf("'%s' environment variable missing", k))
}

return v
}

func makeRequest(url string) error {
ctx, span := tr.Start(
context.Background(),
"make request",
trace.WithSpanKind(trace.SpanKindClient),
)
defer span.End()

// Auto instrument the http client.
// Auto instrument the http client. This will create a new span whenever
// a request is made, and set the span kind to client (setting span kinds,
// like many attributes, is optional and appears as tags in Jaeger).
client := http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
Expand All @@ -41,15 +94,15 @@ func makeRequest() error {
req, err := http.NewRequestWithContext(
ctx,
"GET",
"http://server:8080/hello",
url,
nil,
)
if err != nil {
span.RecordError(err)
return err
}

// Simulate work before the client is done, so you can view
// Simulate work before the client is done, so you can more easily view
// different time lengths in the span in telemetry backends.
time.Sleep(1 * time.Second)

Expand All @@ -70,37 +123,3 @@ func makeRequest() error {

return nil
}

func main() {
addr := "otel-agent:4317"

// Initializing the trace provider with the service name makes
// all traces easy to find by service.
shutdown := tracer_provider.Initialize(addr, "client")
defer shutdown()

// Wait for the server to start (obviously, in a real project, use retry
// logic rather than sleeping)
_, span := tr.Start(context.Background(), "wait for server")
time.Sleep(10 * time.Second)

// Adding an event is the same as logging a message in the span.
span.AddEvent("slept for 10 seconds, to wait for server to come up")
span.End()

for i := 0; i < 5; i++ {
if err := makeRequest(); err != nil {
panic(err)
}
}

// Record an error, just to see what it looks like in the backend
_, errSpan := tr.Start(context.Background(), "example error")
defer errSpan.End()

errSpan.RecordError(errors.New("example error"))

// Setting the status fails the entire span. In Jaeger, this causes the
// span to appear red.
errSpan.SetStatus(codes.Error, "fail entire span")
}
114 changes: 114 additions & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/michaelperel/otel-demo/pkg"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/trace"
)

var tr = otel.Tracer("server")

func main() {
var (
otelURL = mustGetEnvStr("OTEL_AGENT_URL")
redisURL = mustGetEnvStr("REDIS_URL")
)

shutdown := pkg.InitializeGlobalTracer(otelURL, "server")
defer shutdown()

server := mustNewServer(redisURL)

// Auto instrument any request to /hello. This will create a span whenever
// a request to this endpoint is made. It will also set the span kind to
// server.
otelHandler := otelhttp.NewHandler(server, "/hello")
http.Handle("/hello", otelHandler)

err := http.ListenAndServe(":8080", nil)
if err != nil {
panic(err)
}
}

type server struct{ b *pkg.Broker }

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Simulate work to make viewing the span easier in the telemetry backend.
time.Sleep(1 * time.Second)

// Create a child span, using the trace context propagated from the
// client request in r.Context().
ctx, span := tr.Start(
r.Context(),
"handle request",
)
defer span.End()

// Baggage from client propagates as well
uk := attribute.Key("username")
username := baggage.Value(ctx, uk)
span.AddEvent(
"reading username baggage...",
trace.WithAttributes(uk.String(username.AsString())),
)

// Add the response as an attribute to the span. This can show up
// differently in different backends, but in Jaeger attributes show up
// as tags.
body := "hello, world"
span.SetAttributes(attribute.String("body", body))

// Publish the body to redis pubsub so that it can be picked up by the
// worker service
if err := s.b.Publish(ctx, body); err != nil {
span.RecordError(err)

http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

fmt.Fprint(w, body)
}

func mustGetEnvStr(k string) string {
v := os.Getenv(k)
if v == "" {
panic(fmt.Sprintf("'%s' environment variable missing", k))
}

return v
}

func mustNewServer(url string) *server {
c, err := pkg.NewClient(url)
if err != nil {
panic(err)
}

a := time.After(60 * time.Second)
t := time.NewTicker(3 * time.Second)
loop:
for {
select {
case <-t.C:
if err := c.Ping(context.Background()).Err(); err == nil {
break loop
}
case <-a:
panic("timeout connecting to redis")
}
}

b := pkg.NewBroker(c)
return &server{b}
}
Loading

0 comments on commit 028cece

Please sign in to comment.