From 0f34c30c014a73763b6acca01a8ea7023e211a32 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Wed, 25 Jul 2018 01:43:22 -0400 Subject: [PATCH] Improve span processor Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/processor/span_processor.go | 33 +++++++++++-------- .../app/processor/span_processor_test.go | 7 ++-- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index 696545928e1..a0a52c1b2cb 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -23,7 +23,7 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) -//go:generate mockery -name=SpanProcessor +//go:generate mockery -name=KafkaSpanProcessor // SpanProcessor processes kafka spans type SpanProcessor interface { @@ -31,27 +31,34 @@ type SpanProcessor interface { io.Closer } -type spanProcessor struct { - unmarshaller kafka.Unmarshaller - writer spanstore.Writer - io.Closer -} - // Message contains the fields of the kafka message that the span processor uses type Message interface { Value() []byte } -// NewSpanProcessor creates a new SpanProcessor -func NewSpanProcessor(writer spanstore.Writer, unmarshaller kafka.Unmarshaller) SpanProcessor { - return &spanProcessor{ - unmarshaller: unmarshaller, - writer: writer, +// SpanProcessorParams stores the necessary parameters for a SpanProcessor +type SpanProcessorParams struct { + Writer spanstore.Writer + Unmarshaller kafka.Unmarshaller +} + +// KafkaSpanProcessor implements SpanProcessor for Kafka messages +type KafkaSpanProcessor struct { + unmarshaller kafka.Unmarshaller + writer spanstore.Writer + io.Closer +} + +// NewSpanProcessor creates a new KafkaSpanProcessor +func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor { + return &KafkaSpanProcessor{ + unmarshaller: params.Unmarshaller, + writer: params.Writer, } } // Process unmarshals and writes a single kafka message -func (s spanProcessor) Process(message Message) error { +func (s KafkaSpanProcessor) Process(message Message) error { mSpan, err := s.unmarshaller.Unmarshal(message.Value()) if err != nil { return errors.Wrap(err, "cannot unmarshall byte array into span") diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go index 2d89b5e43b9..100b522ebef 100644 --- a/cmd/ingester/app/processor/span_processor_test.go +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -27,13 +27,14 @@ import ( ) func TestNewSpanProcessor(t *testing.T) { - assert.NotNil(t, NewSpanProcessor(&smocks.Writer{}, &umocks.Unmarshaller{})) + p := SpanProcessorParams{} + assert.NotNil(t, NewSpanProcessor(p)) } func TestSpanProcessor_Process(t *testing.T) { writer := &smocks.Writer{} unmarshallerMock := &umocks.Unmarshaller{} - processor := &spanProcessor{ + processor := &KafkaSpanProcessor{ unmarshaller: unmarshallerMock, writer: writer, } @@ -55,7 +56,7 @@ func TestSpanProcessor_Process(t *testing.T) { func TestSpanProcessor_ProcessError(t *testing.T) { writer := &smocks.Writer{} unmarshallerMock := &umocks.Unmarshaller{} - processor := &spanProcessor{ + processor := &KafkaSpanProcessor{ unmarshaller: unmarshallerMock, writer: writer, }