Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Sarama improve async producer #120

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- `ext.SpanTypeMessageConsumer` and `ext.SpanTypeMessageProducer` now evaluate to `consumer` and `producer` respectively instead of `queue`. ([#119](https://github.com/signalfx/signalfx-go-tracing/pull/119))
- Sarama instrumentation requires Kafka version 0.11.0 or newer to work correctly. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120))

### Fixed

- Sarama instrumentation correctly identifies and finishes spans. ([#120](https://github.com/signalfx/signalfx-go-tracing/pull/120))
pellared marked this conversation as resolved.
Show resolved Hide resolved
owais marked this conversation as resolved.
Show resolved Hide resolved

## [1.8.0] - 2021-04-13

Expand Down
56 changes: 38 additions & 18 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package sarama // import "github.com/signalfx/signalfx-go-tracing/contrib/Shopify/sarama"

import (
"log"

"github.com/Shopify/sarama"

"github.com/signalfx/signalfx-go-tracing/ddtrace"
Expand Down Expand Up @@ -178,56 +180,60 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
}
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
saramaConfig.Version = sarama.V0_11_0_0
}

headersSupported := true
if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
headersSupported = false
log.Println("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version")
pellared marked this conversation as resolved.
Show resolved Hide resolved
}

wrapped := &asyncProducer{
AsyncProducer: p,
input: make(chan *sarama.ProducerMessage),
successes: make(chan *sarama.ProducerMessage),
errors: make(chan *sarama.ProducerError),
}
go func() {
type spanKey struct {
topic string
partition int32
offset int64
}
spans := make(map[spanKey]ddtrace.Span)
spans := make(map[uint64]ddtrace.Span)
defer close(wrapped.successes)
defer close(wrapped.errors)
for {
select {
case msg := <-wrapped.input:
key := spanKey{msg.Topic, msg.Partition, msg.Offset}
span := startProducerSpan(cfg, saramaConfig.Version, msg)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spans[key] = span
if headersSupported && saramaConfig.Producer.Return.Successes {
spans[tracer.SpanID(span.Context())] = span
} else {
// if returning successes isn't enabled, we just finish the
// span right away because there's no way to know when it will
// be done
finishProducerSpan(span, key.partition, key.offset, nil)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
}
case msg, ok := <-p.Successes():
if !ok {
// producer was closed, so exit
return
}
key := spanKey{msg.Topic, msg.Partition, msg.Offset}
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
if id, found := getSpanIDFromMsg(msg); found {
if span, ok := spans[id]; ok {
delete(spans, id)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
}
}
wrapped.successes <- msg
case err, ok := <-p.Errors():
if !ok {
// producer was closed
return
}
key := spanKey{err.Msg.Topic, err.Msg.Partition, err.Msg.Offset}
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
if id, found := getSpanIDFromMsg(err.Msg); found {
if span, ok := spans[id]; ok {
delete(spans, id)
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
}
}
wrapped.errors <- err
}
Expand Down Expand Up @@ -264,3 +270,17 @@ func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err er
span.SetTag("offset", offset)
span.FinishWithOptionsExt(tracer.WithError(err))
}

func getSpanIDFromMsg(msg *sarama.ProducerMessage) (uint64, bool) {
carrier := NewProducerMessageCarrier(msg)
ctx, err := tracer.Extract(carrier)
if err != nil {
return 0, false
}

id := tracer.SpanID(ctx)
if id == 0 {
return 0, false
}
return id, true
}
3 changes: 3 additions & 0 deletions contrib/Shopify/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,15 @@ func TestAsyncProducer(t *testing.T) {
})

t.Run("With Successes", func(t *testing.T) {
t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " +
"https://github.com/Shopify/sarama/issues/1665")
mt := mocktracer.Start()
defer mt.Stop()

broker := newMockBroker(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
cfg.Producer.Return.Successes = true

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/globaltracer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package ddtrace // import "github.com/signalfx/signalfx-go-tracing/ddtrace"

import (
"sync"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"sync"
)

var (
mu sync.RWMutex // guards globalTracer
globalTracer Tracer = &NoopTracer{}
globalTracer Tracer = &NoopTracer{}
pellared marked this conversation as resolved.
Show resolved Hide resolved
)

// SetGlobalTracer sets the global tracer to t.
Expand Down
20 changes: 10 additions & 10 deletions ddtrace/mocktracer/mockspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package mocktracer // import "github.com/signalfx/signalfx-go-tracing/ddtrace/mo

import (
"fmt"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/signalfx/signalfx-go-tracing/ddtrace/tracer"
"sync"
"time"

"github.com/signalfx/signalfx-go-tracing/ddtrace"
"github.com/signalfx/signalfx-go-tracing/ddtrace/ext"
Expand Down Expand Up @@ -92,14 +93,13 @@ func newSpan(t *mocktracer, operationName string, cfg *ddtrace.StartSpanConfig)

type mockspan struct {
sync.RWMutex // guards below fields
name string
tags map[string]interface{}
finishTime time.Time

startTime time.Time
parentID uint64
context *spanContext
tracer *mocktracer
name string
tags map[string]interface{}
finishTime time.Time
pellared marked this conversation as resolved.
Show resolved Hide resolved
startTime time.Time
parentID uint64
context *spanContext
tracer *mocktracer
}

func (s *mockspan) Finish() {
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package tracer
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/signalfx/signalfx-go-tracing/internal/globalconfig"
"github.com/stretchr/testify/assert"
)

func withTransport(t transport) StartOption {
Expand Down
10 changes: 5 additions & 5 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/signalfx/signalfx-go-tracing/ddtrace/ext"
"github.com/stretchr/testify/assert"
)

func setupteardown(start, max int) func() {
Expand Down Expand Up @@ -246,8 +246,8 @@ func TestSpanFinishPriority(t *testing.T) {
func TestTracePriorityLocked(t *testing.T) {
assert := assert.New(t)
b3Headers := TextMapCarrier(map[string]string{
b3TraceIDHeader: "2",
b3SpanIDHeader: "2",
b3TraceIDHeader: "2",
b3SpanIDHeader: "2",
b3SampledHeader: "2",
})

Expand Down Expand Up @@ -298,8 +298,8 @@ func TestNewSpanContext(t *testing.T) {
defer stop()
assert := assert.New(t)
ctx, err := NewPropagator(nil).Extract(TextMapCarrier(map[string]string{
b3TraceIDHeader: "1",
b3SpanIDHeader: "2",
b3TraceIDHeader: "1",
b3SpanIDHeader: "2",
b3SampledHeader: "3",
}))
assert.Nil(err)
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/traceparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"github.com/signalfx/signalfx-go-tracing/ddtrace"
)

func FormatAsTraceParent(context ddtrace.SpanContext) (string,bool) {
func FormatAsTraceParent(context ddtrace.SpanContext) (string, bool) {
ctx, ok := context.(*spanContext)
if !ok || ctx.traceID == 0 || ctx.spanID == 0 {
return "",false
return "", false
}
answer := fmt.Sprintf("traceparent;desc=\"00-%032x-%016x-01\"", ctx.traceID, ctx.spanID)
return answer,true
return answer, true
}
8 changes: 4 additions & 4 deletions ddtrace/tracer/traceparent_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package tracer

import (
"testing"
"regexp"
"testing"

"github.com/stretchr/testify/assert"
)

func TestTraceParent(t *testing.T) {
func TestTraceParent(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
traceParent,ok := FormatAsTraceParent(span.Context())
traceParent, ok := FormatAsTraceParent(span.Context())

assert := assert.New(t)
assert.True(ok)
matched,_ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent)
matched, _ := regexp.MatchString("^traceparent;desc=\"00-[0-9a-f]{32}-[0-9a-f]{16}-01\"$", traceParent)
assert.True(matched)
}
16 changes: 16 additions & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ func TraceIDHex(ctx ddtrace.SpanContext) string {
return ""
}

// SpanID returns the span ID from ddtrace.SpanContext
func SpanID(ctx ddtrace.SpanContext) uint64 {
if c, ok := ctx.(*spanContext); ok {
return c.spanID
}
return 0
}

// TraceID returns the span ID from ddtrace.SpanContext
func TraceID(ctx ddtrace.SpanContext) uint64 {
if c, ok := ctx.(*spanContext); ok {
return c.traceID
}
return 0
}

pellared marked this conversation as resolved.
Show resolved Hide resolved
pellared marked this conversation as resolved.
Show resolved Hide resolved
const (
// payloadQueueSize is the buffer size of the trace channel.
payloadQueueSize = 1000
Expand Down
20 changes: 20 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,3 +1160,23 @@ func BenchmarkTracerStackFrames(b *testing.B) {
span.FinishWithOptionsExt(StackFrames(64, 0))
}
}

type externalSpanContext struct{}

func (e externalSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
}

func TestSpanAndTraceID(t *testing.T) {
c1 := &spanContext{}
assert.Equal(t, SpanID(c1), uint64(0))
assert.Equal(t, TraceID(c1), uint64(0))

c2 := &spanContext{spanID: 1, traceID: 2}
assert.Equal(t, SpanID(c2), uint64(1))
assert.Equal(t, TraceID(c2), uint64(2))

c3 := &externalSpanContext{}
assert.Equal(t, SpanID(c3), uint64(0))
assert.Equal(t, TraceID(c3), uint64(0))

Copy link
Contributor

@pellared pellared Apr 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove this empty line 😉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably you could also add:

	var c4 SpanContext
	assert.Equal(t, SpanID(c4), uint64(0))
	assert.Equal(t, TraceID(c4), uint64(0))

also I like table-driven tests for tests like this https://blog.golang.org/subtests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not gonna write a loop for 3 items 😆

}