From d23d18f43147431e03eedd10b82b603388964f91 Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Tue, 18 Dec 2018 19:32:10 +0100 Subject: [PATCH] Add Zipkin Thrift as kafka ingestion format (#1256) * Add Zipkin Thrift as kafka ingestion format Signed-off-by: Geoffrey Beausire * Fix order import and lint Signed-off-by: Geoffrey Beausire * Refactor the encoding definition Signed-off-by: Geoffrey Beausire * Refacto and implement tests Signed-off-by: Geoffrey Beausire * Improving marshaller testing Signed-off-by: Geoffrey Beausire * Improving deserializer testing Signed-off-by: Geoffrey Beausire --- cmd/collector/app/zipkin/http_handler.go | 28 +-------- cmd/collector/app/zipkin/http_handler_test.go | 23 +------- cmd/ingester/app/builder/builder.go | 11 ++-- cmd/ingester/app/flags.go | 10 +--- cmd/ingester/app/flags_test.go | 3 +- model/converter/thrift/zipkin/deserialize.go | 57 +++++++++++++++++++ .../thrift/zipkin/deserialize_test.go | 42 ++++++++++++++ plugin/storage/kafka/factory.go | 6 +- plugin/storage/kafka/marshalling_test.go | 40 +++++++++++++ plugin/storage/kafka/options.go | 13 +++-- plugin/storage/kafka/unmarshaller.go | 22 +++++++ 11 files changed, 190 insertions(+), 65 deletions(-) create mode 100644 model/converter/thrift/zipkin/deserialize.go create mode 100644 model/converter/thrift/zipkin/deserialize_test.go diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 60432d25503..c880bf9abd4 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -24,7 +24,6 @@ import ( "strings" "time" - "github.com/apache/thrift/lib/go/thrift" "github.com/go-openapi/loads" "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" @@ -32,6 +31,7 @@ import ( tchanThrift "github.com/uber/tchannel-go/thrift" "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" "github.com/jaegertracing/jaeger/swagger-gen/models" "github.com/jaegertracing/jaeger/swagger-gen/restapi" "github.com/jaegertracing/jaeger/swagger-gen/restapi/operations" @@ -89,7 +89,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { var tSpans []*zipkincore.Span if contentType == "application/x-thrift" { - tSpans, err = deserializeThrift(bodyBytes) + tSpans, err = zipkin.DeserializeThrift(bodyBytes) } else if contentType == "application/json" { tSpans, err = DeserializeJSON(bodyBytes) } else { @@ -181,27 +181,3 @@ func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { } return nil } - -func deserializeThrift(b []byte) ([]*zipkincore.Span, error) { - buffer := thrift.NewTMemoryBuffer() - buffer.Write(b) - - transport := thrift.NewTBinaryProtocolTransport(buffer) - _, size, err := transport.ReadListBegin() // Ignore the returned element type - if err != nil { - return nil, err - } - - // We don't depend on the size returned by ReadListBegin to preallocate the array because it - // sometimes returns a nil error on bad input and provides an unreasonably large int for size - var spans []*zipkincore.Span - for i := 0; i < size; i++ { - zs := &zipkincore.Span{} - if err = zs.Read(transport); err != nil { - return nil, err - } - spans = append(spans, zs) - } - - return spans, nil -} diff --git a/cmd/collector/app/zipkin/http_handler_test.go b/cmd/collector/app/zipkin/http_handler_test.go index c5d8b8864a3..0600ff6daa4 100644 --- a/cmd/collector/app/zipkin/http_handler_test.go +++ b/cmd/collector/app/zipkin/http_handler_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,6 +32,7 @@ import ( zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin" tchanThrift "github.com/uber/tchannel-go/thrift" + zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -105,7 +105,7 @@ func waitForSpans(t *testing.T, handler *mockZipkinHandler, expecting int) { func TestThriftFormat(t *testing.T) { server, _ := initializeTestServer(nil) defer server.Close() - bodyBytes := zipkinSerialize([]*zipkincore.Span{{}}) + bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}}) statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, bodyBytes, createHeader("application/x-thrift")) assert.NoError(t, err) assert.EqualValues(t, http.StatusAccepted, statusCode) @@ -178,7 +178,7 @@ func TestJsonFormat(t *testing.T) { func TestGzipEncoding(t *testing.T) { server, _ := initializeTestServer(nil) defer server.Close() - bodyBytes := zipkinSerialize([]*zipkincore.Span{{}}) + bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}}) header := createHeader("application/x-thrift") header.Add("Content-Encoding", "gzip") statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, gzipEncode(bodyBytes), header) @@ -224,12 +224,6 @@ func TestFormatBadBody(t *testing.T) { assert.EqualValues(t, "Unable to process request body: *zipkincore.Span field 0 read error: EOF\n", resBodyStr) } -func TestDeserializeWithBadListStart(t *testing.T) { - spanBytes := zipkinSerialize([]*zipkincore.Span{{}}) - _, err := deserializeThrift(append([]byte{0, 255, 255}, spanBytes...)) - assert.Error(t, err) -} - func TestCannotReadBodyFromRequest(t *testing.T) { handler := NewAPIHandler(&mockZipkinHandler{}) req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{}) @@ -317,17 +311,6 @@ func createHeader(contentType string) *http.Header { return header } -func zipkinSerialize(spans []*zipkincore.Span) []byte { - t := thrift.NewTMemoryBuffer() - p := thrift.NewTBinaryProtocolTransport(t) - p.WriteListBegin(thrift.STRUCT, len(spans)) - for _, s := range spans { - s.Write(p) - } - p.WriteListEnd() - return t.Buffer.Bytes() -} - func gzipEncode(b []byte) []byte { buffer := &bytes.Buffer{} z := gzip.NewWriter(buffer) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index ee0301765f2..d9a62fab58d 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "strings" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" @@ -31,13 +32,15 @@ import ( // CreateConsumer creates a new span consumer for the ingester func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) { var unmarshaller kafka.Unmarshaller - if options.Encoding == app.EncodingJSON { + if options.Encoding == kafka.EncodingJSON { unmarshaller = kafka.NewJSONUnmarshaller() - } else if options.Encoding == app.EncodingProto { + } else if options.Encoding == kafka.EncodingProto { unmarshaller = kafka.NewProtobufUnmarshaller() + } else if options.Encoding == kafka.EncodingZipkinThrift { + unmarshaller = kafka.NewZipkinThriftUnmarshaller() } else { - return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`, - options.Encoding, app.EncodingProto, app.EncodingJSON) + return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`, + options.Encoding, strings.Join(kafka.AllEncodings, "\", \"")) } spParams := processor.SpanProcessorParams{ diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 9ffbffac113..097793decdf 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -24,14 +24,10 @@ import ( "github.com/spf13/viper" kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) const ( - // EncodingJSON indicates spans are encoded as a json byte array - EncodingJSON = "json" - // EncodingProto indicates spans are encoded as a protobuf byte array - EncodingProto = "protobuf" - // ConfigPrefix is a prefix for the ingester flags ConfigPrefix = "ingester" // KafkaConfigPrefix is a prefix for the Kafka flags @@ -60,7 +56,7 @@ const ( // DefaultParallelism is the default parallelism for the span processor DefaultParallelism = 1000 // DefaultEncoding is the default span encoding - DefaultEncoding = EncodingProto + DefaultEncoding = kafka.EncodingProto // DefaultDeadlockInterval is the default deadlock interval DefaultDeadlockInterval = 1 * time.Minute // DefaultHTTPPort is the default HTTP port (e.g. for /metrics) @@ -96,7 +92,7 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.String( KafkaConfigPrefix+SuffixEncoding, DefaultEncoding, - fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON)) + fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \""))) flagSet.String( ConfigPrefix+SuffixParallelism, strconv.Itoa(DefaultParallelism), diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index c502581d3b1..d0f3b39b804 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) func TestOptionsWithFlags(t *testing.T) { @@ -41,7 +42,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "group1", o.GroupID) assert.Equal(t, 5, o.Parallelism) assert.Equal(t, 2*time.Minute, o.DeadlockInterval) - assert.Equal(t, EncodingJSON, o.Encoding) + assert.Equal(t, kafka.EncodingJSON, o.Encoding) assert.Equal(t, 2345, o.IngesterHTTPPort) } diff --git a/model/converter/thrift/zipkin/deserialize.go b/model/converter/thrift/zipkin/deserialize.go new file mode 100644 index 00000000000..f84948ca1d8 --- /dev/null +++ b/model/converter/thrift/zipkin/deserialize.go @@ -0,0 +1,57 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "github.com/apache/thrift/lib/go/thrift" + + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +// Function used for testing purposes +func ZipkinSerialize(spans []*zipkincore.Span) []byte { + t := thrift.NewTMemoryBuffer() + p := thrift.NewTBinaryProtocolTransport(t) + p.WriteListBegin(thrift.STRUCT, len(spans)) + for _, s := range spans { + s.Write(p) + } + p.WriteListEnd() + return t.Buffer.Bytes() +} + +func DeserializeThrift(b []byte) ([]*zipkincore.Span, error) { + buffer := thrift.NewTMemoryBuffer() + buffer.Write(b) + + transport := thrift.NewTBinaryProtocolTransport(buffer) + _, size, err := transport.ReadListBegin() // Ignore the returned element type + if err != nil { + return nil, err + } + + // We don't depend on the size returned by ReadListBegin to preallocate the array because it + // sometimes returns a nil error on bad input and provides an unreasonably large int for size + var spans []*zipkincore.Span + for i := 0; i < size; i++ { + zs := &zipkincore.Span{} + if err = zs.Read(transport); err != nil { + return nil, err + } + spans = append(spans, zs) + } + + return spans, nil +} diff --git a/model/converter/thrift/zipkin/deserialize_test.go b/model/converter/thrift/zipkin/deserialize_test.go new file mode 100644 index 00000000000..8280f7bcd54 --- /dev/null +++ b/model/converter/thrift/zipkin/deserialize_test.go @@ -0,0 +1,42 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +func TestDeserializeWithBadListStart(t *testing.T) { + spanBytes := ZipkinSerialize([]*zipkincore.Span{{}}) + _, err := DeserializeThrift(append([]byte{0, 255, 255}, spanBytes...)) + assert.Error(t, err) +} + +func TestDeserializeWithCorruptedList(t *testing.T) { + spanBytes := ZipkinSerialize([]*zipkincore.Span{{}}) + spanBytes[2] = 255 + _, err := DeserializeThrift(spanBytes) + assert.Error(t, err) +} + +func TestDeserialize(t *testing.T) { + spanBytes := ZipkinSerialize([]*zipkincore.Span{{}}) + _, err := DeserializeThrift(spanBytes) + assert.NoError(t, err) +} diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index 3b5dda63ae2..51c8ae5220a 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -68,12 +68,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.producer = p switch f.options.encoding { - case encodingProto: + case EncodingProto: f.marshaller = newProtobufMarshaller() - case encodingJSON: + case EncodingJSON: f.marshaller = newJSONMarshaller() default: - return errors.New("kafka encoding is not one of '" + encodingJSON + "' or '" + encodingProto + "'") + return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'") } return nil } diff --git a/plugin/storage/kafka/marshalling_test.go b/plugin/storage/kafka/marshalling_test.go index 3ec556ed32f..df68324e523 100644 --- a/plugin/storage/kafka/marshalling_test.go +++ b/plugin/storage/kafka/marshalling_test.go @@ -18,6 +18,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" ) func TestProtobufMarshallerAndUnmarshaller(t *testing.T) { @@ -39,3 +42,40 @@ func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarsha assert.NoError(t, err) assert.Equal(t, sampleSpan, resultSpan) } + +func TestZipkinThriftUnmarshaller(t *testing.T) { + operationName := "foo" + bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{ + { + ID: 12345, + Name: operationName, + Annotations: []*zipkincore.Annotation{ + {Host: &zipkincore.Endpoint{ServiceName: "foobar"}}, + }, + }, + }) + unmarshaller := NewZipkinThriftUnmarshaller() + resultSpan, err := unmarshaller.Unmarshal(bytes) + + assert.NoError(t, err) + assert.Equal(t, operationName, resultSpan.OperationName) +} + +func TestZipkinThriftUnmarshallerErrorNoService(t *testing.T) { + bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{ + { + ID: 12345, + Name: "foo", + }, + }) + unmarshaller := NewZipkinThriftUnmarshaller() + _, err := unmarshaller.Unmarshal(bytes) + assert.Error(t, err) +} + +func TestZipkinThriftUnmarshallerErrorCorrupted(t *testing.T) { + bytes := []byte("foo") + unmarshaller := NewZipkinThriftUnmarshaller() + _, err := unmarshaller.Unmarshal(bytes) + assert.Error(t, err) +} diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index e1f6f3a57e2..3b3581b02e0 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -30,12 +30,17 @@ const ( suffixTopic = ".topic" suffixEncoding = ".encoding" - encodingJSON = "json" - encodingProto = "protobuf" + EncodingJSON = "json" + EncodingProto = "protobuf" + EncodingZipkinThrift = "zipkin-thrift" defaultBroker = "127.0.0.1:9092" defaultTopic = "jaeger-spans" - defaultEncoding = encodingProto + defaultEncoding = EncodingProto +) + +var ( + AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift} ) // Options stores the configuration options for Kafka @@ -58,7 +63,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String( configPrefix+suffixEncoding, defaultEncoding, - fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON), + fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) } diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go index d19b7181914..ae02b304e9f 100644 --- a/plugin/storage/kafka/unmarshaller.go +++ b/plugin/storage/kafka/unmarshaller.go @@ -21,6 +21,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" ) // Unmarshaller decodes a byte array to a span @@ -57,3 +58,24 @@ func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan) return newSpan, err } + +// zipkinThriftUnmarshaller implements Unmarshaller +type zipkinThriftUnmarshaller struct{} + +// NewZipkinThriftUnmarshaller constructs a zipkinThriftUnmarshaller +func NewZipkinThriftUnmarshaller() *zipkinThriftUnmarshaller { + return &zipkinThriftUnmarshaller{} +} + +// Unmarshal decodes a json byte array to a span +func (h *zipkinThriftUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { + tSpans, err := zipkin.DeserializeThrift(msg) + if err != nil { + return nil, err + } + mSpans, err := zipkin.ToDomainSpan(tSpans[0]) + if err != nil { + return nil, err + } + return mSpans[0], err +}