Skip to content

Commit

Permalink
Update kafka example to use 2 services for producer and consumer (#893)
Browse files Browse the repository at this point in the history
* Update kafka example to use 2 services for producer and consumer

* precommit

---------

Co-authored-by: Tyler Yahn <[email protected]>
  • Loading branch information
RonFed and MrAlias authored Jul 9, 2024
1 parent 024038a commit 6f99aef
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 53 deletions.
11 changes: 10 additions & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,16 @@ updates:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /examples/kafka-go
directory: /examples/kafka-go/consumer
labels:
- dependencies
- go
- Skip Changelog
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /examples/kafka-go/producer
labels:
- dependencies
- go
Expand Down
6 changes: 4 additions & 2 deletions examples/kafka-go/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:1.22.5
WORKDIR /app
COPY . .
RUN go build -o main
ENTRYPOINT ["/app/main"]
ARG BINARY_NAME
ENV BINARY_NAME=$BINARY_NAME
RUN go build -o $BINARY_NAME
ENTRYPOINT ["sh", "-c", "/app/$BINARY_NAME"]
6 changes: 3 additions & 3 deletions examples/kafka-go/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Example of Auto instrumentation of HTTP server + Kafka producer + Kafka consumer + Manual span

This example shows a trace being generated which is composed of a HTTP server handler which produces
a batch of 2 message to different kafka topics, and a single consumer consuming one of these messages.
The consumer generate a manual span for each message it handles, this span is visible as the son of the consumer span.
This example shows a trace being generated which is composed of:
- `kafkaproducer` HTTP server handler which produces a batch of 2 message to different kafka topics.
- `kafkaconsumer` consuming one of these messages, and generates a manual span for each message it handles, this span is visible as the son of the consumer span.

To run the example, bring up the services using the command.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module go.opentelemetry.io/auto/examples/kafka-go
module go.opentelemetry.io/auto/examples/kafka-go/consumer

go 1.22.0

Expand Down
File renamed without changes.
79 changes: 79 additions & 0 deletions examples/kafka-go/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

kafka "github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

var tracer = otel.Tracer("trace-example")

func getKafkaReader() *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
GroupID: "some group id",
Topic: "topic1",
ReadBatchTimeout: 1 * time.Millisecond,
})
}

func reader(ctx context.Context) {
reader := getKafkaReader()

defer reader.Close()

fmt.Println("start consuming ... !!")
for {
select {
case <-ctx.Done():
return
default:
m, err := reader.ReadMessage(ctx)
if err != nil {
fmt.Printf("failed to read message: %v\n", err)
continue
}
_, span := tracer.Start(ctx, "consumer manual span")
span.SetAttributes(
attribute.String("topic", m.Topic),
attribute.Int64("partition", int64(m.Partition)),
attribute.Int64("offset", int64(m.Offset)),
)
fmt.Printf("consumed message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
span.End()
}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

time.Sleep(5 * time.Second)
go reader(ctx)

<-ch
cancel()
}
48 changes: 41 additions & 7 deletions examples/kafka-go/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,65 @@ services:
ports:
- 2181:2181

kafkaapp:
kafkaproducer:
depends_on:
kafka:
condition: service_healthy
build:
context: .
dockerfile: ./Dockerfile
context: ./producer
dockerfile: ../Dockerfile
args:
BINARY_NAME: producer
pid: "host"
ports:
- "8080:8080"
volumes:
- /proc:/host/proc

go-auto:
kafkaconsumer:
depends_on:
- kafkaapp
kafka:
condition: service_healthy
build:
context: ./consumer
dockerfile: ../Dockerfile
args:
BINARY_NAME: consumer
pid: "host"
volumes:
- /proc:/host/proc

go-auto-producer:
depends_on:
- kafkaproducer
build:
context: ../..
dockerfile: Dockerfile
privileged: true
pid: "host"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- OTEL_GO_AUTO_TARGET_EXE=/app/producer
- OTEL_SERVICE_NAME=kafkaproducer
- OTEL_PROPAGATORS=tracecontext,baggage
- OTEL_GO_AUTO_SHOW_VERIFIER_LOG=true
volumes:
- /proc:/host/proc
- debugfs:/sys/kernel/debug
command: ["/otel-go-instrumentation", "-global-impl"]

go-auto-consumer:
depends_on:
- kafkaconsumer
build:
context: ../..
dockerfile: Dockerfile
privileged: true
pid: "host"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- OTEL_GO_AUTO_TARGET_EXE=/app/main
- OTEL_SERVICE_NAME=kafkaapp
- OTEL_GO_AUTO_TARGET_EXE=/app/consumer
- OTEL_SERVICE_NAME=kkafkaconsumer
- OTEL_PROPAGATORS=tracecontext,baggage
- OTEL_GO_AUTO_SHOW_VERIFIER_LOG=true
volumes:
Expand Down
11 changes: 11 additions & 0 deletions examples/kafka-go/producer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module go.opentelemetry.io/auto/examples/kafka-go/producer

go 1.22.0

require github.com/segmentio/kafka-go v0.4.47

require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/stretchr/testify v1.9.0 // indirect
)
69 changes: 69 additions & 0 deletions examples/kafka-go/producer/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
38 changes: 0 additions & 38 deletions examples/kafka-go/main.go → examples/kafka-go/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ import (
"time"

kafka "github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

var tracer = otel.Tracer("trace-example")

type server struct {
kafkaWriter *kafka.Writer
}
Expand Down Expand Up @@ -84,39 +80,6 @@ func getKafkaWriter() *kafka.Writer {
}
}

func getKafkaReader() *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
GroupID: "some group id",
Topic: "topic1",
ReadBatchTimeout: 1 * time.Millisecond,
})
}

func reader() {
reader := getKafkaReader()

defer reader.Close()
ctx := context.Background()

fmt.Println("start consuming ... !!")
for {
m, err := reader.ReadMessage(ctx)
if err != nil {
fmt.Printf("failed to read message: %v\n", err)
continue
}
_, span := tracer.Start(ctx, "consumer manual span")
span.SetAttributes(
attribute.String("topic", m.Topic),
attribute.Int64("partition", int64(m.Partition)),
attribute.Int64("offset", int64(m.Offset)),
)
fmt.Printf("consumed message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
span.End()
}
}

func main() {
kafkaWriter := getKafkaWriter()
defer kafkaWriter.Close()
Expand All @@ -132,7 +95,6 @@ func main() {
}

time.Sleep(5 * time.Second)
go reader()

s := &server{kafkaWriter: kafkaWriter}

Expand Down
3 changes: 2 additions & 1 deletion versions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ excluded-modules:
- github.com/hashicorp/go-version
- go.opentelemetry.io/auto/examples
- go.opentelemetry.io/auto/examples/httpPlusdb
- go.opentelemetry.io/auto/examples/kafka-go
- go.opentelemetry.io/auto/examples/kafka-go/producer
- go.opentelemetry.io/auto/examples/kafka-go/consumer
- go.opentelemetry.io/auto/examples/rolldice
- go.opentelemetry.io/auto/internal/test/e2e/databasesql
- go.opentelemetry.io/auto/internal/test/e2e/gin
Expand Down

0 comments on commit 6f99aef

Please sign in to comment.