Skip to content

Commit

Permalink
Unmarshal messages and write spans
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <[email protected]>
  • Loading branch information
Davit Yeghshatyan committed Jul 20, 2018
1 parent 933efb3 commit ece2341
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 8 deletions.
28 changes: 28 additions & 0 deletions cmd/ingester/app/processor/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package processor

import (
"io"

"github.com/pkg/errors"

"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

//go:generate mockery -name=SpanProcessor
Expand All @@ -26,7 +31,30 @@ type SpanProcessor interface {
io.Closer
}

type spanProcessor struct {
unmarshaller kafka.Unmarshaller
writer spanstore.Writer
io.Closer
}

// Message contains the fields of the kafka message that the span processor uses
type Message interface {
Value() []byte
}

// NewSpanProcessor creates a new SpanProcessor
func NewSpanProcessor(writer spanstore.Writer) SpanProcessor {
return &spanProcessor{
unmarshaller: kafka.NewProtobufUnmarshaller(),
writer: writer,
}
}

// Process unmarshals and writes a single kafka message
func (s spanProcessor) Process(message Message) error {
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
if err != nil {
return errors.Wrap(err, "cannot read message")
}
return s.writer.WriteSpan(mSpan)
}
73 changes: 73 additions & 0 deletions cmd/ingester/app/processor/span_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/model"
umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks"
smocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestNewSpanProcessor(t *testing.T) {
assert.NotNil(t, NewSpanProcessor(&smocks.Writer{}))
}

func TestSpanProcessor_Process(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &spanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}

message := &kmocks.Message{}
data := []byte("police")
span := &model.Span{}

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(span, nil)
writer.On("WriteSpan", span).Return(nil)

assert.Nil(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertExpectations(t)
}

func TestSpanProcessor_ProcessError(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &spanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}

message := &kmocks.Message{}
data := []byte("police")

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(nil, errors.New("moocow"))

assert.Error(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertNotCalled(t, "WriteSpan")
}
48 changes: 48 additions & 0 deletions pkg/kafka/mocks/Unmarshaller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ import (
"github.com/stretchr/testify/assert"
)

func TestProtoMarshaller(t *testing.T) {
marshaller := newProtobufMarshaller()
func TestProtobufMarshallerAndUnmarshaller(t *testing.T) {
testMarshallerAndUnmarshaller(t, newProtobufMarshaller(), NewProtobufUnmarshaller())
}

func TestJSONMarshallerAndUnmarshaller(t *testing.T) {
testMarshallerAndUnmarshaller(t, newJSONMarshaller(), NewJSONUnmarshaller())
}

func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarshaller Unmarshaller) {
bytes, err := marshaller.Marshal(sampleSpan)

assert.NoError(t, err)
assert.NotNil(t, bytes)
}

func TestJSONMarshaller(t *testing.T) {
marshaller := newJSONMarshaller()

bytes, err := marshaller.Marshal(sampleSpan)
resultSpan, err := unmarshaller.Unmarshal(bytes)

assert.NoError(t, err)
assert.NotNil(t, bytes)
assert.Equal(t, sampleSpan, resultSpan)
}
59 changes: 59 additions & 0 deletions plugin/storage/kafka/unmarshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"github.com/jaegertracing/jaeger/model"
)

// Unmarshaller decodes a byte array to a span
type Unmarshaller interface {
Unmarshal([]byte) (*model.Span, error)
}

// ProtobufUnmarshaller implements Unmarshaller
type ProtobufUnmarshaller struct{}

// NewProtobufUnmarshaller constructs a ProtobufUnmarshaller
func NewProtobufUnmarshaller() *ProtobufUnmarshaller {
return &ProtobufUnmarshaller{}
}

// Unmarshal decodes a protobuf byte array to a span
func (h *ProtobufUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
newSpan := &model.Span{}
err := proto.Unmarshal(msg, newSpan)
return newSpan, err
}

// JSONUnmarshaller implements Unmarshaller
type JSONUnmarshaller struct{}

// NewJSONUnmarshaller constructs a ProtobufUnmarshaller
func NewJSONUnmarshaller() *JSONUnmarshaller {
return &JSONUnmarshaller{}
}

// Unmarshal decodes a json byte array to a span
func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
newSpan := &model.Span{}
err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan)
return newSpan, err
}

0 comments on commit ece2341

Please sign in to comment.