From ce6ff12df33e6b361806380c6008bd21bf278a23 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Tue, 20 Oct 2020 15:29:08 +0800 Subject: [PATCH] Support more encodings for kafka.consumer in OTel Ingester Signed-off-by: Sam Xie --- cmd/ingester/app/flags.go | 7 ++++++- .../app/receiver/kafkareceiver/kafka_receiver.go | 8 ++++++++ .../kafkareceiver/kafka_receiver_test.go | 16 ++++++++++++++++ plugin/storage/kafka/options.go | 6 ++++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 98efb6eb4ce2..4ce110f93483 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -114,7 +114,12 @@ func AddOTELFlags(flagSet *flag.FlagSet) { flagSet.String( KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, - fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \""))) + fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join( + append(kafka.AllEncodings, + kafka.EncodingZipkinJSON, + kafka.EncodingZipkinProto, + kafka.EncodingOTLPProto, + ), "\", \""))) auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 3b4a3a2fd4ff..37356a4b59ee 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -145,6 +145,14 @@ func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string { return "jaeger_proto" case kafka.EncodingJSON: return "jaeger_json" + case kafka.EncodingOTLPProto: + return "otlp_proto" + case kafka.EncodingZipkinProto: + return "zipkin_proto" + case kafka.EncodingZipkinJSON: + return "zipkin_json" + case kafka.EncodingZipkinThrift: + return "zipkin_thrift" } panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.") diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go index 921161bc1997..ee5e50ffe725 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go @@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) { in: kafka.EncodingJSON, expected: "jaeger_json", }, + { + in: kafka.EncodingOTLPProto, + expected: "otlp_proto", + }, + { + in: kafka.EncodingZipkinProto, + expected: "zipkin_proto", + }, + { + in: kafka.EncodingZipkinJSON, + expected: "zipkin_json", + }, + { + in: kafka.EncodingZipkinThrift, + expected: "zipkin_thrift", + }, { in: "not-an-encoding", expectsPanic: true, diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 57af48c63435..d7f89c150ae1 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -34,6 +34,12 @@ const ( EncodingProto = "protobuf" // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" + // EncodingZipkinProto is used for spans encoded as Zipkin Protobuf. + EncodingZipkinProto = "zipkin-proto" + // EncodingZipkinJSON is used for spans encoded as Zipkin JSON. + EncodingZipkinJSON = "zipkin-json" + // EncodingOTLPProto is used for spans encoded as OTLP Protobuf. + EncodingOTLPProto = "otlp-proto" configPrefix = "kafka.producer" suffixBrokers = ".brokers"