Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zipkin Thrift as kafka ingestion format #1256

Merged
merged 6 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 2 additions & 26 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-openapi/loads"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/gorilla/mux"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/swagger-gen/models"
"github.com/jaegertracing/jaeger/swagger-gen/restapi"
"github.com/jaegertracing/jaeger/swagger-gen/restapi/operations"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {

var tSpans []*zipkincore.Span
if contentType == "application/x-thrift" {
tSpans, err = deserializeThrift(bodyBytes)
tSpans, err = zipkin.DeserializeThrift(bodyBytes)
} else if contentType == "application/json" {
tSpans, err = DeserializeJSON(bodyBytes)
} else {
Expand Down Expand Up @@ -181,27 +181,3 @@ func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
}
return nil
}

func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, size, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}

return spans, nil
}
23 changes: 3 additions & 20 deletions cmd/collector/app/zipkin/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"testing"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
jaegerClient "github.com/uber/jaeger-client-go"
zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin"
tchanThrift "github.com/uber/tchannel-go/thrift"

zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand Down Expand Up @@ -105,7 +105,7 @@ func waitForSpans(t *testing.T, handler *mockZipkinHandler, expecting int) {
func TestThriftFormat(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
bodyBytes := zipkinSerialize([]*zipkincore.Span{{}})
bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}})
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, bodyBytes, createHeader("application/x-thrift"))
assert.NoError(t, err)
assert.EqualValues(t, http.StatusAccepted, statusCode)
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestJsonFormat(t *testing.T) {
func TestGzipEncoding(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
bodyBytes := zipkinSerialize([]*zipkincore.Span{{}})
bodyBytes := zipkinTrift.ZipkinSerialize([]*zipkincore.Span{{}})
header := createHeader("application/x-thrift")
header.Add("Content-Encoding", "gzip")
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, gzipEncode(bodyBytes), header)
Expand Down Expand Up @@ -224,12 +224,6 @@ func TestFormatBadBody(t *testing.T) {
assert.EqualValues(t, "Unable to process request body: *zipkincore.Span field 0 read error: EOF\n", resBodyStr)
}

func TestDeserializeWithBadListStart(t *testing.T) {
spanBytes := zipkinSerialize([]*zipkincore.Span{{}})
_, err := deserializeThrift(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}

func TestCannotReadBodyFromRequest(t *testing.T) {
handler := NewAPIHandler(&mockZipkinHandler{})
req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{})
Expand Down Expand Up @@ -317,17 +311,6 @@ func createHeader(contentType string) *http.Header {
return header
}

func zipkinSerialize(spans []*zipkincore.Span) []byte {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
p.WriteListBegin(thrift.STRUCT, len(spans))
for _, s := range spans {
s.Write(p)
}
p.WriteListEnd()
return t.Buffer.Bytes()
}

func gzipEncode(b []byte) []byte {
buffer := &bytes.Buffer{}
z := gzip.NewWriter(buffer)
Expand Down
11 changes: 7 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"strings"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand All @@ -31,13 +32,15 @@ import (
// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if options.Encoding == app.EncodingJSON {
if options.Encoding == kafka.EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else if options.Encoding == app.EncodingProto {
} else if options.Encoding == kafka.EncodingProto {
unmarshaller = kafka.NewProtobufUnmarshaller()
} else if options.Encoding == kafka.EncodingZipkinThrift {
unmarshaller = kafka.NewZipkinThriftUnmarshaller()
} else {
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
options.Encoding, app.EncodingProto, app.EncodingJSON)
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`,
options.Encoding, strings.Join(kafka.AllEncodings, "\", \""))
}

spParams := processor.SpanProcessorParams{
Expand Down
10 changes: 3 additions & 7 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ import (
"github.com/spf13/viper"

kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the Kafka flags
Expand Down Expand Up @@ -60,7 +56,7 @@ const (
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
Expand Down Expand Up @@ -96,7 +92,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
Expand Down
3 changes: 2 additions & 1 deletion cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -41,7 +42,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, EncodingJSON, o.Encoding)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
assert.Equal(t, 2345, o.IngesterHTTPPort)
}

Expand Down
57 changes: 57 additions & 0 deletions model/converter/thrift/zipkin/deserialize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import (
"github.com/apache/thrift/lib/go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// Function used for testing purposes
func ZipkinSerialize(spans []*zipkincore.Span) []byte {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
p.WriteListBegin(thrift.STRUCT, len(spans))
for _, s := range spans {
s.Write(p)
}
p.WriteListEnd()
return t.Buffer.Bytes()
}

func DeserializeThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, size, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}

return spans, nil
}
42 changes: 42 additions & 0 deletions model/converter/thrift/zipkin/deserialize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

func TestDeserializeWithBadListStart(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
_, err := DeserializeThrift(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}

func TestDeserializeWithCorruptedList(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
spanBytes[2] = 255
_, err := DeserializeThrift(spanBytes)
assert.Error(t, err)
}

func TestDeserialize(t *testing.T) {
spanBytes := ZipkinSerialize([]*zipkincore.Span{{}})
_, err := DeserializeThrift(spanBytes)
assert.NoError(t, err)
}
6 changes: 3 additions & 3 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.producer = p
switch f.options.encoding {
case encodingProto:
case EncodingProto:
f.marshaller = newProtobufMarshaller()
case encodingJSON:
case EncodingJSON:
f.marshaller = newJSONMarshaller()
default:
return errors.New("kafka encoding is not one of '" + encodingJSON + "' or '" + encodingProto + "'")
return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'")
}
return nil
}
Expand Down
40 changes: 40 additions & 0 deletions plugin/storage/kafka/marshalling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
)

func TestProtobufMarshallerAndUnmarshaller(t *testing.T) {
Expand All @@ -39,3 +42,40 @@ func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarsha
assert.NoError(t, err)
assert.Equal(t, sampleSpan, resultSpan)
}

func TestZipkinThriftUnmarshaller(t *testing.T) {
operationName := "foo"
bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{
{
ID: 12345,
Name: operationName,
Annotations: []*zipkincore.Annotation{
{Host: &zipkincore.Endpoint{ServiceName: "foobar"}},
},
},
})
unmarshaller := NewZipkinThriftUnmarshaller()
resultSpan, err := unmarshaller.Unmarshal(bytes)

assert.NoError(t, err)
assert.Equal(t, operationName, resultSpan.OperationName)
}

func TestZipkinThriftUnmarshallerErrorNoService(t *testing.T) {
bytes := zipkin.ZipkinSerialize([]*zipkincore.Span{
{
ID: 12345,
Name: "foo",
},
})
unmarshaller := NewZipkinThriftUnmarshaller()
_, err := unmarshaller.Unmarshal(bytes)
assert.Error(t, err)
}

func TestZipkinThriftUnmarshallerErrorCorrupted(t *testing.T) {
bytes := []byte("foo")
unmarshaller := NewZipkinThriftUnmarshaller()
_, err := unmarshaller.Unmarshal(bytes)
assert.Error(t, err)
}
13 changes: 9 additions & 4 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ const (
suffixTopic = ".topic"
suffixEncoding = ".encoding"

encodingJSON = "json"
encodingProto = "protobuf"
EncodingJSON = "json"
EncodingProto = "protobuf"
EncodingZipkinThrift = "zipkin-thrift"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = encodingProto
defaultEncoding = EncodingProto
)

var (
AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
)

// Options stores the configuration options for Kafka
Expand All @@ -58,7 +63,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON),
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
}

Expand Down
Loading