diff --git a/Makefile b/Makefile index bf1faee82f1..fef17ab2fdb 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ JAEGER_IMPORT_PATH=github.com/jaegertracing/jaeger STORAGE_PKGS = ./plugin/storage/integration/... +OTEL_COLLECTOR_DIR = ./cmd/opentelemetry-collector # all .go files that are not auto-generated and should be auto-formatted and linted. ALL_SRC := $(shell find . -name '*.go' \ @@ -87,9 +88,13 @@ clean: rm -rf cover.out .cover/ cover.html lint.log fmt.log .PHONY: test -test: go-gen +test: go-gen test-otel bash -c "set -e; set -o pipefail; $(GOTEST) ./... | $(COLORIZE)" +.PHONY: test-otel +test-otel: + cd ${OTEL_COLLECTOR_DIR} && bash -c "set -e; set -o pipefail; $(GOTEST) ./... | $(COLORIZE)" + .PHONY: all-in-one-integration-test all-in-one-integration-test: go-gen $(GOTEST) -tags=integration ./cmd/all-in-one/... @@ -160,7 +165,7 @@ lint-staticcheck: @[ ! -s "$(LINT_LOG)" ] || (echo "Detected staticcheck failures:" | cat - $(LINT_LOG) && false) .PHONY: lint -lint: lint-staticcheck lint-gosec +lint: lint-staticcheck lint-gosec lint-otel $(GOVET) ./... $(MAKE) go-lint @echo Running go fmt on ALL_SRC ... @@ -169,6 +174,10 @@ lint: lint-staticcheck lint-gosec ./scripts/import-order-cleanup.sh stdout > $(IMPORT_LOG) @[ ! -s "$(FMT_LOG)" -a ! -s "$(IMPORT_LOG)" ] || (echo "Go fmt, license check, or import ordering failures, run 'make fmt'" | cat - $(FMT_LOG) && false) +.PHONY: lint-otel +lint-otel: + cd ${OTEL_COLLECTOR_DIR} && $(GOVET) ./... + .PHONY: go-lint go-lint: @cat /dev/null > $(LINT_LOG) @@ -246,7 +255,6 @@ else $(GOBUILD) -o ./cmd/collector/collector-$(GOOS) $(BUILD_INFO) ./cmd/collector/main.go endif -OTEL_COLLECTOR_DIR = ./cmd/opentelemetry-collector .PHONY: build-otel-collector build-otel-collector: ifeq ($(GOARCH), s390x) diff --git a/cmd/opentelemetry-collector/app/defaults/defaults.go b/cmd/opentelemetry-collector/app/defaults/defaults.go new file mode 100644 index 00000000000..edeb99abd0a --- /dev/null +++ b/cmd/opentelemetry-collector/app/defaults/defaults.go @@ -0,0 +1,37 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package defaults + +import ( + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/defaults" + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +// Components creates default and Jaeger factories +func Components(v *viper.Viper) config.Factories { + kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options { + opts := kafka.DefaultOptions() + opts.InitFromViper(v) + return opts + }} + + factories, _ := defaults.Components() + factories.Exporters[kafkaExp.Type()] = kafkaExp + return factories +} diff --git a/cmd/opentelemetry-collector/app/defaults/defaults_test.go b/cmd/opentelemetry-collector/app/defaults/defaults_test.go new file mode 100644 index 00000000000..6b7670defa1 --- /dev/null +++ b/cmd/opentelemetry-collector/app/defaults/defaults_test.go @@ -0,0 +1,34 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package defaults + +import ( + "testing" + + "github.com/magiconair/properties/assert" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + jConfig "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestComponents(t *testing.T) { + v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags) + factories := Components(v) + assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type()) + + kafkaFactory := factories.Exporters[kafka.TypeStr] + kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config) + assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers) +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/config.go b/cmd/opentelemetry-collector/app/exporter/kafka/config.go new file mode 100644 index 00000000000..c4cc92eca2d --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/config.go @@ -0,0 +1,27 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + + "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +// Config hold configuration of Jaeger Kafka exporter/storage. +type Config struct { + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + kafka.Options `mapstructure:",squash"` +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/config_test.go b/cmd/opentelemetry-collector/app/exporter/kafka/config_test.go new file mode 100644 index 00000000000..81cf0dfcc77 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/config_test.go @@ -0,0 +1,93 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "path" + "testing" + + "github.com/Shopify/sarama" + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/cmd/flags" + jConfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +func TestDefaultConfig(t *testing.T) { + v, c := jConfig.Viperize(DefaultOptions().AddFlags) + err := c.ParseFlags([]string{""}) + require.NoError(t, err) + factory := &Factory{OptionsFactory: func() *kafka.Options { + opts := DefaultOptions() + opts.InitFromViper(v) + return opts + }} + defaultCfg := factory.CreateDefaultConfig().(*Config) + assert.NoError(t, configcheck.ValidateConfig(defaultCfg)) + assert.Equal(t, "jaeger-spans", defaultCfg.Topic) + assert.Equal(t, "protobuf", defaultCfg.Encoding) + assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Config.Brokers) + assert.Equal(t, sarama.WaitForLocal, defaultCfg.Config.RequiredAcks) + assert.Equal(t, "none", defaultCfg.Config.Authentication) + assert.Equal(t, "/etc/krb5.conf", defaultCfg.Config.Kerberos.ConfigPath) + assert.Equal(t, "kafka", defaultCfg.Config.Kerberos.ServiceName) + assert.Equal(t, false, defaultCfg.Config.TLS.Enabled) +} + +func TestLoadConfigAndFlags(t *testing.T) { + factories, err := config.ExampleComponents() + require.NoError(t, err) + + v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag) + err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.producer.topic=jaeger-test", "--kafka.producer.brokers=host1,host2"}) + require.NoError(t, err) + + err = flags.TryLoadConfigFile(v) + require.NoError(t, err) + + factory := &Factory{OptionsFactory: func() *kafka.Options { + opts := DefaultOptions() + opts.InitFromViper(v) + assert.Equal(t, "jaeger-test", opts.Topic) + assert.Equal(t, []string{"host1", "host2"}, opts.Config.Brokers) + return opts + }} + + factories.Exporters[TypeStr] = factory + cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + kafkaCfg := cfg.Exporters[TypeStr].(*Config) + assert.Equal(t, TypeStr, kafkaCfg.Name()) + assert.Equal(t, "jaeger-prod", kafkaCfg.Topic) + assert.Equal(t, "emojis", kafkaCfg.Encoding) + assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Config.Brokers) + assert.Equal(t, "tls", kafkaCfg.Config.Authentication) + assert.Equal(t, "user", kafkaCfg.Config.PlainText.UserName) + assert.Equal(t, "123", kafkaCfg.Config.PlainText.Password) + assert.Equal(t, true, kafkaCfg.Config.TLS.Enabled) + assert.Equal(t, "ca.crt", kafkaCfg.Config.TLS.CAPath) + assert.Equal(t, "key.crt", kafkaCfg.Config.TLS.KeyPath) + assert.Equal(t, "cert.crt", kafkaCfg.Config.TLS.CertPath) + assert.Equal(t, true, kafkaCfg.Config.TLS.SkipHostVerify) + assert.Equal(t, "jaeger", kafkaCfg.Config.Kerberos.Realm) + assert.Equal(t, "/etc/foo", kafkaCfg.Config.Kerberos.ConfigPath) + assert.Equal(t, "from-jaeger-config", kafkaCfg.Config.Kerberos.Username) +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/doc.go b/cmd/opentelemetry-collector/app/exporter/kafka/doc.go new file mode 100644 index 00000000000..735f971fb01 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/doc.go @@ -0,0 +1,16 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package kafka implements Jaeger Kafka exporter. +package kafka diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go b/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go new file mode 100644 index 00000000000..19a34e799f8 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/exporter.go @@ -0,0 +1,45 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "github.com/open-telemetry/opentelemetry-collector/exporter" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" + "github.com/jaegertracing/jaeger/storage" +) + +// New creates new Kafka exporter +func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) { + f := kafka.NewFactory() + f.InitFromOptions(config.Options) + err := f.Initialize(metrics.NullFactory, log) + if err != nil { + return nil, err + } + return create(f, config) +} + +func create(factory storage.Factory, config *Config) (exporter.TraceExporter, error) { + // ignoring error for code coverage. Kafka factory never returns an error + spanWriter, err := factory.CreateSpanWriter() + if err != nil { + return nil, err + } + return storageOtelExporter.NewSpanWriterExporter(config, spanWriter) +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go b/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go new file mode 100644 index 00000000000..5b067348b15 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go @@ -0,0 +1,57 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func TestNew(t *testing.T) { + m := &mockProducerBuilder{} + c := &Config{} + exporter, err := create(m, c) + require.Nil(t, err) + assert.NotNil(t, exporter) + m = &mockProducerBuilder{err: errors.New("failed to create")} + exporter, err = create(m, c) + assert.Error(t, err, "failed to create") + assert.Nil(t, exporter) +} + +type mockProducerBuilder struct { + err error +} + +func (m mockProducerBuilder) CreateSpanWriter() (spanstore.Writer, error) { + return nil, m.err +} +func (mockProducerBuilder) CreateSpanReader() (spanstore.Reader, error) { + return nil, nil +} +func (mockProducerBuilder) CreateDependencyReader() (dependencystore.Reader, error) { + return nil, nil +} +func (mockProducerBuilder) Initialize(metrics.Factory, *zap.Logger) error { + return nil +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/factory.go b/cmd/opentelemetry-collector/app/exporter/kafka/factory.go new file mode 100644 index 00000000000..45092bb6acd --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/factory.go @@ -0,0 +1,74 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "fmt" + + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/exporter" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +const ( + TypeStr = "jaeger_kafka" +) + +// OptionsFactory returns initialized kafka.Options structure. +type OptionsFactory func() *kafka.Options + +// DefaultOptions creates Kafka options supported by this exporter. +func DefaultOptions() *kafka.Options { + return &kafka.Options{} +} + +// Factory is the factory for Jaeger Kafka exporter. +type Factory struct { + OptionsFactory OptionsFactory +} + +// Type gets the type of exporter. +func (Factory) Type() string { + return TypeStr +} + +// CreateDefaultConfig returns default configuration of Factory. +func (f Factory) CreateDefaultConfig() configmodels.Exporter { + opts := f.OptionsFactory() + return &Config{ + Options: *opts, + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: TypeStr, + NameVal: TypeStr, + }, + } +} + +// CreateTraceExporter creates Jaeger Kafka trace exporter. +func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) { + kafkaCfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("could not cast configuration to %s", TypeStr) + } + return New(kafkaCfg, log) +} + +// CreateMetricsExporter is not implemented. +func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) { + return nil, configerror.ErrDataTypeIsNotSupported +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/factory_test.go b/cmd/opentelemetry-collector/app/exporter/kafka/factory_test.go new file mode 100644 index 00000000000..2bb8f0cdebc --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/factory_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + jConfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" +) + +func TestCreateTraceExporter(t *testing.T) { + v, _ := jConfig.Viperize(DefaultOptions().AddFlags) + opts := DefaultOptions() + opts.InitFromViper(v) + factory := &Factory{OptionsFactory: func() *kafka.Options { + return opts + }} + exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig()) + require.Nil(t, exporter) + assert.EqualError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") +} + +func TestCreateTraceExporter_nilConfig(t *testing.T) { + factory := &Factory{} + exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil) + require.Nil(t, exporter) + assert.EqualError(t, err, "could not cast configuration to jaeger_kafka") +} + +func TestCreateMetricsExporter(t *testing.T) { + f := Factory{OptionsFactory: DefaultOptions} + mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig()) + assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) + assert.Nil(t, mReceiver) +} + +func TestCreateDefaultConfig(t *testing.T) { + factory := Factory{OptionsFactory: DefaultOptions} + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestType(t *testing.T) { + factory := Factory{OptionsFactory: DefaultOptions} + assert.Equal(t, TypeStr, factory.Type()) +} diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/testdata/config.yaml b/cmd/opentelemetry-collector/app/exporter/kafka/testdata/config.yaml new file mode 100644 index 00000000000..1ed1a58f895 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/testdata/config.yaml @@ -0,0 +1,33 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + jaeger_kafka: + topic: jaeger-prod + encoding: emojis + brokers: foo,bar + authentication: + type: tls + plaintext: + username: user + password: 123 + tls: + enabled: true + ca: ca.crt + key: key.crt + cert: cert.crt + skip_host_verify: true + kerberos: + realm: jaeger + config_file: /etc/foo + + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [jaeger_kafka] diff --git a/cmd/opentelemetry-collector/app/exporter/kafka/testdata/jaeger-config.yaml b/cmd/opentelemetry-collector/app/exporter/kafka/testdata/jaeger-config.yaml new file mode 100644 index 00000000000..99563d92dd1 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/kafka/testdata/jaeger-config.yaml @@ -0,0 +1,3 @@ +kafka.producer: + kerberos: + username: from-jaeger-config diff --git a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go new file mode 100644 index 00000000000..a8ba04fa714 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter.go @@ -0,0 +1,67 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "context" + "io" + + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumererror" + "github.com/open-telemetry/opentelemetry-collector/exporter" + "github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper" + "github.com/open-telemetry/opentelemetry-collector/oterr" + jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger" + + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// NewSpanWriterExporter returns exporter.TraceExporter +func NewSpanWriterExporter(config configmodels.Exporter, spanWriter spanstore.Writer) (exporter.TraceExporter, error) { + storage := storage{Writer: spanWriter} + return exporterhelper.NewTraceExporter( + config, + storage.traceDataPusher, + exporterhelper.WithShutdown(func() error { + if closer, ok := spanWriter.(io.Closer); ok { + return closer.Close() + } + return nil + })) +} + +type storage struct { + Writer spanstore.Writer +} + +// traceDataPusher implements OTEL exporterhelper.traceDataPusher +func (s *storage) traceDataPusher(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) { + protoBatch, err := jaegertranslator.OCProtoToJaegerProto(td) + if err != nil { + return len(td.Spans), consumererror.Permanent(err) + } + dropped := 0 + var errs []error + for _, span := range protoBatch.Spans { + span.Process = protoBatch.Process + err := s.Writer.WriteSpan(span) + if err != nil { + errs = append(errs, err) + dropped++ + } + } + return dropped, oterr.CombineErrors(errs) +} diff --git a/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go new file mode 100644 index 00000000000..ebe06a75a10 --- /dev/null +++ b/cmd/opentelemetry-collector/app/exporter/span_writer_exporter_test.go @@ -0,0 +1,120 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "context" + "errors" + "testing" + + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" +) + +func TestNew(t *testing.T) { + exporter, err := NewSpanWriterExporter(&configmodels.ExporterSettings{}, spanWriter{}) + require.NoError(t, err) + assert.NotNil(t, exporter) + assert.Nil(t, exporter.Shutdown()) + exporter, err = NewSpanWriterExporter(&configmodels.ExporterSettings{}, noClosableWriter{}) + require.NoError(t, err) + assert.NotNil(t, exporter) + assert.Nil(t, exporter.Shutdown()) +} + +func TestStore(t *testing.T) { + traceID := []byte("0123456789abcdef") + spanId := []byte("01234567") + errorName := &tracepb.TruncatableString{Value: "error"} + tests := []struct { + storage storage + data consumerdata.TraceData + err string + dropped int + caption string + }{ + { + caption: "nothing to store", + storage: storage{Writer: spanWriter{}}, + data: consumerdata.TraceData{Spans: []*tracepb.Span{}}, + dropped: 0, + }, + { + caption: "wrong data", + storage: storage{Writer: spanWriter{}}, + data: consumerdata.TraceData{Spans: []*tracepb.Span{{}}}, + err: "TraceID is nil", + dropped: 1, + }, + { + caption: "one error in writer", + storage: storage{Writer: spanWriter{err: errors.New("could not store")}}, + data: consumerdata.TraceData{Spans: []*tracepb.Span{ + {TraceId: traceID, SpanId: spanId, Name: errorName}, + {TraceId: traceID, SpanId: spanId}, + }}, + dropped: 1, + err: "could not store", + }, + { + caption: "two errors in writer", + storage: storage{Writer: spanWriter{err: errors.New("could not store")}}, + data: consumerdata.TraceData{Spans: []*tracepb.Span{ + {TraceId: traceID, SpanId: spanId, Name: errorName}, + {TraceId: traceID, SpanId: spanId, Name: errorName}, + }}, + dropped: 2, + err: "[could not store; could not store]", + }, + } + for _, test := range tests { + t.Run(test.caption, func(t *testing.T) { + dropped, err := test.storage.traceDataPusher(context.Background(), test.data) + assert.Equal(t, test.dropped, dropped) + if test.err != "" { + assert.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + } + }) + } +} + +type spanWriter struct { + err error +} + +func (w spanWriter) WriteSpan(span *model.Span) error { + if span.GetOperationName() == "error" { + return w.err + } + return nil +} + +func (spanWriter) Close() error { + return nil +} + +type noClosableWriter struct { +} + +func (noClosableWriter) WriteSpan(span *model.Span) error { + return nil +} diff --git a/cmd/opentelemetry-collector/go.mod b/cmd/opentelemetry-collector/go.mod index 6b969f421f8..ddacc8bf93b 100644 --- a/cmd/opentelemetry-collector/go.mod +++ b/cmd/opentelemetry-collector/go.mod @@ -6,4 +6,16 @@ replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab replace github.com/jaegertracing/jaeger => ./../../ -require github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200318211436-c7a11d6181c1 // indirect +require ( + github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f + github.com/census-instrumentation/opencensus-proto v0.2.1 + github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b + github.com/gogo/protobuf v1.3.0 + github.com/jaegertracing/jaeger v1.17.0 + github.com/magiconair/properties v1.8.1 + github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200323151927-794a2b689bd9 + github.com/spf13/viper v1.6.2 + github.com/stretchr/testify v1.5.0 + github.com/uber/jaeger-lib v2.2.0+incompatible + go.uber.org/zap v1.13.0 +) diff --git a/cmd/opentelemetry-collector/go.sum b/cmd/opentelemetry-collector/go.sum index 8adbde2b21e..bc6534e838d 100644 --- a/cmd/opentelemetry-collector/go.sum +++ b/cmd/opentelemetry-collector/go.sum @@ -21,6 +21,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.4.4 h1:+IawcoXhCBylN7ccwdwf8LOH2jKq7NavGpEPanrlTzE= github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -34,6 +35,7 @@ github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdko github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= @@ -90,6 +92,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b h1:WR1qVJzbvrVywhAk4kMQKRPx09AZVI0NdEdYs59iHcA= github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -106,8 +109,11 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= @@ -253,14 +259,18 @@ github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/V github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= +github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/googleapis v1.0.1-0.20180501115203-b23578765ee5/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= +github.com/gogo/googleapis v1.2.0 h1:Z0v3OJDotX9ZBpdz2V+AI7F4fITSZhVE5mg6GQppwMM= +github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU= github.com/gogo/googleapis v1.3.0 h1:M695OaDJ5ipWvDPcoAg/YL9c3uORAegkEfBqTQF/fTQ= github.com/gogo/googleapis v1.3.0/go.mod h1:d+q1s/xVJxZGKWwC/6UfPIF33J+G1Tq4GYv9Y+Tg/EU= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= @@ -283,6 +293,7 @@ github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4= github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a/go.mod h1:ryS0uhF+x9jgbj/N71xsEqODy9BN81/GonCZiOzirOk= @@ -300,6 +311,7 @@ github.com/golangci/prealloc v0.0.0-20180630174525-215b22d4de21/go.mod h1:tf5+bz github.com/golangci/revgrep v0.0.0-20180526074752-d9c87f5ffaf0/go.mod h1:qOQCunEYvmd/TLamH+7LlVccLvUH5kZNhbCgTHoBbp4= github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4/go.mod h1:Izgrg8RkN3rCIMLGE9CyYmU9pY2Jer6DgANEnZ/L/cQ= github.com/google/addlicense v0.0.0-20190510175307-22550fa7c1b0/go.mod h1:QtPG26W17m+OIQgE6gQ24gC1M6pUaMBAbFrTIDtwG/E= +github.com/google/addlicense v0.0.0-20200301095109-7c013a14f2e2/go.mod h1:EMjYTRimagHs1FwlIqKyX3wAM0u3rA+McvlIIWmSamA= github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -382,6 +394,7 @@ github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjG github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -408,6 +421,7 @@ github.com/jaegertracing/jaeger v1.7.0/go.mod h1:LUWPSnzNPGRubM8pk0inANGitpiMOOx github.com/jaegertracing/jaeger v1.14.0 h1:C0En+gfcxf3NsAriMAvQ6LcSFrQ5VQGXddqfty1EpTI= github.com/jaegertracing/jaeger v1.14.0/go.mod h1:LUWPSnzNPGRubM8pk0inANGitpiMOOxihXx0+53llXI= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jessevdk/go-flags v0.0.0-20180331124232-1c38ed7ad0cc/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= @@ -533,12 +547,18 @@ github.com/open-telemetry/opentelemetry-collector v0.2.7 h1:8zWYD+iIkvvV3B0rElgU github.com/open-telemetry/opentelemetry-collector v0.2.7/go.mod h1:+z/j3zjZZ3M3vlSvS4hjIIDrkBzfq8ilWlAdGPV6wlo= github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200318211436-c7a11d6181c1 h1:wnIQei4voeeQjYWkNDwsCSQ1hfRCOgB5V37r017rOsU= github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200318211436-c7a11d6181c1/go.mod h1:+z/j3zjZZ3M3vlSvS4hjIIDrkBzfq8ilWlAdGPV6wlo= +github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200321024655-794059026e2b h1:XJJRstrqLT2VcNohwzKbCuiXRzF3cLgUtiB5x0Kjxzg= +github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200321024655-794059026e2b/go.mod h1:kpj/K5Bo/k97DouieBzd+3AhOJ2RRXSqbSkEA4qwUvg= +github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200323151927-794a2b689bd9 h1:MYcDlqLU+ZhWyqgKXyKik/bq+v89wlDRh0T0cYYX238= +github.com/open-telemetry/opentelemetry-collector v0.2.8-0.20200323151927-794a2b689bd9/go.mod h1:7Sxe+ROKhJSqPZqsKnvb/y5EbEsjbkDTbQ2sS7cF9rY= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200206071824-8310c432e51c h1:nDOtl6j2Ei16tlnx/o4qKEelpHtGoZ9ArwU+tux4Ia8= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200206071824-8310c432e51c/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200211051721-ff5f19c6217d h1:hZcHR0at6tb3jBjaPHlfLr6yK7rTrA8xGCS6jlUSLcU= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200211051721-ff5f19c6217d/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200315170400-caed74b167ad h1:b0fNt65HvoUg+uDe63CA08SGzKO/gzPvzQQfEFX8Oks= github.com/open-telemetry/opentelemetry-proto v0.0.0-20200315170400-caed74b167ad/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= +github.com/open-telemetry/opentelemetry-proto v0.0.0-20200316171511-e76584d22418 h1:R4Mjk1d++Hke9e4W33O6CMUS85hALRP5wKjhaeCYuSk= +github.com/open-telemetry/opentelemetry-proto v0.0.0-20200316171511-e76584d22418/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -559,6 +579,8 @@ github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -567,6 +589,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k= @@ -601,11 +624,13 @@ github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/procfs v0.0.10 h1:QJQN3jYQhkamO4mhfUWqdDH2asK7ONOI9MTWjyAxNKM= github.com/prometheus/procfs v0.0.10/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= +github.com/prometheus/prometheus v1.8.1/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/prometheus/prometheus v1.8.2-0.20190924101040-52e0504f83ea h1:DkijwrDmxCei6erPY2JZPJMOr8srbkbOJVkWbhSYWH4= github.com/prometheus/prometheus v1.8.2-0.20190924101040-52e0504f83ea/go.mod h1:elNqjVbwD3sCZJqKzyN7uEuwGcCpeJvv67D6BrHsDbw= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -680,6 +705,7 @@ github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRci github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.0 h1:DMOzIV76tmoDNE9pX6RSN0aDtCYeCg5VueieJaAo1uw= github.com/stretchr/testify v1.5.0/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= @@ -829,6 +855,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -867,6 +894,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZe golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -932,6 +960,7 @@ google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBr google.golang.org/genproto v0.0.0-20200218151345-dad8c97a84f5 h1:jB9+PJSvu5tBfmJHy/OVapFdjDF3WvpkqRhxqrmzoEU= google.golang.org/genproto v0.0.0-20200218151345-dad8c97a84f5/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= +google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -959,11 +988,15 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.52.0 h1:j+Lt/M1oPPejkniCg1TkWE2J3Eh1oZTsHSXzMTzUXn4= gopkg.in/ini.v1 v1.52.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -975,6 +1008,7 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/cmd/opentelemetry-collector/main.go b/cmd/opentelemetry-collector/main.go index 3e686a1a87d..edc159632d9 100644 --- a/cmd/opentelemetry-collector/main.go +++ b/cmd/opentelemetry-collector/main.go @@ -15,10 +15,17 @@ package main import ( + "fmt" "log" + "os" - "github.com/open-telemetry/opentelemetry-collector/defaults" "github.com/open-telemetry/opentelemetry-collector/service" + "github.com/spf13/viper" + + jflags "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + jconfig "github.com/jaegertracing/jaeger/pkg/config" ) func main() { @@ -36,8 +43,8 @@ func main() { //GitHash: version.GitHash, } - cmpts, err := defaults.Components() - handleErr(err) + v := viper.New() + cmpts := defaults.Components(v) svc, err := service.New(service.Parameters{ ApplicationStartInfo: info, @@ -45,6 +52,22 @@ func main() { }) handleErr(err) + // Add Jaeger specific flags to service command + // this passes flag values to viper. + cmd := svc.Command() + jconfig.AddFlags(v, + cmd, + jflags.AddConfigFileFlag, + kafka.DefaultOptions().AddFlags, + ) + + // parse flags to propagate Jaeger config file flag value to viper + cmd.ParseFlags(os.Args) + err = jflags.TryLoadConfigFile(v) + if err != nil { + handleErr(fmt.Errorf("could not load Jaeger configuration file %w", err)) + } + err = svc.Start() handleErr(err) } diff --git a/pkg/config/tlscfg/options.go b/pkg/config/tlscfg/options.go index 2e5779f3e0a..dd47182da9c 100644 --- a/pkg/config/tlscfg/options.go +++ b/pkg/config/tlscfg/options.go @@ -24,13 +24,13 @@ import ( // Options describes the configuration properties for TLS Connections. type Options struct { - Enabled bool - CAPath string - CertPath string - KeyPath string - ServerName string // only for client-side TLS config - ClientCAPath string // only for server-side TLS config for client auth - SkipHostVerify bool + Enabled bool `mapstructure:"enabled"` + CAPath string `mapstructure:"ca"` + CertPath string `mapstructure:"cert"` + KeyPath string `mapstructure:"key"` + ServerName string `mapstructure:"server_name"` // only for client-side TLS config + ClientCAPath string `mapstructure:"client_ca"` // only for server-side TLS config for client auth + SkipHostVerify bool `mapstructure:"skip_host_verify"` } var systemCertPool = x509.SystemCertPool // to allow overriding in unit test diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 2221cd9d7a1..64c9741d41e 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -39,10 +39,10 @@ var authTypes = []string{ // AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster type AuthenticationConfig struct { - Authentication string - Kerberos KerberosConfig - TLS tlscfg.Options - PlainText PlainTextConfig + Authentication string `mapstructure:"type"` + Kerberos KerberosConfig `mapstructure:"kerberos"` + TLS tlscfg.Options `mapstructure:"tls"` + PlainText PlainTextConfig `mapstructure:"plaintext"` } //SetConfiguration set configure authentication into sarama config structure diff --git a/pkg/kafka/auth/kerberos.go b/pkg/kafka/auth/kerberos.go index 5418a3f00bb..5fce43048e9 100644 --- a/pkg/kafka/auth/kerberos.go +++ b/pkg/kafka/auth/kerberos.go @@ -20,13 +20,13 @@ import ( // KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer type KerberosConfig struct { - ServiceName string - Realm string - UseKeyTab bool - Username string - Password string - ConfigPath string - KeyTabPath string + ServiceName string `mapstructure:"service_name"` + Realm string `mapstructure:"realm"` + UseKeyTab bool `mapstructure:"use_keytab"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + ConfigPath string `mapstructure:"config_file"` + KeyTabPath string `mapstructure:"keytab_file"` } func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) { diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index a72641bcff6..7bfa0db7061 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -20,8 +20,8 @@ import ( // PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka type PlainTextConfig struct { - UserName string - Password string + UserName string `mapstructure:"username"` + Password string `mapstructure:"password"` } func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) { diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 05ed761735c..aed288c9d6a 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -29,15 +29,15 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string - RequiredAcks sarama.RequiredAcks - Compression sarama.CompressionCodec - CompressionLevel int - ProtocolVersion string - BatchLinger time.Duration - BatchSize int - BatchMaxMessages int - auth.AuthenticationConfig + Brokers []string `mapstructure:"brokers"` + RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"` + Compression sarama.CompressionCodec `mapstructure:"compression"` + CompressionLevel int `mapstructure:"compression_level"` + ProtocolVersion string `mapstructure:"protocol_version"` + BatchLinger time.Duration `mapstructure:"batch_linger"` + BatchSize int `mapstructure:"batch_size"` + BatchMaxMessages int `mapstructure:"batch_max_messages"` + auth.AuthenticationConfig `mapstructure:"authentication"` } // NewProducer creates a new asynchronous kafka producer diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index 51c8ae5220a..40bbfc9961e 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -53,7 +53,13 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper) { f.options.InitFromViper(v) - f.Builder = &f.options.config + f.Builder = &f.options.Config +} + +// InitFromOptions initializes factory from options. +func (f *Factory) InitFromOptions(o Options) { + f.options = o + f.Builder = &f.options.Config } // Initialize implements storage.Factory @@ -61,13 +67,13 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.metricsFactory, f.logger = metricsFactory, logger logger.Info("Kafka factory", zap.Any("producer builder", f.Builder), - zap.Any("topic", f.options.topic)) + zap.Any("topic", f.options.Topic)) p, err := f.NewProducer() if err != nil { return err } f.producer = p - switch f.options.encoding { + switch f.options.Encoding { case EncodingProto: f.marshaller = newProtobufMarshaller() case EncodingJSON: @@ -85,7 +91,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return NewSpanWriter(f.producer, f.marshaller, f.options.topic, f.metricsFactory, f.logger), nil + return NewSpanWriter(f.producer, f.marshaller, f.options.Topic, f.metricsFactory, f.logger), nil } // CreateDependencyReader implements storage.Factory diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index bac2231b1e4..48d2f833abe 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -104,3 +104,11 @@ func TestKafkaFactoryMarshallerErr(t *testing.T) { f.Builder = &mockProducerBuilder{t: t} assert.Error(t, f.Initialize(metrics.NullFactory, zap.NewNop())) } + +func TestInitFromOptions(t *testing.T) { + f := NewFactory() + o := Options{Topic: "testTopic", Config: kafkaConfig.Configuration{Brokers: []string{"host"}}} + f.InitFromOptions(o) + assert.Equal(t, o, f.options) + assert.Equal(t, &o.Config, f.Builder) +} diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 2d3b5f8a60c..15839d1f18d 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -108,9 +108,9 @@ var ( // Options stores the configuration options for Kafka type Options struct { - config producer.Configuration - topic string - encoding string + Config producer.Configuration `mapstructure:",squash"` + Topic string `mapstructure:"topic"` + Encoding string `mapstructure:"encoding"` } // AddFlags adds flags for Options @@ -186,7 +186,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { log.Fatal(err) } - opt.config = producer.Configuration{ + opt.Config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), RequiredAcks: requiredAcks, Compression: compressionModeCodec, @@ -197,8 +197,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) { BatchSize: v.GetInt(configPrefix + suffixBatchSize), BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), } - opt.topic = v.GetString(configPrefix + suffixTopic) - opt.encoding = v.GetString(configPrefix + suffixEncoding) + opt.Topic = v.GetString(configPrefix + suffixTopic) + opt.Encoding = v.GetString(configPrefix + suffixEncoding) } // stripWhiteSpace removes all whitespace characters from a string diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 90b5632e5c0..b77700c9732 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -44,15 +44,15 @@ func TestOptionsWithFlags(t *testing.T) { }) opts.InitFromViper(v) - assert.Equal(t, "topic1", opts.topic) - assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers) - assert.Equal(t, "protobuf", opts.encoding) - assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) - assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression) - assert.Equal(t, 7, opts.config.CompressionLevel) - assert.Equal(t, 128000, opts.config.BatchSize) - assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger) - assert.Equal(t, 100, opts.config.BatchMaxMessages) + assert.Equal(t, "topic1", opts.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Config.Brokers) + assert.Equal(t, "protobuf", opts.Encoding) + assert.Equal(t, sarama.WaitForLocal, opts.Config.RequiredAcks) + assert.Equal(t, sarama.CompressionGZIP, opts.Config.Compression) + assert.Equal(t, 7, opts.Config.CompressionLevel) + assert.Equal(t, 128000, opts.Config.BatchSize) + assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger) + assert.Equal(t, 100, opts.Config.BatchMaxMessages) } func TestFlagDefaults(t *testing.T) { @@ -61,15 +61,15 @@ func TestFlagDefaults(t *testing.T) { command.ParseFlags([]string{}) opts.InitFromViper(v) - assert.Equal(t, defaultTopic, opts.topic) - assert.Equal(t, []string{defaultBroker}, opts.config.Brokers) - assert.Equal(t, defaultEncoding, opts.encoding) - assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) - assert.Equal(t, sarama.CompressionNone, opts.config.Compression) - assert.Equal(t, 0, opts.config.CompressionLevel) - assert.Equal(t, 0, opts.config.BatchSize) - assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger) - assert.Equal(t, 0, opts.config.BatchMaxMessages) + assert.Equal(t, defaultTopic, opts.Topic) + assert.Equal(t, []string{defaultBroker}, opts.Config.Brokers) + assert.Equal(t, defaultEncoding, opts.Encoding) + assert.Equal(t, sarama.WaitForLocal, opts.Config.RequiredAcks) + assert.Equal(t, sarama.CompressionNone, opts.Config.Compression) + assert.Equal(t, 0, opts.Config.CompressionLevel) + assert.Equal(t, 0, opts.Config.BatchSize) + assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger) + assert.Equal(t, 0, opts.Config.BatchMaxMessages) } func TestCompressionLevelDefaults(t *testing.T) { @@ -203,7 +203,7 @@ func TestTLSFlags(t *testing.T) { err := command.ParseFlags(test.flags) require.NoError(t, err) o.InitFromViper(v) - assert.Equal(t, test.expected, o.config.AuthenticationConfig) + assert.Equal(t, test.expected, o.Config.AuthenticationConfig) }) }