Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for Kafka #134

Merged
merged 20 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134)
- Add links and status message for mock span. (#134)

## [0.8.0] - 2020-07-10

This release upgrades its [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v0.8.0) dependency to v0.8.0, includes minor fixes, and new instrumentation.
Expand Down
122 changes: 122 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"context"

"github.com/Shopify/sarama"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)

type partitionConsumer struct {
sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)

wrapped := &partitionConsumer{
PartitionConsumer: pc,
messages: make(chan *sarama.ConsumerMessage),
}
go func() {
msgs := pc.Messages()

var prevSpan trace.Span
for msg := range msgs {
// Extract a span context from message to link.
carrier := NewConsumerMessageCarrier(msg)
parentSpanContext := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier))
Aneurysm9 marked this conversation as resolved.
Show resolved Hide resolved

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.Int64(msg.Offset),
XSAM marked this conversation as resolved.
Show resolved Hide resolved
kafkaPartitionKey.Int32(msg.Partition),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
if parentSpanContext.IsValid() {
opts = append(opts, trace.LinkedTo(parentSpanContext))
}
newCtx, span := cfg.Tracer.Start(context.Background(), "kafka.consume", opts...)

// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(newCtx, cfg.Propagators, carrier)

// Send messages back to user.
wrapped.messages <- msg

// Finish the previous span.
if prevSpan != nil {
prevSpan.End()
}
prevSpan = span
}
// Finish any remaining span.
if prevSpan != nil {
prevSpan.End()
}
close(wrapped.messages)
}()
return wrapped
}

type consumer struct {
sarama.Consumer

serviceName string
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return nil, err
}
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
serviceName: serviceName,
opts: opts,
}
}
170 changes: 170 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"context"
"fmt"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"

mocktracer "go.opentelemetry.io/contrib/internal/trace"
)

const (
serviceName = "test-service-name"
topic = "test-topic"
)

var (
propagators = global.Propagators()
)

func TestWrapPartitionConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

// Mock partition consumer controller
consumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
XSAM marked this conversation as resolved.
Show resolved Hide resolved

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func TestWrapConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)

// Wrap consumer
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
XSAM marked this conversation as resolved.
Show resolved Hide resolved

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) {
// Create message with span context
ctx, _ := mt.Start(context.Background(), "")
message := sarama.ConsumerMessage{Key: []byte("foo")}
propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message))

// Produce message
mockPartitionConsumer.YieldMessage(&message)
mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")})

// Consume messages
msgList := make([]*sarama.ConsumerMessage, 2)
msgList[0] = <-partitionConsumer.Messages()
msgList[1] = <-partitionConsumer.Messages()
assert.NoError(t, partitionConsumer.Close())
// Wait for the channel to be closed
<-partitionConsumer.Messages()

// Check spans length
spans := mt.EndedSpans()
assert.Len(t, spans, 2)

expectedList := []struct {
kvList []kv.KeyValue
links map[trace.SpanContext][]kv.KeyValue
kind trace.SpanKind
msgKey []byte
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.Int64(1),
kafkaPartitionKey.Int32(0),
},
links: map[trace.SpanContext][]kv.KeyValue{
trace.SpanFromContext(ctx).SpanContext(): nil,
},
kind: trace.SpanKindConsumer,
msgKey: []byte("foo"),
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.Int64(2),
kafkaPartitionKey.Int32(0),
},
links: make(map[trace.SpanContext][]kv.KeyValue),
kind: trace.SpanKindConsumer,
msgKey: []byte("foo2"),
},
}

for i, expected := range expectedList {
t.Run(fmt.Sprint("index", i), func(t *testing.T) {
span := spans[i]

assert.Equal(t, expected.links, span.Links)

remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i])))
assert.Equal(t, span.SpanContext(), remoteSpanFromMessage,
"span context should be injected into the consumer message headers")

assert.Equal(t, "kafka.consume", span.Name)
assert.Equal(t, expected.kind, span.Kind)
assert.Equal(t, expected.msgKey, msgList[i].Key)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
})
}
}

func TestConsumer_ConsumePartitionWithError(t *testing.T) {
lizthegrey marked this conversation as resolved.
Show resolved Hide resolved
// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockConsumer.ExpectConsumePartition(topic, 0, 0)

consumer := WrapConsumer(serviceName, mockConsumer)
_, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
// Consume twice
_, err = consumer.ConsumePartition(topic, 0, 0)
assert.Error(t, err)
}
24 changes: 24 additions & 0 deletions instrumentation/github.com/Shopify/sarama/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sarama provides functions to trace the Shopify/sarama package. (https://github.com/Shopify/sarama)
//
// The consumer's span will not be created as a child of the producer's span; instead, it will link the producer's span.
// (https://github.com/open-telemetry/opentelemetry-specification/blob/v0.6.0/specification/trace/semantic_conventions/messaging.md#batch-receiving)
//
// Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers.
// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)
//
// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama
package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
14 changes: 14 additions & 0 deletions instrumentation/github.com/Shopify/sarama/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama

go 1.14

replace go.opentelemetry.io/contrib => ../../../..

require (
github.com/Shopify/sarama v1.26.4
github.com/google/uuid v1.1.1
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/contrib v0.7.0
XSAM marked this conversation as resolved.
Show resolved Hide resolved
go.opentelemetry.io/otel v0.8.0
google.golang.org/grpc v1.30.0
)
Loading