diff --git a/cmd/opentelemetry/app/exporter/badgerexporter/doc.go b/cmd/opentelemetry/app/exporter/badgerexporter/doc.go index d7e1903c17b..b93e381624c 100644 --- a/cmd/opentelemetry/app/exporter/badgerexporter/doc.go +++ b/cmd/opentelemetry/app/exporter/badgerexporter/doc.go @@ -12,6 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. - // Package badgerexporter implements Jaeger Badger storage as OpenTelemetry exporter. package badgerexporter diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go index e8f41602ef3..1613e0613a7 100644 --- a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go +++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/kafkaexporter" - "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/receiver/kafkareceiver" "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) @@ -59,7 +58,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { opts := &kafka.Options{} opts.InitFromViper(f.Viper) - cfg.Encoding = kafkareceiver.MustOtelEncodingForJaegerEncoding(opts.Encoding) + cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding) cfg.Topic = opts.Topic cfg.Brokers = opts.Config.Brokers cfg.ProtocolVersion = opts.Config.ProtocolVersion @@ -127,3 +126,15 @@ func (f Factory) CreateLogsExporter( ) (component.LogsExporter, error) { return f.Wrapped.CreateLogsExporter(ctx, params, cfg) } + +// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding +func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string { + switch jaegerEncoding { + case kafka.EncodingProto: + return "jaeger_proto" + case kafka.EncodingJSON: + return "jaeger_json" + } + + panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.") +} diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go index df36331786a..b04195f6a57 100644 --- a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go +++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go @@ -27,6 +27,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/flags" jConfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) func TestDefaultConfig(t *testing.T) { @@ -86,3 +87,33 @@ func TestLoadConfigAndFlags(t *testing.T) { assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath) assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username) } + +func TestMustOtelEncodingForJaegerEncoding(t *testing.T) { + tests := []struct { + in string + expected string + expectsPanic bool + }{ + { + in: kafka.EncodingProto, + expected: "jaeger_proto", + }, + { + in: kafka.EncodingJSON, + expected: "jaeger_json", + }, + { + in: "not-an-encoding", + expectsPanic: true, + }, + } + + for _, tt := range tests { + if tt.expectsPanic { + assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) }) + continue + } + + assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in)) + } +} diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go b/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go index 91be467e217..1ccab212706 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go @@ -16,11 +16,30 @@ package kafkareceiver import ( "flag" + "fmt" + "strings" ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +const ( + // encodingZipkinProto is used for spans encoded as Zipkin Protobuf. + encodingZipkinProto = "zipkin-proto" + // encodingZipkinJSON is used for spans encoded as Zipkin JSON. + encodingZipkinJSON = "zipkin-json" + // encodingOTLPProto is used for spans encoded as OTLP Protobuf. + encodingOTLPProto = "otlp-proto" ) // AddFlags adds Ingester flags. func AddFlags(flags *flag.FlagSet) { ingesterApp.AddOTELFlags(flags) + // Modify kafka.consumer.encoding flag + flags.Lookup(ingesterApp.KafkaConsumerConfigPrefix + ingesterApp.SuffixEncoding).Usage = fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join( + append(kafka.AllEncodings, + encodingZipkinJSON, + encodingZipkinProto, + encodingOTLPProto, + ), "\", \"")) } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 3b4a3a2fd4f..ad68851c6c1 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -59,7 +59,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { cfg.Brokers = opts.Brokers cfg.ClientID = opts.ClientID - cfg.Encoding = MustOtelEncodingForJaegerEncoding(opts.Encoding) + cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding) cfg.GroupID = opts.GroupID cfg.Topic = opts.Topic cfg.ProtocolVersion = opts.ProtocolVersion @@ -138,13 +138,21 @@ func (f Factory) CreateLogsReceiver( return f.Wrapped.CreateLogsReceiver(ctx, params, cfg, nextConsumer) } -// MustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding -func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string { +// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding +func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string { switch jaegerEncoding { case kafka.EncodingProto: return "jaeger_proto" case kafka.EncodingJSON: return "jaeger_json" + case encodingOTLPProto: + return "otlp_proto" + case encodingZipkinProto: + return "zipkin_proto" + case encodingZipkinJSON: + return "zipkin_json" + case kafka.EncodingZipkinThrift: + return "zipkin_thrift" } panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.") diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go index 921161bc199..34318287371 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go @@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) { in: kafka.EncodingJSON, expected: "jaeger_json", }, + { + in: encodingOTLPProto, + expected: "otlp_proto", + }, + { + in: encodingZipkinProto, + expected: "zipkin_proto", + }, + { + in: encodingZipkinJSON, + expected: "zipkin_json", + }, + { + in: kafka.EncodingZipkinThrift, + expected: "zipkin_thrift", + }, { in: "not-an-encoding", expectsPanic: true, @@ -112,19 +128,10 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) { for _, tt := range tests { if tt.expectsPanic { - assertPanic(t, func() { MustOtelEncodingForJaegerEncoding(tt.in) }) + assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) }) continue } - assert.Equal(t, tt.expected, MustOtelEncodingForJaegerEncoding(tt.in)) + assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in)) } } - -func assertPanic(t *testing.T, f func()) { - defer func() { - if r := recover(); r == nil { - t.Errorf("The code did not panic") - } - }() - f() -}