Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka example to use 2 services for producer and consumer #893

Merged
merged 5 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,16 @@ updates:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /examples/kafka-go
directory: /examples/kafka-go/consumer
labels:
- dependencies
- go
- Skip Changelog
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /examples/kafka-go/producer
labels:
- dependencies
- go
Expand Down
6 changes: 4 additions & 2 deletions examples/kafka-go/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:1.22.5
WORKDIR /app
COPY . .
RUN go build -o main
ENTRYPOINT ["/app/main"]
ARG BINARY_NAME
ENV BINARY_NAME=$BINARY_NAME
RUN go build -o $BINARY_NAME
ENTRYPOINT ["sh", "-c", "/app/$BINARY_NAME"]
6 changes: 3 additions & 3 deletions examples/kafka-go/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Example of Auto instrumentation of HTTP server + Kafka producer + Kafka consumer + Manual span

This example shows a trace being generated which is composed of a HTTP server handler which produces
a batch of 2 message to different kafka topics, and a single consumer consuming one of these messages.
The consumer generate a manual span for each message it handles, this span is visible as the son of the consumer span.
This example shows a trace being generated which is composed of:
- `kafkaproducer` HTTP server handler which produces a batch of 2 message to different kafka topics.
- `kafkaconsumer` consuming one of these messages, and generates a manual span for each message it handles, this span is visible as the son of the consumer span.

To run the example, bring up the services using the command.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module go.opentelemetry.io/auto/examples/kafka-go
module go.opentelemetry.io/auto/examples/kafka-go/consumer

go 1.22.0

Expand Down
File renamed without changes.
79 changes: 79 additions & 0 deletions examples/kafka-go/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

kafka "github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

var tracer = otel.Tracer("trace-example")

func getKafkaReader() *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
GroupID: "some group id",
Topic: "topic1",
ReadBatchTimeout: 1 * time.Millisecond,
})
}

func reader(ctx context.Context) {
reader := getKafkaReader()

defer reader.Close()

fmt.Println("start consuming ... !!")
for {
select {
case <-ctx.Done():
return
default:
m, err := reader.ReadMessage(ctx)
if err != nil {
fmt.Printf("failed to read message: %v\n", err)
continue
}
_, span := tracer.Start(ctx, "consumer manual span")
span.SetAttributes(
attribute.String("topic", m.Topic),
attribute.Int64("partition", int64(m.Partition)),
attribute.Int64("offset", int64(m.Offset)),
)
fmt.Printf("consumed message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
span.End()
}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

time.Sleep(5 * time.Second)
go reader(ctx)

<-ch
cancel()
}
48 changes: 41 additions & 7 deletions examples/kafka-go/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,65 @@ services:
ports:
- 2181:2181

kafkaapp:
kafkaproducer:
depends_on:
kafka:
condition: service_healthy
build:
context: .
dockerfile: ./Dockerfile
context: ./producer
dockerfile: ../Dockerfile
args:
BINARY_NAME: producer
pid: "host"
ports:
- "8080:8080"
volumes:
- /proc:/host/proc

go-auto:
kafkaconsumer:
depends_on:
- kafkaapp
kafka:
condition: service_healthy
build:
context: ./consumer
dockerfile: ../Dockerfile
args:
BINARY_NAME: consumer
pid: "host"
volumes:
- /proc:/host/proc

go-auto-producer:
depends_on:
- kafkaproducer
build:
context: ../..
dockerfile: Dockerfile
privileged: true
pid: "host"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- OTEL_GO_AUTO_TARGET_EXE=/app/producer
- OTEL_SERVICE_NAME=kafkaproducer
- OTEL_PROPAGATORS=tracecontext,baggage
- OTEL_GO_AUTO_SHOW_VERIFIER_LOG=true
volumes:
- /proc:/host/proc
- debugfs:/sys/kernel/debug
command: ["/otel-go-instrumentation", "-global-impl"]

go-auto-consumer:
depends_on:
- kafkaconsumer
build:
context: ../..
dockerfile: Dockerfile
privileged: true
pid: "host"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
- OTEL_GO_AUTO_TARGET_EXE=/app/main
- OTEL_SERVICE_NAME=kafkaapp
- OTEL_GO_AUTO_TARGET_EXE=/app/consumer
- OTEL_SERVICE_NAME=kkafkaconsumer
- OTEL_PROPAGATORS=tracecontext,baggage
- OTEL_GO_AUTO_SHOW_VERIFIER_LOG=true
volumes:
Expand Down
11 changes: 11 additions & 0 deletions examples/kafka-go/producer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module go.opentelemetry.io/auto/examples/kafka-go/producer

go 1.22.0

require github.com/segmentio/kafka-go v0.4.47

require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/stretchr/testify v1.9.0 // indirect
)
69 changes: 69 additions & 0 deletions examples/kafka-go/producer/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ import (
"time"

kafka "github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

var tracer = otel.Tracer("trace-example")

type server struct {
kafkaWriter *kafka.Writer
}
Expand Down Expand Up @@ -84,39 +80,6 @@ func getKafkaWriter() *kafka.Writer {
}
}

func getKafkaReader() *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
GroupID: "some group id",
Topic: "topic1",
ReadBatchTimeout: 1 * time.Millisecond,
})
}

func reader() {
reader := getKafkaReader()

defer reader.Close()
ctx := context.Background()

fmt.Println("start consuming ... !!")
for {
m, err := reader.ReadMessage(ctx)
if err != nil {
fmt.Printf("failed to read message: %v\n", err)
continue
}
_, span := tracer.Start(ctx, "consumer manual span")
span.SetAttributes(
attribute.String("topic", m.Topic),
attribute.Int64("partition", int64(m.Partition)),
attribute.Int64("offset", int64(m.Offset)),
)
fmt.Printf("consumed message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
span.End()
}
}

func main() {
kafkaWriter := getKafkaWriter()
defer kafkaWriter.Close()
Expand All @@ -132,7 +95,6 @@ func main() {
}

time.Sleep(5 * time.Second)
go reader()

s := &server{kafkaWriter: kafkaWriter}

Expand Down
3 changes: 2 additions & 1 deletion versions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ excluded-modules:
- github.com/hashicorp/go-version
- go.opentelemetry.io/auto/examples
- go.opentelemetry.io/auto/examples/httpPlusdb
- go.opentelemetry.io/auto/examples/kafka-go
- go.opentelemetry.io/auto/examples/kafka-go/producer
- go.opentelemetry.io/auto/examples/kafka-go/consumer
- go.opentelemetry.io/auto/examples/rolldice
- go.opentelemetry.io/auto/internal/test/e2e/databasesql
- go.opentelemetry.io/auto/internal/test/e2e/gin
Expand Down
Loading