Skip to content

Commit

Permalink
Add Kafka OTEL storage exporter (#2135)
Browse files Browse the repository at this point in the history
* Add Kafka OTEL storage exporter

Signed-off-by: Pavol Loffay <[email protected]>

* Add init from options test to factory

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Mar 25, 2020
1 parent 87c466d commit e124801
Show file tree
Hide file tree
Showing 26 changed files with 828 additions and 65 deletions.
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
JAEGER_IMPORT_PATH=github.com/jaegertracing/jaeger
STORAGE_PKGS = ./plugin/storage/integration/...
OTEL_COLLECTOR_DIR = ./cmd/opentelemetry-collector

# all .go files that are not auto-generated and should be auto-formatted and linted.
ALL_SRC := $(shell find . -name '*.go' \
Expand Down Expand Up @@ -87,9 +88,13 @@ clean:
rm -rf cover.out .cover/ cover.html lint.log fmt.log

.PHONY: test
test: go-gen
test: go-gen test-otel
bash -c "set -e; set -o pipefail; $(GOTEST) ./... | $(COLORIZE)"

.PHONY: test-otel
test-otel:
cd ${OTEL_COLLECTOR_DIR} && bash -c "set -e; set -o pipefail; $(GOTEST) ./... | $(COLORIZE)"

.PHONY: all-in-one-integration-test
all-in-one-integration-test: go-gen
$(GOTEST) -tags=integration ./cmd/all-in-one/...
Expand Down Expand Up @@ -160,7 +165,7 @@ lint-staticcheck:
@[ ! -s "$(LINT_LOG)" ] || (echo "Detected staticcheck failures:" | cat - $(LINT_LOG) && false)

.PHONY: lint
lint: lint-staticcheck lint-gosec
lint: lint-staticcheck lint-gosec lint-otel
$(GOVET) ./...
$(MAKE) go-lint
@echo Running go fmt on ALL_SRC ...
Expand All @@ -169,6 +174,10 @@ lint: lint-staticcheck lint-gosec
./scripts/import-order-cleanup.sh stdout > $(IMPORT_LOG)
@[ ! -s "$(FMT_LOG)" -a ! -s "$(IMPORT_LOG)" ] || (echo "Go fmt, license check, or import ordering failures, run 'make fmt'" | cat - $(FMT_LOG) && false)

.PHONY: lint-otel
lint-otel:
cd ${OTEL_COLLECTOR_DIR} && $(GOVET) ./...

.PHONY: go-lint
go-lint:
@cat /dev/null > $(LINT_LOG)
Expand Down Expand Up @@ -246,7 +255,6 @@ else
$(GOBUILD) -o ./cmd/collector/collector-$(GOOS) $(BUILD_INFO) ./cmd/collector/main.go
endif

OTEL_COLLECTOR_DIR = ./cmd/opentelemetry-collector
.PHONY: build-otel-collector
build-otel-collector:
ifeq ($(GOARCH), s390x)
Expand Down
37 changes: 37 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package defaults

import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

// Components creates default and Jaeger factories
func Components(v *viper.Viper) config.Factories {
kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options {
opts := kafka.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
return factories
}
34 changes: 34 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package defaults

import (
"testing"

"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)
}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

// Config hold configuration of Jaeger Kafka exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
kafka.Options `mapstructure:",squash"`
}
93 changes: 93 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/kafka/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"path"
"testing"

"github.com/Shopify/sarama"
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestDefaultConfig(t *testing.T) {
v, c := jConfig.Viperize(DefaultOptions().AddFlags)
err := c.ParseFlags([]string{""})
require.NoError(t, err)
factory := &Factory{OptionsFactory: func() *kafka.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
assert.Equal(t, "protobuf", defaultCfg.Encoding)
assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Config.Brokers)
assert.Equal(t, sarama.WaitForLocal, defaultCfg.Config.RequiredAcks)
assert.Equal(t, "none", defaultCfg.Config.Authentication)
assert.Equal(t, "/etc/krb5.conf", defaultCfg.Config.Kerberos.ConfigPath)
assert.Equal(t, "kafka", defaultCfg.Config.Kerberos.ServiceName)
assert.Equal(t, false, defaultCfg.Config.TLS.Enabled)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.producer.topic=jaeger-test", "--kafka.producer.brokers=host1,host2"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *kafka.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
assert.Equal(t, "jaeger-test", opts.Topic)
assert.Equal(t, []string{"host1", "host2"}, opts.Config.Brokers)
return opts
}}

factories.Exporters[TypeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

kafkaCfg := cfg.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, kafkaCfg.Name())
assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
assert.Equal(t, "emojis", kafkaCfg.Encoding)
assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Config.Brokers)
assert.Equal(t, "tls", kafkaCfg.Config.Authentication)
assert.Equal(t, "user", kafkaCfg.Config.PlainText.UserName)
assert.Equal(t, "123", kafkaCfg.Config.PlainText.Password)
assert.Equal(t, true, kafkaCfg.Config.TLS.Enabled)
assert.Equal(t, "ca.crt", kafkaCfg.Config.TLS.CAPath)
assert.Equal(t, "key.crt", kafkaCfg.Config.TLS.KeyPath)
assert.Equal(t, "cert.crt", kafkaCfg.Config.TLS.CertPath)
assert.Equal(t, true, kafkaCfg.Config.TLS.SkipHostVerify)
assert.Equal(t, "jaeger", kafkaCfg.Config.Kerberos.Realm)
assert.Equal(t, "/etc/foo", kafkaCfg.Config.Kerberos.ConfigPath)
assert.Equal(t, "from-jaeger-config", kafkaCfg.Config.Kerberos.Username)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/kafka/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package kafka implements Jaeger Kafka exporter.
package kafka
45 changes: 45 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/kafka/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage"
)

// New creates new Kafka exporter
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
f := kafka.NewFactory()
f.InitFromOptions(config.Options)
err := f.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
return create(f, config)
}

func create(factory storage.Factory, config *Config) (exporter.TraceExporter, error) {
// ignoring error for code coverage. Kafka factory never returns an error
spanWriter, err := factory.CreateSpanWriter()
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(config, spanWriter)
}
57 changes: 57 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/kafka/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func TestNew(t *testing.T) {
m := &mockProducerBuilder{}
c := &Config{}
exporter, err := create(m, c)
require.Nil(t, err)
assert.NotNil(t, exporter)
m = &mockProducerBuilder{err: errors.New("failed to create")}
exporter, err = create(m, c)
assert.Error(t, err, "failed to create")
assert.Nil(t, exporter)
}

type mockProducerBuilder struct {
err error
}

func (m mockProducerBuilder) CreateSpanWriter() (spanstore.Writer, error) {
return nil, m.err
}
func (mockProducerBuilder) CreateSpanReader() (spanstore.Reader, error) {
return nil, nil
}
func (mockProducerBuilder) CreateDependencyReader() (dependencystore.Reader, error) {
return nil, nil
}
func (mockProducerBuilder) Initialize(metrics.Factory, *zap.Logger) error {
return nil
}
Loading

0 comments on commit e124801

Please sign in to comment.