From b296ff39437bf173f292a3da6c1e75856220ffcb Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Mon, 4 Dec 2023 03:02:47 -0500 Subject: [PATCH] [accountingservice] add attributes to kafka spans (#1286) * add attributes to kafka spans Signed-off-by: Pierre Tessier * add attributes to kafka spans Signed-off-by: Pierre Tessier --------- Signed-off-by: Pierre Tessier --- CHANGELOG.md | 5 ++++- src/accountingservice/kafka/consumer.go | 2 +- src/accountingservice/kafka/trace_interceptor.go | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17dc0c89e0..6d0b7b31e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,10 @@ release. ([#1236](https://github.com/open-telemetry/opentelemetry-demo/pull/1236)) * [cartservice] Add .NET memory, CPU, and thread metrics ([#1265](https://github.com/open-telemetry/opentelemetry-demo/pull/1265)) -* enable browser traffic in loadgenerator using playwright ([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266)) +* enable browser traffic in loadgenerator using playwright + ([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266)) +* [accountingservice] Add additional attributes to Kafka spans + ([#1286](https://github.com/open-telemetry/opentelemetry-demo/pull/1286)) ## 1.6.0 diff --git a/src/accountingservice/kafka/consumer.go b/src/accountingservice/kafka/consumer.go index 56d82e08c2..1599a507ae 100644 --- a/src/accountingservice/kafka/consumer.go +++ b/src/accountingservice/kafka/consumer.go @@ -24,7 +24,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge saramaConfig.Version = ProtocolVersion // So we can know the partition and offset of messages. saramaConfig.Producer.Return.Successes = true - saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor()} + saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor(GroupID)} consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig) if err != nil { diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go index 9ff2d3bf11..60fdb440a5 100644 --- a/src/accountingservice/kafka/trace_interceptor.go +++ b/src/accountingservice/kafka/trace_interceptor.go @@ -21,12 +21,13 @@ type OTelInterceptor struct { // NewOTelInterceptor processes span for intercepted messages and add some // headers with the span data. -func NewOTelInterceptor() *OTelInterceptor { +func NewOTelInterceptor(groupID string) *OTelInterceptor { oi := OTelInterceptor{} oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama") oi.fixedAttrs = []attribute.KeyValue{ semconv.MessagingSystem("kafka"), + semconv.MessagingKafkaConsumerGroup(groupID), semconv.NetTransportTCP, } return &oi @@ -50,6 +51,8 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) { trace.WithAttributes( semconv.MessagingDestinationKindTopic, semconv.MessagingDestinationName(msg.Topic), + semconv.MessagingKafkaMessageOffset(int(msg.Offset)), + semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)), semconv.MessagingOperationReceive, semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), ),