diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 6bd80cf35f5..fc8584c4be0 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -49,3 +49,7 @@ updates: directory: "/instrumentation/runtime" # Location of package manifests schedule: interval: "daily" + - package-ecosystem: "gomod" # See documentation for possible values + directory: "/instrumentation/github.com/Shopify/sarama" # Location of package manifests + schedule: + interval: "daily" diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d8f2b54411..065db1a5640 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134) +- Add links and status message for mock span. (#134) + ## [0.9.0] - 2020-07-20 This release upgrades its [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v0.9.0) dependency to v0.9.0. diff --git a/instrumentation/github.com/Shopify/sarama/consumer.go b/instrumentation/github.com/Shopify/sarama/consumer.go new file mode 100644 index 00000000000..e25aefb1dfe --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/consumer.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "context" + "strconv" + + "github.com/Shopify/sarama" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" +) + +type partitionConsumer struct { + sarama.PartitionConsumer + messages chan *sarama.ConsumerMessage +} + +// Messages returns the read channel for the messages that are returned by +// the broker. +func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return pc.messages +} + +// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received +// message to be traced. +func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { + cfg := newConfig(serviceName, opts...) + + wrapped := &partitionConsumer{ + PartitionConsumer: pc, + messages: make(chan *sarama.ConsumerMessage), + } + go func() { + msgs := pc.Messages() + + for msg := range msgs { + // Extract a span context from message to link. + carrier := NewConsumerMessageCarrier(msg) + parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier) + + // Create a span. + attrs := []kv.KeyValue{ + standard.ServiceNameKey.String(cfg.ServiceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(msg.Topic), + standard.MessagingOperationReceive, + standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)), + kafkaPartitionKey.Int32(msg.Partition), + } + opts := []trace.StartOption{ + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindConsumer), + } + newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...) + + // Inject current span context, so consumers can use it to propagate span. + propagation.InjectHTTP(newCtx, cfg.Propagators, carrier) + + // Send messages back to user. + wrapped.messages <- msg + + span.End() + } + close(wrapped.messages) + }() + return wrapped +} + +type consumer struct { + sarama.Consumer + + serviceName string + opts []Option +} + +// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting +// PartitionConsumer. +func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + pc, err := c.Consumer.ConsumePartition(topic, partition, offset) + if err != nil { + return nil, err + } + return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil +} + +// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created +// via Consumer.ConsumePartition. +func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer { + return &consumer{ + Consumer: c, + serviceName: serviceName, + opts: opts, + } +} diff --git a/instrumentation/github.com/Shopify/sarama/consumer_test.go b/instrumentation/github.com/Shopify/sarama/consumer_test.go new file mode 100644 index 00000000000..78d9fba8d3b --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/consumer_test.go @@ -0,0 +1,211 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "context" + "fmt" + "testing" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" + + mocktracer "go.opentelemetry.io/contrib/internal/trace" +) + +const ( + serviceName = "test-service-name" + topic = "test-topic" +) + +var ( + propagators = global.Propagators() +) + +func TestWrapPartitionConsumer(t *testing.T) { + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + // Mock partition consumer controller + consumer := mocks.NewConsumer(t, sarama.NewConfig()) + mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0) + + // Create partition consumer + partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) + require.NoError(t, err) + + partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt)) + + consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer) +} + +func TestWrapConsumer(t *testing.T) { + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + // Mock partition consumer controller + mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) + mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0) + + // Wrap consumer + consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt)) + + // Create partition consumer + partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) + require.NoError(t, err) + + consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer) +} + +func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) { + // Create message with span context + ctx, _ := mt.Start(context.Background(), "") + message := sarama.ConsumerMessage{Key: []byte("foo")} + propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message)) + + // Produce message + mockPartitionConsumer.YieldMessage(&message) + mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")}) + + // Consume messages + msgList := make([]*sarama.ConsumerMessage, 2) + msgList[0] = <-partitionConsumer.Messages() + msgList[1] = <-partitionConsumer.Messages() + require.NoError(t, partitionConsumer.Close()) + // Wait for the channel to be closed + <-partitionConsumer.Messages() + + // Check spans length + spans := mt.EndedSpans() + assert.Len(t, spans, 2) + + expectedList := []struct { + kvList []kv.KeyValue + parentSpanID trace.SpanID + kind trace.SpanKind + msgKey []byte + }{ + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String("test-topic"), + standard.MessagingOperationReceive, + standard.MessagingMessageIDKey.String("1"), + kafkaPartitionKey.Int32(0), + }, + parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID, + kind: trace.SpanKindConsumer, + msgKey: []byte("foo"), + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String("test-topic"), + standard.MessagingOperationReceive, + standard.MessagingMessageIDKey.String("2"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindConsumer, + msgKey: []byte("foo2"), + }, + } + + for i, expected := range expectedList { + t.Run(fmt.Sprint("index", i), func(t *testing.T) { + span := spans[i] + + assert.Equal(t, expected.parentSpanID, span.ParentSpanID) + + remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i]))) + assert.Equal(t, span.SpanContext(), remoteSpanFromMessage, + "span context should be injected into the consumer message headers") + + assert.Equal(t, "kafka.consume", span.Name) + assert.Equal(t, expected.kind, span.Kind) + assert.Equal(t, expected.msgKey, msgList[i].Key) + for _, k := range expected.kvList { + assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) + } + }) + } +} + +func TestConsumerConsumePartitionWithError(t *testing.T) { + // Mock partition consumer controller + mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) + mockConsumer.ExpectConsumePartition(topic, 0, 0) + + consumer := WrapConsumer(serviceName, mockConsumer) + _, err := consumer.ConsumePartition(topic, 0, 0) + assert.NoError(t, err) + // Consume twice + _, err = consumer.ConsumePartition(topic, 0, 0) + assert.Error(t, err) +} + +func BenchmarkWrapPartitionConsumer(b *testing.B) { + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b) + + partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt)) + message := sarama.ConsumerMessage{Key: []byte("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockPartitionConsumer.YieldMessage(&message) + <-partitionConsumer.Messages() + } +} + +func BenchmarkMockPartitionConsumer(b *testing.B) { + mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b) + + message := sarama.ConsumerMessage{Key: []byte("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockPartitionConsumer.YieldMessage(&message) + <-partitionConsumer.Messages() + } +} + +func createMockPartitionConsumer(b *testing.B) (*mocks.PartitionConsumer, sarama.PartitionConsumer) { + // Mock partition consumer controller + consumer := mocks.NewConsumer(b, sarama.NewConfig()) + mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0) + + // Create partition consumer + partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) + require.NoError(b, err) + return mockPartitionConsumer, partitionConsumer +} diff --git a/instrumentation/github.com/Shopify/sarama/doc.go b/instrumentation/github.com/Shopify/sarama/doc.go new file mode 100644 index 00000000000..49c40958a4c --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/doc.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sarama provides functions to trace the Shopify/sarama package. (https://github.com/Shopify/sarama) +// +// The consumer's span will be created as a child of the producer's span. +// +// Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers. +// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html) +// +// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama +package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" diff --git a/instrumentation/github.com/Shopify/sarama/go.mod b/instrumentation/github.com/Shopify/sarama/go.mod new file mode 100644 index 00000000000..9c3db97a33b --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/go.mod @@ -0,0 +1,13 @@ +module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama + +go 1.14 + +replace go.opentelemetry.io/contrib => ../../../.. + +require ( + github.com/Shopify/sarama v1.26.4 + github.com/stretchr/testify v1.6.1 + go.opentelemetry.io/contrib v0.9.0 + go.opentelemetry.io/otel v0.9.0 + google.golang.org/grpc v1.30.0 +) diff --git a/instrumentation/github.com/Shopify/sarama/go.sum b/instrumentation/github.com/Shopify/sarama/go.sum new file mode 100644 index 00000000000..57735a300ca --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/go.sum @@ -0,0 +1,150 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs= +github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60= +github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8= +github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= +github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.opentelemetry.io/otel v0.9.0 h1:nsdCDHzQx1Yv8E2nwCPcMXMfg+EMIlx1LBOXNC8qSQ8= +go.opentelemetry.io/otel v0.9.0/go.mod h1:ckxzUEfk7tAkTwEMVdkllBM+YOfE/K9iwg6zYntFYSg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8= +google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE= +google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/instrumentation/github.com/Shopify/sarama/message.go b/instrumentation/github.com/Shopify/sarama/message.go new file mode 100644 index 00000000000..f49534eb4df --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/message.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "github.com/Shopify/sarama" + + "go.opentelemetry.io/otel/api/propagation" +) + +var _ propagation.HTTPSupplier = (*ProducerMessageCarrier)(nil) +var _ propagation.HTTPSupplier = (*ConsumerMessageCarrier)(nil) + +// ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage. +type ProducerMessageCarrier struct { + msg *sarama.ProducerMessage +} + +// NewProducerMessageCarrier creates a new ProducerMessageCarrier. +func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier { + return ProducerMessageCarrier{msg: msg} +} + +// Get retrieves a single value for a given key. +func (c ProducerMessageCarrier) Get(key string) string { + for _, h := range c.msg.Headers { + if string(h.Key) == key { + return string(h.Value) + } + } + return "" +} + +// Set sets a header. +func (c ProducerMessageCarrier) Set(key, val string) { + // Ensure uniqueness of keys + for i := 0; i < len(c.msg.Headers); i++ { + if string(c.msg.Headers[i].Key) == key { + c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) + i-- + } + } + c.msg.Headers = append(c.msg.Headers, sarama.RecordHeader{ + Key: []byte(key), + Value: []byte(val), + }) +} + +// ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage. +type ConsumerMessageCarrier struct { + msg *sarama.ConsumerMessage +} + +// NewConsumerMessageCarrier creates a new ConsumerMessageCarrier. +func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier { + return ConsumerMessageCarrier{msg: msg} +} + +// Get retrieves a single value for a given key. +func (c ConsumerMessageCarrier) Get(key string) string { + for _, h := range c.msg.Headers { + if h != nil && string(h.Key) == key { + return string(h.Value) + } + } + return "" +} + +// Set sets a header. +func (c ConsumerMessageCarrier) Set(key, val string) { + // Ensure uniqueness of keys + for i := 0; i < len(c.msg.Headers); i++ { + if c.msg.Headers[i] != nil && string(c.msg.Headers[i].Key) == key { + c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) + i-- + } + } + c.msg.Headers = append(c.msg.Headers, &sarama.RecordHeader{ + Key: []byte(key), + Value: []byte(val), + }) +} diff --git a/instrumentation/github.com/Shopify/sarama/message_test.go b/instrumentation/github.com/Shopify/sarama/message_test.go new file mode 100644 index 00000000000..bc3962baac3 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/message_test.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +func TestProducerMessageCarrierGet(t *testing.T) { + testCases := []struct { + name string + carrier ProducerMessageCarrier + key string + expected string + }{ + { + name: "exists", + carrier: ProducerMessageCarrier{msg: &sarama.ProducerMessage{Headers: []sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar")}, + }}}, + key: "foo", + expected: "bar", + }, + { + name: "not exists", + carrier: ProducerMessageCarrier{msg: &sarama.ProducerMessage{Headers: []sarama.RecordHeader{}}}, + key: "foo", + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := tc.carrier.Get(tc.key) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestProducerMessageCarrierSet(t *testing.T) { + msg := sarama.ProducerMessage{Headers: []sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar")}, + }} + carrier := ProducerMessageCarrier{msg: &msg} + + carrier.Set("foo", "bar2") + carrier.Set("foo2", "bar2") + carrier.Set("foo2", "bar3") + carrier.Set("foo3", "bar4") + + assert.ElementsMatch(t, carrier.msg.Headers, []sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar2")}, + {Key: []byte("foo2"), Value: []byte("bar3")}, + {Key: []byte("foo3"), Value: []byte("bar4")}, + }) +} + +func TestConsumerMessageCarrierGet(t *testing.T) { + testCases := []struct { + name string + carrier ConsumerMessageCarrier + key string + expected string + }{ + { + name: "exists", + carrier: ConsumerMessageCarrier{msg: &sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar")}, + }}}, + key: "foo", + expected: "bar", + }, + { + name: "not exists", + carrier: ConsumerMessageCarrier{msg: &sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{}}}, + key: "foo", + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := tc.carrier.Get(tc.key) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestConsumerMessageCarrierSet(t *testing.T) { + msg := sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar")}, + }} + carrier := ConsumerMessageCarrier{msg: &msg} + + carrier.Set("foo", "bar2") + carrier.Set("foo2", "bar2") + carrier.Set("foo2", "bar3") + carrier.Set("foo3", "bar4") + + assert.ElementsMatch(t, carrier.msg.Headers, []*sarama.RecordHeader{ + {Key: []byte("foo"), Value: []byte("bar2")}, + {Key: []byte("foo2"), Value: []byte("bar3")}, + {Key: []byte("foo3"), Value: []byte("bar4")}, + }) +} diff --git a/instrumentation/github.com/Shopify/sarama/option.go b/instrumentation/github.com/Shopify/sarama/option.go new file mode 100644 index 00000000000..14c9811f638 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/option.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "go.opentelemetry.io/otel/api/global" + "go.opentelemetry.io/otel/api/kv" + otelpropagation "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/trace" +) + +const ( + defaultTracerName = "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" + + kafkaPartitionKey = kv.Key("messaging.kafka.partition") +) + +type config struct { + ServiceName string + Tracer trace.Tracer + Propagators otelpropagation.Propagators +} + +// newConfig returns a config with all Options set. +func newConfig(serviceName string, opts ...Option) config { + cfg := config{Propagators: global.Propagators(), ServiceName: serviceName} + for _, opt := range opts { + opt(&cfg) + } + if cfg.Tracer == nil { + cfg.Tracer = global.Tracer(defaultTracerName) + } + return cfg +} + +// Option specifies instrumentation configuration options. +type Option func(*config) + +// WithTracer specifies a tracer to use for creating spans. If none is +// specified, a tracer named +// "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" +// from the global provider is used. +func WithTracer(tracer trace.Tracer) Option { + return func(cfg *config) { + cfg.Tracer = tracer + } +} + +// WithPropagators specifies propagators to use for extracting +// information from the HTTP requests. If none are specified, global +// ones will be used. +func WithPropagators(propagators otelpropagation.Propagators) Option { + return func(cfg *config) { + cfg.Propagators = propagators + } +} diff --git a/instrumentation/github.com/Shopify/sarama/option_test.go b/instrumentation/github.com/Shopify/sarama/option_test.go new file mode 100644 index 00000000000..fa17279ed0c --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/option_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/api/global" +) + +func TestNewConfig(t *testing.T) { + testCases := []struct { + name string + serviceName string + opts []Option + expected config + }{ + { + name: "set service name", + serviceName: serviceName, + expected: config{ + ServiceName: serviceName, + Tracer: global.Tracer(defaultTracerName), + Propagators: global.Propagators(), + }, + }, + { + name: "with tracer", + serviceName: serviceName, + opts: []Option{ + WithTracer(global.Tracer("new")), + }, + expected: config{ + ServiceName: serviceName, + Tracer: global.Tracer("new"), + Propagators: global.Propagators(), + }, + }, + { + name: "with propagators", + serviceName: serviceName, + opts: []Option{ + WithPropagators(nil), + }, + expected: config{ + ServiceName: serviceName, + Tracer: global.Tracer(defaultTracerName), + Propagators: nil, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := newConfig(tc.serviceName, tc.opts...) + assert.Equal(t, tc.expected, result) + }) + } +} diff --git a/instrumentation/github.com/Shopify/sarama/producer.go b/instrumentation/github.com/Shopify/sarama/producer.go new file mode 100644 index 00000000000..0d68a979bc7 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/producer.go @@ -0,0 +1,265 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "context" + "strconv" + + "github.com/Shopify/sarama" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" +) + +type syncProducer struct { + sarama.SyncProducer + cfg config + saramaConfig *sarama.Config +} + +// SendMessage calls sarama.SyncProducer.SendMessage and traces the request. +func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + span := startProducerSpan(p.cfg, p.saramaConfig.Version, msg) + partition, offset, err = p.SyncProducer.SendMessage(msg) + finishProducerSpan(span, partition, offset, err) + return partition, offset, err +} + +// SendMessages calls sarama.SyncProducer.SendMessages and traces the requests. +func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + // Although there's only one call made to the SyncProducer, the messages are + // treated individually, so we create a span for each one + spans := make([]trace.Span, len(msgs)) + for i, msg := range msgs { + spans[i] = startProducerSpan(p.cfg, p.saramaConfig.Version, msg) + } + err := p.SyncProducer.SendMessages(msgs) + for i, span := range spans { + finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err) + } + return err +} + +// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages +// are traced. +func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { + cfg := newConfig(serviceName, opts...) + if saramaConfig == nil { + saramaConfig = sarama.NewConfig() + } + + return &syncProducer{ + SyncProducer: producer, + cfg: cfg, + saramaConfig: saramaConfig, + } +} + +type closeType int + +const ( + closeSync closeType = iota + closeAsync +) + +type asyncProducer struct { + sarama.AsyncProducer + input chan *sarama.ProducerMessage + successes chan *sarama.ProducerMessage + errors chan *sarama.ProducerError + closeErr chan error +} + +// Input returns the input channel. +func (p *asyncProducer) Input() chan<- *sarama.ProducerMessage { + return p.input +} + +// Successes returns the successes channel. +func (p *asyncProducer) Successes() <-chan *sarama.ProducerMessage { + return p.successes +} + +// Errors returns the errors channel. +func (p *asyncProducer) Errors() <-chan *sarama.ProducerError { + return p.errors +} + +// AsyncClose async close producer. +func (p *asyncProducer) AsyncClose() { + p.input <- &sarama.ProducerMessage{ + Metadata: closeAsync, + } +} + +// Close shuts down the producer and waits for any buffered messages to be +// flushed. +// +// Due to the implement of sarama, some messages may lose successes or errors status +// while closing. +func (p *asyncProducer) Close() error { + p.input <- &sarama.ProducerMessage{ + Metadata: closeSync, + } + return <-p.closeErr +} + +type producerMessageContext struct { + span trace.Span + metadataBackup interface{} +} + +// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages +// are traced. It requires the underlying sarama Config so we can know whether +// or not successes will be returned. +// +// If `Return.Successes` is false, there is no way to know partition and offset of +// the message. +func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer { + cfg := newConfig(serviceName, opts...) + if saramaConfig == nil { + saramaConfig = sarama.NewConfig() + } + + wrapped := &asyncProducer{ + AsyncProducer: p, + input: make(chan *sarama.ProducerMessage), + successes: make(chan *sarama.ProducerMessage), + errors: make(chan *sarama.ProducerError), + closeErr: make(chan error), + } + go func() { + producerMessageContexts := make(map[interface{}]producerMessageContext) + // Clear all spans. + // Sarama will consume all the successes and errors by itself while closing, + // so our `Successes()` and `Errors()` may get nothing and those remaining spans + // cannot be closed. + defer func() { + for _, mc := range producerMessageContexts { + finishProducerSpan(mc.span, 0, 0, nil) + } + }() + defer close(wrapped.successes) + defer close(wrapped.errors) + for { + select { + case msg := <-wrapped.input: + // Shut down if message metadata is a close type. + // Sarama will close after dispatching every message. + // So wrapper should follow this mechanism by adding a special message at + // the end of the input channel. + if ct, ok := msg.Metadata.(closeType); ok { + switch ct { + case closeSync: + go func() { + wrapped.closeErr <- p.Close() + }() + case closeAsync: + p.AsyncClose() + } + continue + } + + span := startProducerSpan(cfg, saramaConfig.Version, msg) + + // Create message context, backend message metadata + mc := producerMessageContext{ + metadataBackup: msg.Metadata, + span: span, + } + + // Specific metadata with span id + msg.Metadata = span.SpanContext().SpanID + + p.Input() <- msg + if saramaConfig.Producer.Return.Successes { + producerMessageContexts[msg.Metadata] = mc + } else { + // If returning successes isn't enabled, we just finish the + // span right away because there's no way to know when it will + // be done. + finishProducerSpan(span, msg.Partition, msg.Offset, nil) + } + case msg, ok := <-p.Successes(): + if !ok { + // producer was closed, so exit + return + } + key := msg.Metadata + if mc, ok := producerMessageContexts[key]; ok { + delete(producerMessageContexts, key) + finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil) + + // Restore message metadata + msg.Metadata = mc.metadataBackup + } + wrapped.successes <- msg + case err, ok := <-p.Errors(): + if !ok { + // producer was closed + return + } + key := err.Msg.Metadata + if mc, ok := producerMessageContexts[key]; ok { + delete(producerMessageContexts, key) + finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err) + } + wrapped.errors <- err + } + } + }() + return wrapped +} + +func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) trace.Span { + // If there's a span context in the message, use that as the parent context. + carrier := NewProducerMessageCarrier(msg) + ctx := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier) + + // Create a span. + attrs := []kv.KeyValue{ + standard.ServiceNameKey.String(cfg.ServiceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(msg.Topic), + } + opts := []trace.StartOption{ + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindProducer), + } + ctx, span := cfg.Tracer.Start(ctx, "kafka.produce", opts...) + + if version.IsAtLeast(sarama.V0_11_0_0) { + // Inject current span context, so consumers can use it to propagate span. + propagation.InjectHTTP(ctx, cfg.Propagators, carrier) + } + + return span +} + +func finishProducerSpan(span trace.Span, partition int32, offset int64, err error) { + span.SetAttributes( + standard.MessagingMessageIDKey.String(strconv.FormatInt(offset, 10)), + kafkaPartitionKey.Int32(partition), + ) + if err != nil { + span.SetStatus(codes.Internal, err.Error()) + } + span.End() +} diff --git a/instrumentation/github.com/Shopify/sarama/producer_test.go b/instrumentation/github.com/Shopify/sarama/producer_test.go new file mode 100644 index 00000000000..3f1e7c994b4 --- /dev/null +++ b/instrumentation/github.com/Shopify/sarama/producer_test.go @@ -0,0 +1,419 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sarama + +import ( + "context" + "errors" + "testing" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/propagation" + "go.opentelemetry.io/otel/api/standard" + "go.opentelemetry.io/otel/api/trace" + + mocktracer "go.opentelemetry.io/contrib/internal/trace" +) + +func TestWrapSyncProducer(t *testing.T) { + var err error + + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + cfg := newSaramaConfig() + // Mock sync producer + mockSyncProducer := mocks.NewSyncProducer(t, cfg) + + // Wrap sync producer + syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt)) + + // Create message with span context + ctx, _ := mt.Start(context.Background(), "") + messageWithSpanContext := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo")} + propagation.InjectHTTP(ctx, propagators, NewProducerMessageCarrier(&messageWithSpanContext)) + + // Expected + expectedList := []struct { + kvList []kv.KeyValue + parentSpanID trace.SpanID + kind trace.SpanKind + }{ + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("1"), + kafkaPartitionKey.Int32(0), + }, + parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID, + kind: trace.SpanKindProducer, + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("2"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindProducer, + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + // TODO: The mock sync producer of sarama does not handle the offset while sending messages + // https://github.com/Shopify/sarama/pull/1747 + //standard.MessagingMessageIDKey.String("3"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindProducer, + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + //standard.MessagingMessageIDKey.String("4"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindProducer, + }, + } + for i := 0; i < len(expectedList); i++ { + mockSyncProducer.ExpectSendMessageAndSucceed() + } + + // Send message + msgList := []*sarama.ProducerMessage{ + &messageWithSpanContext, + {Topic: topic, Key: sarama.StringEncoder("foo2")}, + {Topic: topic, Key: sarama.StringEncoder("foo3")}, + {Topic: topic, Key: sarama.StringEncoder("foo4")}, + } + _, _, err = syncProducer.SendMessage(msgList[0]) + require.NoError(t, err) + _, _, err = syncProducer.SendMessage(msgList[1]) + require.NoError(t, err) + // Send messages + require.NoError(t, syncProducer.SendMessages(msgList[2:])) + + spanList := mt.EndedSpans() + for i, expected := range expectedList { + span := spanList[i] + msg := msgList[i] + + // Check span + assert.True(t, span.SpanContext().IsValid()) + assert.Equal(t, expected.parentSpanID, span.ParentSpanID) + assert.Equal(t, "kafka.produce", span.Name) + assert.Equal(t, expected.kind, span.Kind) + for _, k := range expected.kvList { + assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) + } + + // Check tracing propagation + remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg))) + assert.True(t, remoteSpanFromMessage.IsValid()) + } +} + +func TestWrapAsyncProducer(t *testing.T) { + // Create message with span context + createMessages := func(mt *mocktracer.Tracer) []*sarama.ProducerMessage { + ctx, _ := mt.Start(context.Background(), "") + messageWithSpanContext := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo")} + propagation.InjectHTTP(ctx, propagators, NewProducerMessageCarrier(&messageWithSpanContext)) + mt.EndedSpans() + + return []*sarama.ProducerMessage{ + &messageWithSpanContext, + {Topic: topic, Key: sarama.StringEncoder("foo2")}, + } + } + + t.Run("without successes config", func(t *testing.T) { + mt := mocktracer.NewTracer("kafka") + cfg := newSaramaConfig() + mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) + ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + + msgList := createMessages(mt) + // Send message + for _, msg := range msgList { + mockAsyncProducer.ExpectInputAndSucceed() + ap.Input() <- msg + } + + err := ap.Close() + require.NoError(t, err) + + spanList := mt.EndedSpans() + + // Expected + expectedList := []struct { + kvList []kv.KeyValue + parentSpanID trace.SpanID + kind trace.SpanKind + }{ + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("0"), + kafkaPartitionKey.Int32(0), + }, + parentSpanID: trace.SpanID{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + kind: trace.SpanKindProducer, + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("0"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindProducer, + }, + } + for i, expected := range expectedList { + span := spanList[i] + msg := msgList[i] + + // Check span + assert.True(t, span.SpanContext().IsValid()) + assert.Equal(t, expected.parentSpanID, span.ParentSpanID) + assert.Equal(t, "kafka.produce", span.Name) + assert.Equal(t, expected.kind, span.Kind) + for _, k := range expected.kvList { + assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) + } + + // Check tracing propagation + remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg))) + assert.True(t, remoteSpanFromMessage.IsValid()) + } + }) + + t.Run("with successes config", func(t *testing.T) { + mt := mocktracer.NewTracer("kafka") + + // Set producer with successes config + cfg := newSaramaConfig() + cfg.Producer.Return.Successes = true + + mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) + ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + + msgList := createMessages(mt) + // Send message + for i, msg := range msgList { + mockAsyncProducer.ExpectInputAndSucceed() + // Add metadata to msg + msg.Metadata = i + ap.Input() <- msg + newMsg := <-ap.Successes() + assert.Equal(t, newMsg, msg) + } + + err := ap.Close() + require.NoError(t, err) + + spanList := mt.EndedSpans() + + // Expected + expectedList := []struct { + kvList []kv.KeyValue + parentSpanID trace.SpanID + kind trace.SpanKind + }{ + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("1"), + kafkaPartitionKey.Int32(0), + }, + parentSpanID: trace.SpanID{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + kind: trace.SpanKindProducer, + }, + { + kvList: []kv.KeyValue{ + standard.ServiceNameKey.String(serviceName), + standard.MessagingSystemKey.String("kafka"), + standard.MessagingDestinationKindKeyTopic, + standard.MessagingDestinationKey.String(topic), + standard.MessagingMessageIDKey.String("2"), + kafkaPartitionKey.Int32(0), + }, + kind: trace.SpanKindProducer, + }, + } + for i, expected := range expectedList { + span := spanList[i] + msg := msgList[i] + + // Check span + assert.True(t, span.SpanContext().IsValid()) + assert.Equal(t, expected.parentSpanID, span.ParentSpanID) + assert.Equal(t, "kafka.produce", span.Name) + assert.Equal(t, expected.kind, span.Kind) + for _, k := range expected.kvList { + assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) + } + + // Check metadata + assert.Equal(t, i, msg.Metadata) + + // Check tracing propagation + remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg))) + assert.True(t, remoteSpanFromMessage.IsValid()) + } + }) +} + +func TestWrapAsyncProducerError(t *testing.T) { + mt := mocktracer.NewTracer("kafka") + + // Set producer with successes config + cfg := newSaramaConfig() + cfg.Producer.Return.Successes = true + + mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) + ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + + mockAsyncProducer.ExpectInputAndFail(errors.New("test")) + ap.Input() <- &sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo2")} + + err := <-ap.Errors() + require.Error(t, err) + + ap.AsyncClose() + + spanList := mt.EndedSpans() + assert.Len(t, spanList, 1) + + span := spanList[0] + + assert.Equal(t, codes.Internal, span.Status) + assert.Equal(t, "test", span.StatusMessage) +} + +func newSaramaConfig() *sarama.Config { + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + return cfg +} + +func BenchmarkWrapSyncProducer(b *testing.B) { + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + cfg := newSaramaConfig() + // Mock sync producer + mockSyncProducer := mocks.NewSyncProducer(b, cfg) + + // Wrap sync producer + syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt)) + message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockSyncProducer.ExpectSendMessageAndSucceed() + _, _, err := syncProducer.SendMessage(&message) + assert.NoError(b, err) + } +} + +func BenchmarkMockSyncProducer(b *testing.B) { + cfg := newSaramaConfig() + // Mock sync producer + mockSyncProducer := mocks.NewSyncProducer(b, cfg) + + // Wrap sync producer + syncProducer := mockSyncProducer + message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockSyncProducer.ExpectSendMessageAndSucceed() + _, _, err := syncProducer.SendMessage(&message) + assert.NoError(b, err) + } +} + +func BenchmarkWrapAsyncProducer(b *testing.B) { + // Mock tracer + mt := mocktracer.NewTracer("kafka") + + cfg := newSaramaConfig() + cfg.Producer.Return.Successes = true + mockAsyncProducer := mocks.NewAsyncProducer(b, cfg) + + // Wrap sync producer + asyncProducer := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockAsyncProducer.ExpectInputAndSucceed() + asyncProducer.Input() <- &message + <-asyncProducer.Successes() + } +} + +func BenchmarkMockAsyncProducer(b *testing.B) { + cfg := newSaramaConfig() + cfg.Producer.Return.Successes = true + mockAsyncProducer := mocks.NewAsyncProducer(b, cfg) + + // Wrap sync producer + asyncProducer := mockAsyncProducer + message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mockAsyncProducer.ExpectInputAndSucceed() + mockAsyncProducer.Input() <- &message + <-asyncProducer.Successes() + } +} diff --git a/internal/trace/mock_span.go b/internal/trace/mock_span.go index c35f4b539fa..f9807592309 100644 --- a/internal/trace/mock_span.go +++ b/internal/trace/mock_span.go @@ -28,13 +28,15 @@ import ( // Span is a mock span used in association with Tracer for // testing purpose only. type Span struct { - sc oteltrace.SpanContext - tracer *Tracer - Name string - Attributes map[otelkv.Key]otelvalue.Value - Kind oteltrace.SpanKind - Status codes.Code - ParentSpanID oteltrace.SpanID + sc oteltrace.SpanContext + tracer *Tracer + Name string + Attributes map[otelkv.Key]otelvalue.Value + Kind oteltrace.SpanKind + Status codes.Code + StatusMessage string + ParentSpanID oteltrace.SpanID + Links map[oteltrace.SpanContext][]otelkv.KeyValue } var _ oteltrace.Span = (*Span)(nil) @@ -57,6 +59,7 @@ func (ms *Span) IsRecording() bool { // SetStatus sets the Status member. func (ms *Span) SetStatus(status codes.Code, msg string) { ms.Status = status + ms.StatusMessage = msg } // SetAttribute adds a single inferred attribute. diff --git a/internal/trace/mock_tracer.go b/internal/trace/mock_tracer.go index d5dd855a22c..553b74e1032 100644 --- a/internal/trace/mock_tracer.go +++ b/internal/trace/mock_tracer.go @@ -21,7 +21,10 @@ import ( "sync" "sync/atomic" + "go.opentelemetry.io/otel/api/kv" oteltrace "go.opentelemetry.io/otel/api/trace" + + "go.opentelemetry.io/contrib/internal/trace/parent" ) type Provider struct { @@ -116,7 +119,7 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp var span *Span var sc oteltrace.SpanContext - parentSpanContext := getSpanContext(ctx, opts.NewRoot) + parentSpanContext, _, links := parent.GetSpanContextAndLinks(ctx, opts.NewRoot) parentSpanID := parentSpanContext.SpanID if !parentSpanContext.IsValid() { @@ -136,6 +139,7 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp Name: name, Attributes: nil, ParentSpanID: parentSpanID, + Links: make(map[oteltrace.SpanContext][]kv.KeyValue), } if len(opts.Attributes) > 0 { span.SetAttributes(opts.Attributes...) @@ -145,23 +149,12 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp mt.OnSpanStarted(span) } - return oteltrace.ContextWithSpan(ctx, span), span -} - -func getSpanContext(ctx context.Context, ignoreContext bool) oteltrace.SpanContext { - if ignoreContext { - return oteltrace.EmptySpanContext() - } - - lsctx := oteltrace.SpanFromContext(ctx).SpanContext() - if lsctx.IsValid() { - return lsctx + for _, link := range links { + span.Links[link.SpanContext] = link.Attributes } - - rsctx := oteltrace.RemoteSpanContextFromContext(ctx) - if rsctx.IsValid() { - return rsctx + for _, link := range opts.Links { + span.Links[link.SpanContext] = link.Attributes } - return oteltrace.EmptySpanContext() + return oteltrace.ContextWithSpan(ctx, span), span } diff --git a/internal/trace/parent/parent.go b/internal/trace/parent/parent.go new file mode 100644 index 00000000000..5080aed9187 --- /dev/null +++ b/internal/trace/parent/parent.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parent + +import ( + "context" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/trace" +) + +func GetSpanContextAndLinks(ctx context.Context, ignoreContext bool) (trace.SpanContext, bool, []trace.Link) { + lsctx := trace.SpanFromContext(ctx).SpanContext() + rsctx := trace.RemoteSpanContextFromContext(ctx) + + if ignoreContext { + links := addLinkIfValid(nil, lsctx, "current") + links = addLinkIfValid(links, rsctx, "remote") + + return trace.EmptySpanContext(), false, links + } + if lsctx.IsValid() { + return lsctx, false, nil + } + if rsctx.IsValid() { + return rsctx, true, nil + } + return trace.EmptySpanContext(), false, nil +} + +func addLinkIfValid(links []trace.Link, sc trace.SpanContext, kind string) []trace.Link { + if !sc.IsValid() { + return links + } + return append(links, trace.Link{ + SpanContext: sc, + Attributes: []kv.KeyValue{ + kv.String("ignored-on-demand", kind), + }, + }) +}