Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #119 from signalfx/fix-tracing-for-queues
Browse files Browse the repository at this point in the history
Fixed span kind not being set correctly for consumer/produder use cases
  • Loading branch information
owais authored Apr 14, 2021
2 parents d853c71 + 785970c commit 404c627
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 21 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Sarama instrumentation now supports a `WithPeerServiceName` option. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119))
- Zipkin translator now correctly maps `consumer` and `producer` span kind values. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119))

### Changed

- `ext.SpanTypeMessageConsumer` and `ext.SpanTypeMessageProducer` now evaluate to `consumer` and `producer` respectively instead of `queue`. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119))

## [1.8.0] - 2021-04-13

### Changed
Expand Down
13 changes: 11 additions & 2 deletions contrib/Shopify/sarama/option.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package sarama

type config struct {
serviceName string
analyticsRate float64
serviceName string
peerServiceName string
analyticsRate float64
}

func defaults(cfg *config) {
cfg.serviceName = "kafka"
cfg.peerServiceName = "kafka"
// cfg.analyticsRate = globalconfig.AnalyticsRate()
}

Expand Down Expand Up @@ -35,3 +37,10 @@ func WithAnalyticsRate(rate float64) Option {
cfg.analyticsRate = rate
}
}

// WithPeerServiceName sets the given value as "peer.service" tag on all spans for intercepted client.
func WithPeerServiceName(name string) Option {
return func(cfg *config) {
cfg.peerServiceName = name
}
}
2 changes: 2 additions & 0 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("partition", msg.Partition),
tracer.Tag("offset", msg.Offset),
tracer.Tag("peer.service", cfg.peerServiceName),
}
if cfg.analyticsRate > 0 {
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
Expand Down Expand Up @@ -241,6 +242,7 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
tracer.ServiceName(cfg.serviceName),
tracer.ResourceName("Produce Topic " + msg.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("peer.service", cfg.peerServiceName),
}
if cfg.analyticsRate > 0 {
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
Expand Down
12 changes: 6 additions & 6 deletions contrib/Shopify/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestConsumer(t *testing.T) {
assert.Equal(t, int64(0), s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "consumer", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
}
{
Expand All @@ -85,7 +85,7 @@ func TestConsumer(t *testing.T) {
assert.Equal(t, int64(1), s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "consumer", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
}
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestSyncProducer(t *testing.T) {
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "producer", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag("partition"))
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestSyncProducerSendMessages(t *testing.T) {
assert.Len(t, spans, 2)
for _, s := range spans {
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "producer", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag("partition"))
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestAsyncProducer(t *testing.T) {
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "producer", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag("partition"))
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestAsyncProducer(t *testing.T) {
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "producer", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag("partition"))
Expand Down
6 changes: 3 additions & 3 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestConsumerChannel(t *testing.T) {
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic gotest", s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "consumer", s.Tag(ext.SpanType))
assert.Equal(t, int32(1), s.Tag("partition"))
assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate))
assert.Equal(t, kafka.Offset(i+1), s.Tag("offset"))
Expand Down Expand Up @@ -163,14 +163,14 @@ func TestConsumerPoll(t *testing.T) {
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, "producer", s0.Tag(ext.SpanType))
assert.Equal(t, int32(0), s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, "consumer", s1.Tag(ext.SpanType))
assert.Equal(t, int32(0), s1.Tag("partition"))
}
8 changes: 4 additions & 4 deletions ddtrace/ext/app_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ const (
// SpanTypeDNS marks a span as a DNS operation.
SpanTypeDNS = "dns"

// SpanTypeMessageConsumer marks a span as a queue operation
SpanTypeMessageConsumer = "queue"
// SpanTypeMessageConsumer marks a span as a consumer operation
SpanTypeMessageConsumer = "consumer"

// SpanTypeMessageProducer marks a span as a queue operation.
SpanTypeMessageProducer = "queue"
// SpanTypeMessageProducer marks a span as a producer operation.
SpanTypeMessageProducer = "producer"

// SpanTypeEcho marks a span as a echo request
SpanTypeEcho = "echo"
Expand Down
12 changes: 9 additions & 3 deletions ddtrace/tracer/zipkinpayload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
)

const (
spanKind = "span.kind"
spanKindServer = "SERVER"
spanKindClient = "CLIENT"
spanKind = "span.kind"
spanKindServer = "SERVER"
spanKindClient = "CLIENT"
spanKindProducer = "PRODUCER"
spanKindConsumer = "CONSUMER"
)

var _ encoder = (*zipkinPayload)(nil)
Expand Down Expand Up @@ -167,6 +169,10 @@ func deriveKind(s *span) *string {
return pointer.String(spanKindClient)
case ext.SpanTypeSQL:
return pointer.String(spanKindClient)
case ext.SpanTypeMessageConsumer:
return pointer.String(spanKindConsumer)
case ext.SpanTypeMessageProducer:
return pointer.String(spanKindProducer)
}

return nil
Expand Down
15 changes: 12 additions & 3 deletions ddtrace/tracer/zipkinpayload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package tracer

import (
"bytes"
"github.com/mailru/easyjson"
sfxtrace "github.com/signalfx/golib/trace"
traceformat "github.com/signalfx/golib/trace/format"
"io/ioutil"
"strconv"
"strings"
"testing"

"github.com/mailru/easyjson"
sfxtrace "github.com/signalfx/golib/trace"
traceformat "github.com/signalfx/golib/trace/format"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -80,6 +81,8 @@ func TestSpanKindRemovedFromTags(t *testing.T) {
spanList := []*span{
&span{Meta: map[string]string{"span.kind": "server"}},
&span{Meta: map[string]string{"span.kind": "client"}},
&span{Meta: map[string]string{"span.kind": "producer"}},
&span{Meta: map[string]string{"span.kind": "consumer"}},
}

converted := payload.convertSpans(spanList)
Expand All @@ -88,6 +91,12 @@ func TestSpanKindRemovedFromTags(t *testing.T) {

require.Equal(map[string]string{}, converted[1].Tags)
require.Equal("CLIENT", *(converted[1].Kind))

require.Equal(map[string]string{}, converted[2].Tags)
require.Equal("PRODUCER", *(converted[2].Kind))

require.Equal(map[string]string{}, converted[3].Tags)
require.Equal("CONSUMER", *(converted[3].Kind))
}

// TestZipkinPayloadDecode ensures that whatever we push into the payload can
Expand Down

0 comments on commit 404c627

Please sign in to comment.