Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for Kafka #134

Merged
merged 20 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ updates:
directory: "/instrumentation/runtime" # Location of package manifests
schedule:
interval: "daily"
- package-ecosystem: "gomod" # See documentation for possible values
directory: "/instrumentation/github.com/Shopify/sarama" # Location of package manifests
schedule:
interval: "daily"
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134)
- Add links and status message for mock span. (#134)

## [0.9.0] - 2020-07-20

This release upgrades its [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v0.9.0) dependency to v0.9.0.
Expand Down
111 changes: 111 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"context"
"strconv"

"github.com/Shopify/sarama"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)

type partitionConsumer struct {
sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)

wrapped := &partitionConsumer{
PartitionConsumer: pc,
messages: make(chan *sarama.ConsumerMessage),
}
go func() {
msgs := pc.Messages()

for msg := range msgs {
// Extract a span context from message to link.
carrier := NewConsumerMessageCarrier(msg)
parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)),
kafkaPartitionKey.Int32(msg.Partition),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...)

// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(newCtx, cfg.Propagators, carrier)

// Send messages back to user.
wrapped.messages <- msg

span.End()
}
close(wrapped.messages)
}()
return wrapped
}

type consumer struct {
sarama.Consumer

serviceName string
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return nil, err
}
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
serviceName: serviceName,
opts: opts,
}
}
211 changes: 211 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
"context"
"fmt"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"

mocktracer "go.opentelemetry.io/contrib/internal/trace"
)

const (
serviceName = "test-service-name"
topic = "test-topic"
)

var (
propagators = global.Propagators()
)

func TestWrapPartitionConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

// Mock partition consumer controller
consumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func TestWrapConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)

// Wrap consumer
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) {
// Create message with span context
ctx, _ := mt.Start(context.Background(), "")
message := sarama.ConsumerMessage{Key: []byte("foo")}
propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message))

// Produce message
mockPartitionConsumer.YieldMessage(&message)
mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")})

// Consume messages
msgList := make([]*sarama.ConsumerMessage, 2)
msgList[0] = <-partitionConsumer.Messages()
msgList[1] = <-partitionConsumer.Messages()
require.NoError(t, partitionConsumer.Close())
// Wait for the channel to be closed
<-partitionConsumer.Messages()

// Check spans length
spans := mt.EndedSpans()
assert.Len(t, spans, 2)

expectedList := []struct {
kvList []kv.KeyValue
parentSpanID trace.SpanID
kind trace.SpanKind
msgKey []byte
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String("1"),
kafkaPartitionKey.Int32(0),
},
parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID,
kind: trace.SpanKindConsumer,
msgKey: []byte("foo"),
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String("2"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindConsumer,
msgKey: []byte("foo2"),
},
}

for i, expected := range expectedList {
t.Run(fmt.Sprint("index", i), func(t *testing.T) {
span := spans[i]

assert.Equal(t, expected.parentSpanID, span.ParentSpanID)

remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i])))
assert.Equal(t, span.SpanContext(), remoteSpanFromMessage,
"span context should be injected into the consumer message headers")

assert.Equal(t, "kafka.consume", span.Name)
assert.Equal(t, expected.kind, span.Kind)
assert.Equal(t, expected.msgKey, msgList[i].Key)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
})
}
}

func TestConsumerConsumePartitionWithError(t *testing.T) {
// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockConsumer.ExpectConsumePartition(topic, 0, 0)

consumer := WrapConsumer(serviceName, mockConsumer)
_, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
// Consume twice
_, err = consumer.ConsumePartition(topic, 0, 0)
assert.Error(t, err)
}

func BenchmarkWrapPartitionConsumer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
message := sarama.ConsumerMessage{Key: []byte("foo")}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
mockPartitionConsumer.YieldMessage(&message)
<-partitionConsumer.Messages()
}
}

func BenchmarkMockPartitionConsumer(b *testing.B) {
mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

message := sarama.ConsumerMessage{Key: []byte("foo")}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
mockPartitionConsumer.YieldMessage(&message)
<-partitionConsumer.Messages()
}
}

func createMockPartitionConsumer(b *testing.B) (*mocks.PartitionConsumer, sarama.PartitionConsumer) {
// Mock partition consumer controller
consumer := mocks.NewConsumer(b, sarama.NewConfig())
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(b, err)
return mockPartitionConsumer, partitionConsumer
}
23 changes: 23 additions & 0 deletions instrumentation/github.com/Shopify/sarama/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sarama provides functions to trace the Shopify/sarama package. (https://github.com/Shopify/sarama)
//
// The consumer's span will be created as a child of the producer's span.
//
// Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers.
// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)
//
// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama
package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
13 changes: 13 additions & 0 deletions instrumentation/github.com/Shopify/sarama/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama

go 1.14

replace go.opentelemetry.io/contrib => ../../../..

require (
github.com/Shopify/sarama v1.26.4
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/contrib v0.9.0
go.opentelemetry.io/otel v0.9.0
google.golang.org/grpc v1.30.0
)
Loading