Skip to content

Commit

Permalink
Add Zipkin Thrift as kafka ingestion format (#1256)
Browse files Browse the repository at this point in the history
* Add Zipkin Thrift as kafka ingestion format

Signed-off-by: Geoffrey Beausire <[email protected]>

* Fix order import and lint

Signed-off-by: Geoffrey Beausire <[email protected]>

* Refactor the encoding definition

Signed-off-by: Geoffrey Beausire <[email protected]>

* Refacto and implement tests

Signed-off-by: Geoffrey Beausire <[email protected]>

* Improving marshaller testing

Signed-off-by: Geoffrey Beausire <[email protected]>

* Improving deserializer testing

Signed-off-by: Geoffrey Beausire <[email protected]>
  • Loading branch information
geobeau authored and yurishkuro committed Dec 18, 2018
1 parent 8f809ed commit d23d18f
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 65 deletions.
28 changes: 2 additions & 26 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-openapi/loads"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/gorilla/mux"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/swagger-gen/models"
"github.com/jaegertracing/jaeger/swagger-gen/restapi"
"github.com/jaegertracing/jaeger/swagger-gen/restapi/operations"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {

var tSpans []*zipkincore.Span
if contentType == "application/x-thrift" {
tSpans, err = deserializeThrift(bodyBytes)
tSpans, err = zipkin.DeserializeThrift(bodyBytes)
} else if contentType == "application/json" {
tSpans, err = DeserializeJSON(bodyBytes)
} else {
Expand Down Expand Up @@ -181,27 +181,3 @@ func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
}
return nil
}

func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, size, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}

return spans, nil
}
23 changes: 3 additions & 20 deletions cmd/collector/app/zipkin/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"testing"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
jaegerClient "github.com/uber/jaeger-client-go"
zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin"
tchanThrift "github.com/uber/tchannel-go/thrift"

zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand Down Expand Up @@ -105,7 +105,7 @@ func waitForSpans(t *testing.T, handler *mockZipkinHandler, expecting int) {
func TestThriftFormat(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
bodyBytes := zipkinSerialize([]*zipkincore.Span{{}})
bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}})
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, bodyBytes, createHeader("application/x-thrift"))
assert.NoError(t, err)
assert.EqualValues(t, http.StatusAccepted, statusCode)
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestJsonFormat(t *testing.T) {
func TestGzipEncoding(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
bodyBytes := zipkinSerialize([]*zipkincore.Span{{}})
bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}})
header := createHeader("application/x-thrift")
header.Add("Content-Encoding", "gzip")
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, gzipEncode(bodyBytes), header)
Expand Down Expand Up @@ -224,12 +224,6 @@ func TestFormatBadBody(t *testing.T) {
assert.EqualValues(t, "Unable to process request body: *zipkincore.Span field 0 read error: EOF\n", resBodyStr)
}

func TestDeserializeWithBadListStart(t *testing.T) {
spanBytes := zipkinSerialize([]*zipkincore.Span{{}})
_, err := deserializeThrift(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}

func TestCannotReadBodyFromRequest(t *testing.T) {
handler := NewAPIHandler(&mockZipkinHandler{})
req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{})
Expand Down Expand Up @@ -317,17 +311,6 @@ func createHeader(contentType string) *http.Header {
return header
}

func zipkinSerialize(spans []*zipkincore.Span) []byte {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
p.WriteListBegin(thrift.STRUCT, len(spans))
for _, s := range spans {
s.Write(p)
}
p.WriteListEnd()
return t.Buffer.Bytes()
}

func gzipEncode(b []byte) []byte {
buffer := &bytes.Buffer{}
z := gzip.NewWriter(buffer)
Expand Down
11 changes: 7 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"strings"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand All @@ -31,13 +32,15 @@ import (
// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if options.Encoding == app.EncodingJSON {
if options.Encoding == kafka.EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else if options.Encoding == app.EncodingProto {
} else if options.Encoding == kafka.EncodingProto {
unmarshaller = kafka.NewProtobufUnmarshaller()
} else if options.Encoding == kafka.EncodingZipkinThrift {
unmarshaller = kafka.NewZipkinThriftUnmarshaller()
} else {
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
options.Encoding, app.EncodingProto, app.EncodingJSON)
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`,
options.Encoding, strings.Join(kafka.AllEncodings, "\", \""))
}

spParams := processor.SpanProcessorParams{
Expand Down
10 changes: 3 additions & 7 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ import (
"github.com/spf13/viper"

kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the Kafka flags
Expand Down Expand Up @@ -60,7 +56,7 @@ const (
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
Expand Down Expand Up @@ -96,7 +92,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
Expand Down
3 changes: 2 additions & 1 deletion cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -41,7 +42,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, EncodingJSON, o.Encoding)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
assert.Equal(t, 2345, o.IngesterHTTPPort)
}

Expand Down
57 changes: 57 additions & 0 deletions model/converter/thrift/zipkin/deserialize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import (
"github.com/apache/thrift/lib/go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// Function used for testing purposes
func ZipkinSerialize(spans []*zipkincore.Span) []byte {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
p.WriteListBegin(thrift.STRUCT, len(spans))
for _, s := range spans {
s.Write(p)
}
p.WriteListEnd()
return t.Buffer.Bytes()
}

func DeserializeThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, size, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}

return spans, nil
}
42 changes: 42 additions & 0 deletions model/converter/thrift/zipkin/deserialize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

func TestDeserializeWithBadListStart(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
_, err := DeserializeThrift(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}

func TestDeserializeWithCorruptedList(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
spanBytes[2] = 255
_, err := DeserializeThrift(spanBytes)
assert.Error(t, err)
}

func TestDeserialize(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
_, err := DeserializeThrift(spanBytes)
assert.NoError(t, err)
}
6 changes: 3 additions & 3 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.producer = p
switch f.options.encoding {
case encodingProto:
case EncodingProto:
f.marshaller = newProtobufMarshaller()
case encodingJSON:
case EncodingJSON:
f.marshaller = newJSONMarshaller()
default:
return errors.New("kafka encoding is not one of '" + encodingJSON + "' or '" + encodingProto + "'")
return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'")
}
return nil
}
Expand Down
40 changes: 40 additions & 0 deletions plugin/storage/kafka/marshalling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
)

func TestProtobufMarshallerAndUnmarshaller(t *testing.T) {
Expand All @@ -39,3 +42,40 @@ func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarsha
assert.NoError(t, err)
assert.Equal(t, sampleSpan, resultSpan)
}

func TestZipkinThriftUnmarshaller(t *testing.T) {
operationName := "foo"
bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{
{
ID: 12345,
Name: operationName,
Annotations: []*zipkincore.Annotation{
{Host: &zipkincore.Endpoint{ServiceName: "foobar"}},
},
},
})
unmarshaller := NewZipkinThriftUnmarshaller()
resultSpan, err := unmarshaller.Unmarshal(bytes)

assert.NoError(t, err)
assert.Equal(t, operationName, resultSpan.OperationName)
}

func TestZipkinThriftUnmarshallerErrorNoService(t *testing.T) {
bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{
{
ID: 12345,
Name: "foo",
},
})
unmarshaller := NewZipkinThriftUnmarshaller()
_, err := unmarshaller.Unmarshal(bytes)
assert.Error(t, err)
}

func TestZipkinThriftUnmarshallerErrorCorrupted(t *testing.T) {
bytes := []byte("foo")
unmarshaller := NewZipkinThriftUnmarshaller()
_, err := unmarshaller.Unmarshal(bytes)
assert.Error(t, err)
}
13 changes: 9 additions & 4 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ const (
suffixTopic = ".topic"
suffixEncoding = ".encoding"

encodingJSON = "json"
encodingProto = "protobuf"
EncodingJSON = "json"
EncodingProto = "protobuf"
EncodingZipkinThrift = "zipkin-thrift"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = encodingProto
defaultEncoding = EncodingProto
)

var (
AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
)

// Options stores the configuration options for Kafka
Expand All @@ -58,7 +63,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON),
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
}

Expand Down
Loading

0 comments on commit d23d18f

Please sign in to comment.