Skip to content

Commit

Permalink
Split kafka config into consumer and producer
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <[email protected]>
  • Loading branch information
Davit Yeghshatyan committed Jul 23, 2018
1 parent 4039f65 commit d0a7e7e
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 96 deletions.
6 changes: 3 additions & 3 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
)

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
config.ConsumerBuilder
consumer.Builder
}

// Consumer uses sarama to consume and handle messages from kafka
Expand All @@ -43,7 +43,7 @@ type Consumer struct {
close chan struct{}
isClosed sync.WaitGroup

config.Consumer
consumer.Consumer
}

// New is a constructor for a Consumer
Expand Down
16 changes: 8 additions & 8 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
)

//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
Expand All @@ -43,23 +43,23 @@ type consumerTest struct {
}

type mockConsumerConfiguration struct {
config.ConsumerConfiguration
consumer.Configuration
err error
}

func (m *mockConsumerConfiguration) NewConsumer() (config.Consumer, error) {
func (m *mockConsumerConfiguration) NewConsumer() (consumer.Consumer, error) {
return &kmocks.Consumer{}, m.err
}

func TestConstructor(t *testing.T) {
params := Params{}
params.ConsumerBuilder = &mockConsumerConfiguration{}
consumer, err := New(params)
params.Builder = &mockConsumerConfiguration{}
newConsumer, err := New(params)
assert.NoError(t, err)
assert.NotNil(t, consumer)
assert.NotNil(t, consumer.processorFactory)
assert.NotNil(t, newConsumer)
assert.NotNil(t, newConsumer.processorFactory)

params.ConsumerBuilder = &mockConsumerConfiguration{
params.Builder = &mockConsumerConfiguration{
err: errors.New("consumerBuilder error"),
}
_, err = New(params)
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
)

// FactoryParams are the parameters of a ProcessorFactory
Expand All @@ -39,7 +39,7 @@ type FactoryParams struct {
// ProcessorFactory is a factory for creating startedProcessors
type ProcessorFactory struct {
topic string
consumer config.Consumer
consumer consumer.Consumer
metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
)

const (
Expand All @@ -39,7 +39,7 @@ const (

// Options stores the configuration options for a Kafka consumer
type Options struct {
config.ConsumerConfiguration
consumer.Configuration
Parallelism int
}

Expand Down
66 changes: 0 additions & 66 deletions pkg/kafka/config/config.go

This file was deleted.

File renamed without changes.
48 changes: 48 additions & 0 deletions pkg/kafka/config/consumer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"io"

"github.com/bsm/sarama-cluster"
)

// Consumer is an interface to features of Sarama that are necessary for the consumer
type Consumer interface {
Partitions() <-chan cluster.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
}

// Builder builds a new kafka consumer
type Builder interface {
NewConsumer() (Consumer, error)
}

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
Consumer
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
1 change: 1 addition & 0 deletions pkg/kafka/config/producer/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires connection to Kafka
36 changes: 36 additions & 0 deletions pkg/kafka/config/producer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producer

import (
"github.com/Shopify/sarama"
)

// Builder builds a new kafka producer
type Builder interface {
NewProducer() (sarama.AsyncProducer, error)
}

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
12 changes: 6 additions & 6 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/producer"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -35,9 +35,9 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger

config config.ProducerBuilder
producer sarama.AsyncProducer
marshaller Marshaller
producer.Builder
}

// NewFactory creates a new Factory.
Expand All @@ -53,16 +53,16 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper) {
f.options.InitFromViper(v)
f.config = &f.options.config
f.Builder = &f.options.config
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
logger.Info("Kafka storage configuration",
zap.Any("producer config", f.config),
logger.Info("Kafka producer builder",
zap.Any("producer builder", f.Builder),
zap.Any("topic", f.options.topic))
p, err := f.config.NewProducer()
p, err := f.NewProducer()
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions plugin/storage/kafka/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/config"
kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/config/producer"
"github.com/jaegertracing/jaeger/storage"
)

// Checks that Kafka Factory conforms to storage.Factory API
var _ storage.Factory = new(Factory)

type mockProducerBuilder struct {
kafkaConfig.ProducerConfiguration
kafkaConfig.Configuration
err error
t *testing.T
}
Expand All @@ -51,13 +51,13 @@ func TestKafkaFactory(t *testing.T) {
command.ParseFlags([]string{})
f.InitFromViper(v)

f.config = &mockProducerBuilder{
f.Builder = &mockProducerBuilder{
err: errors.New("made-up error"),
t: t,
}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.config = &mockProducerBuilder{t: t}
f.Builder = &mockProducerBuilder{t: t}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.IsType(t, &protobufMarshaller{}, f.marshaller)

Expand Down Expand Up @@ -86,7 +86,7 @@ func TestKafkaFactoryEncoding(t *testing.T) {
command.ParseFlags([]string{"--kafka.encoding=" + test.encoding})
f.InitFromViper(v)

f.config = &mockProducerBuilder{t: t}
f.Builder = &mockProducerBuilder{t: t}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.IsType(t, test.marshaller, f.marshaller)
})
Expand All @@ -99,6 +99,6 @@ func TestKafkaFactoryMarshallerErr(t *testing.T) {
command.ParseFlags([]string{"--kafka.encoding=bad-input"})
f.InitFromViper(v)

f.config = &mockProducerBuilder{t: t}
f.Builder = &mockProducerBuilder{t: t}
assert.Error(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
}
6 changes: 3 additions & 3 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/jaegertracing/jaeger/pkg/kafka/config/producer"
)

const (
Expand All @@ -40,7 +40,7 @@ const (

// Options stores the configuration options for Kafka
type Options struct {
config config.ProducerConfiguration
config producer.Configuration
topic string
encoding string
}
Expand All @@ -64,7 +64,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.config = config.ProducerConfiguration{
opt.config = producer.Configuration{
Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
Expand Down

0 comments on commit d0a7e7e

Please sign in to comment.