Skip to content

Commit

Permalink
Support more encodings for kafka.consumer in OTel Ingester
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Xie <[email protected]>
  • Loading branch information
XSAM committed Oct 22, 2020
1 parent 9f954fe commit ec31e8a
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 17 deletions.
1 change: 0 additions & 1 deletion cmd/opentelemetry/app/exporter/badgerexporter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.


// Package badgerexporter implements Jaeger Badger storage as OpenTelemetry exporter.
package badgerexporter
15 changes: 13 additions & 2 deletions cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/receiver/kafkareceiver"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand Down Expand Up @@ -59,7 +58,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := &kafka.Options{}
opts.InitFromViper(f.Viper)

cfg.Encoding = kafkareceiver.MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Topic = opts.Topic
cfg.Brokers = opts.Config.Brokers
cfg.ProtocolVersion = opts.Config.ProtocolVersion
Expand Down Expand Up @@ -127,3 +126,15 @@ func (f Factory) CreateLogsExporter(
) (component.LogsExporter, error) {
return f.Wrapped.CreateLogsExporter(ctx, params, cfg)
}

// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -86,3 +87,33 @@ func TestLoadConfigAndFlags(t *testing.T) {
assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath)
assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username)
}

func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
tests := []struct {
in string
expected string
expectsPanic bool
}{
{
in: kafka.EncodingProto,
expected: "jaeger_proto",
},
{
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: "not-an-encoding",
expectsPanic: true,
},
}

for _, tt := range tests {
if tt.expectsPanic {
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}
19 changes: 19 additions & 0 deletions cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,30 @@ package kafkareceiver

import (
"flag"
"fmt"
"strings"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// encodingZipkinProto is used for spans encoded as Zipkin Protobuf.
encodingZipkinProto = "zipkin-proto"
// encodingZipkinJSON is used for spans encoded as Zipkin JSON.
encodingZipkinJSON = "zipkin-json"
// encodingOTLPProto is used for spans encoded as OTLP Protobuf.
encodingOTLPProto = "otlp-proto"
)

// AddFlags adds Ingester flags.
func AddFlags(flags *flag.FlagSet) {
ingesterApp.AddOTELFlags(flags)
// Modify kafka.consumer.encoding flag
flags.Lookup(ingesterApp.KafkaConsumerConfigPrefix + ingesterApp.SuffixEncoding).Usage = fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(
append(kafka.AllEncodings,
encodingZipkinJSON,
encodingZipkinProto,
encodingOTLPProto,
), "\", \""))
}
14 changes: 11 additions & 3 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {

cfg.Brokers = opts.Brokers
cfg.ClientID = opts.ClientID
cfg.Encoding = MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.GroupID = opts.GroupID
cfg.Topic = opts.Topic
cfg.ProtocolVersion = opts.ProtocolVersion
Expand Down Expand Up @@ -138,13 +138,21 @@ func (f Factory) CreateLogsReceiver(
return f.Wrapped.CreateLogsReceiver(ctx, params, cfg, nextConsumer)
}

// MustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
case encodingOTLPProto:
return "otlp_proto"
case encodingZipkinProto:
return "zipkin_proto"
case encodingZipkinJSON:
return "zipkin_json"
case kafka.EncodingZipkinThrift:
return "zipkin_thrift"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
Expand Down
29 changes: 18 additions & 11 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: encodingOTLPProto,
expected: "otlp_proto",
},
{
in: encodingZipkinProto,
expected: "zipkin_proto",
},
{
in: encodingZipkinJSON,
expected: "zipkin_json",
},
{
in: kafka.EncodingZipkinThrift,
expected: "zipkin_thrift",
},
{
in: "not-an-encoding",
expectsPanic: true,
Expand All @@ -112,19 +128,10 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {

for _, tt := range tests {
if tt.expectsPanic {
assertPanic(t, func() { MustOtelEncodingForJaegerEncoding(tt.in) })
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, MustOtelEncodingForJaegerEncoding(tt.in))
assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}

func assertPanic(t *testing.T, f func()) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
f()
}

0 comments on commit ec31e8a

Please sign in to comment.