Skip to content

Commit

Permalink
Improve span processor
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <[email protected]>
  • Loading branch information
Davit Yeghshatyan committed Jul 26, 2018
1 parent 94479c7 commit 0f34c30
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
33 changes: 20 additions & 13 deletions cmd/ingester/app/processor/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,42 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

//go:generate mockery -name=SpanProcessor
//go:generate mockery -name=KafkaSpanProcessor

// SpanProcessor processes kafka spans
type SpanProcessor interface {
Process(input Message) error
io.Closer
}

type spanProcessor struct {
unmarshaller kafka.Unmarshaller
writer spanstore.Writer
io.Closer
}

// Message contains the fields of the kafka message that the span processor uses
type Message interface {
Value() []byte
}

// NewSpanProcessor creates a new SpanProcessor
func NewSpanProcessor(writer spanstore.Writer, unmarshaller kafka.Unmarshaller) SpanProcessor {
return &spanProcessor{
unmarshaller: unmarshaller,
writer: writer,
// SpanProcessorParams stores the necessary parameters for a SpanProcessor
type SpanProcessorParams struct {
Writer spanstore.Writer
Unmarshaller kafka.Unmarshaller
}

// KafkaSpanProcessor implements SpanProcessor for Kafka messages
type KafkaSpanProcessor struct {
unmarshaller kafka.Unmarshaller
writer spanstore.Writer
io.Closer
}

// NewSpanProcessor creates a new KafkaSpanProcessor
func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor {
return &KafkaSpanProcessor{
unmarshaller: params.Unmarshaller,
writer: params.Writer,
}
}

// Process unmarshals and writes a single kafka message
func (s spanProcessor) Process(message Message) error {
func (s KafkaSpanProcessor) Process(message Message) error {
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
if err != nil {
return errors.Wrap(err, "cannot unmarshall byte array into span")
Expand Down
7 changes: 4 additions & 3 deletions cmd/ingester/app/processor/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
)

func TestNewSpanProcessor(t *testing.T) {
assert.NotNil(t, NewSpanProcessor(&smocks.Writer{}, &umocks.Unmarshaller{}))
p := SpanProcessorParams{}
assert.NotNil(t, NewSpanProcessor(p))
}

func TestSpanProcessor_Process(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &spanProcessor{
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}
Expand All @@ -55,7 +56,7 @@ func TestSpanProcessor_Process(t *testing.T) {
func TestSpanProcessor_ProcessError(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &spanProcessor{
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}
Expand Down

0 comments on commit 0f34c30

Please sign in to comment.