Skip to content

Commit

Permalink
Split zipkin span
Browse files Browse the repository at this point in the history
Split zipkin span if it contains client and server annotations

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Oct 25, 2017
1 parent a7f203b commit 9da8b4b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 22 deletions.
8 changes: 4 additions & 4 deletions cmd/collector/app/span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, saniti

// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, len(spans))
for i, span := range spans {
mSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
sanitized := h.sanitizer.Sanitize(span)
mSpans[i] = ConvertZipkinToModel(sanitized, h.logger)
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, ZipkinFormatType)
if err != nil {
Expand All @@ -129,7 +129,7 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipki
}

// ConvertZipkinToModel is a helper function that logs warnings during conversion
func ConvertZipkinToModel(zSpan *zipkincore.Span, logger *zap.Logger) *model.Span {
func convertZipkinToModel(zSpan *zipkincore.Span, logger *zap.Logger) []*model.Span {
mSpan, err := zipkin.ToDomainSpan(zSpan)
if err != nil {
logger.Warn("Warning while converting zipkin to domain span", zap.Error(err))
Expand Down
72 changes: 57 additions & 15 deletions model/converter/thrift/zipkin/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const (

var (
coreAnnotations = map[string]string{
zipkincore.SERVER_RECV: string(ext.SpanKindRPCServerEnum),
zipkincore.SERVER_SEND: string(ext.SpanKindRPCServerEnum),
zipkincore.CLIENT_RECV: string(ext.SpanKindRPCClientEnum),
zipkincore.CLIENT_SEND: string(ext.SpanKindRPCClientEnum),
zipkincore.SERVER_RECV: string(ext.SpanKindRPCServerEnum),
zipkincore.SERVER_SEND: string(ext.SpanKindRPCServerEnum),
}

// Some tags on Zipkin spans really describe the process emitting them rather than an individual span.
Expand Down Expand Up @@ -76,39 +76,62 @@ func ToDomain(zSpans []*zipkincore.Span) (*model.Trace, error) {
// A valid model.Span is always returned, even when there are errors.
// The errors are more of an "fyi", describing issues in the data.
// TODO consider using different return type instead of `error`.
func ToDomainSpan(zSpan *zipkincore.Span) (*model.Span, error) {
func ToDomainSpan(zSpan *zipkincore.Span) ([]*model.Span, error) {
return toDomain{}.ToDomainSpan(zSpan)
}

type toDomain struct{}

func (td toDomain) ToDomain(zSpans []*zipkincore.Span) (*model.Trace, error) {
var errors []error
trace := &model.Trace{}
processes := newProcessHashtable()
trace := &model.Trace{}
for _, zSpan := range zSpans {
jSpan, err := td.ToDomainSpan(zSpan)
jSpans, err := td.ToDomainSpan(zSpan)
if err != nil {
errors = append(errors, err)
}
// remove duplicate Process instances
jSpan.Process = processes.add(jSpan.Process)
trace.Spans = append(trace.Spans, jSpan)
for _, jSpan := range jSpans {
// remove duplicate Process instances
jSpan.Process = processes.add(jSpan.Process)
trace.Spans = append(trace.Spans, jSpan)
}
}
return trace, multierror.Wrap(errors)
}

func (td toDomain) ToDomainSpan(zSpan *zipkincore.Span) (*model.Span, error) {
jSpan := td.transformSpan(zSpan)
func (td toDomain) ToDomainSpan(zSpan *zipkincore.Span) ([]*model.Span, error) {
jSpans := td.transformSpan(zSpan)
jProcess, err := td.generateProcess(zSpan)
jSpan.Process = jProcess
return jSpan, err
for _, jSpan := range jSpans {
jSpan.Process = jProcess
}
return jSpans, err
}

// isClientAndServerSpan determines whether span represents both client and server processing
func (td toDomain) isClientAndServerSpan(zSpan *zipkincore.Span) bool {
var csOk bool
var srOk bool
for _, ann := range zSpan.Annotations {
if ann.Value == "cs" {
csOk = true
} else if ann.Value == "sr" {
srOk = true
}
if csOk && srOk {
return true
}
}
return false
}

// transformSpan transforms a zipkin span into a Jaeger span
func (td toDomain) transformSpan(zSpan *zipkincore.Span) *model.Span {
func (td toDomain) transformSpan(zSpan *zipkincore.Span) []*model.Span {
tags := td.getTags(zSpan.BinaryAnnotations, td.isSpanTag)
if spanKindTag, ok := td.getSpanKindTag(zSpan.Annotations); ok {
var spanKindTag model.KeyValue
spanKindTag, ok := td.getSpanKindTag(zSpan.Annotations)
if ok {
tags = append(tags, spanKindTag)
}
var traceIDHigh, parentID int64
Expand All @@ -118,7 +141,8 @@ func (td toDomain) transformSpan(zSpan *zipkincore.Span) *model.Span {
if zSpan.ParentID != nil {
parentID = *zSpan.ParentID
}
return &model.Span{

result := []*model.Span{{
TraceID: model.TraceID{High: uint64(traceIDHigh), Low: uint64(zSpan.TraceID)},
SpanID: model.SpanID(zSpan.ID),
OperationName: zSpan.Name,
Expand All @@ -128,7 +152,25 @@ func (td toDomain) transformSpan(zSpan *zipkincore.Span) *model.Span {
Duration: model.MicrosecondsAsDuration(uint64(zSpan.GetDuration())),
Tags: tags,
Logs: td.getLogs(zSpan.Annotations),
}}

if td.isClientAndServerSpan(zSpan) {
// if the span is client and server we split it into two separate spans
s := &model.Span{
TraceID: model.TraceID{High: uint64(traceIDHigh), Low: uint64(zSpan.TraceID)},
SpanID: model.SpanID(zSpan.ID),
OperationName: zSpan.Name,
ParentSpanID: model.SpanID(parentID),
}
// if the first span is a client span we create server span and vice-versa.
if spanKindTag.VStr == string(ext.SpanKindRPCClient.Value.(ext.SpanKindEnum)) {
s.Tags = []model.KeyValue{model.String(ext.SpanKindRPCServer.Key, string(ext.SpanKindRPCServer.Value.(ext.SpanKindEnum)))}
} else {
s.Tags = []model.KeyValue{model.String(ext.SpanKindRPCClient.Key, string(ext.SpanKindRPCClient.Value.(ext.SpanKindEnum)))}
}
result = append(result, s)
}
return result
}

// getFlags takes a Zipkin Span and deduces the proper flags settings
Expand Down
46 changes: 43 additions & 3 deletions model/converter/thrift/zipkin/to_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger/model"
z "github.com/uber/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -57,10 +59,12 @@ func TestToDomain(t *testing.T) {
if i == 1 {
t.Run("ToDomainSpan", func(t *testing.T) {
zSpan := zSpans[0]
jSpan, err := ToDomainSpan(zSpan)
jSpans, err := ToDomainSpan(zSpan)
assert.NoError(t, err)
jSpan.NormalizeTimestamps()
assert.Equal(t, expectedTrace.Spans[0], jSpan)
for _, jSpan := range jSpans {
jSpan.NormalizeTimestamps()
assert.Equal(t, expectedTrace.Spans[0], jSpan)
}
})
}
}
Expand All @@ -83,6 +87,42 @@ func TestToDomainServiceNameInBinAnnotation(t *testing.T) {
assert.Equal(t, "bar", trace.Spans[0].Process.ServiceName)
}

func TestToDomainMultipleSpanKinds(t *testing.T) {
tests := []struct {
json string
tagFirst opentracing.Tag
tagSecond opentracing.Tag
}{
{json: `[{ "trace_id": -1, "id": 31,
"annotations": [{"value": "cs", "host": {"service_name": "bar", "ipv4": 23456}},
{"value": "sr", "host": {"service_name": "bar", "ipv4": 23456}}] }]`,
tagFirst: ext.SpanKindRPCClient,
tagSecond: ext.SpanKindRPCServer,
},
{json: `[{ "trace_id": -1, "id": 31,
"annotations": [{"value": "sr", "host": {"service_name": "bar", "ipv4": 23456}},
{"value": "cs", "host": {"service_name": "bar", "ipv4": 23456}}] }]`,
tagFirst: ext.SpanKindRPCServer,
tagSecond: ext.SpanKindRPCClient,
},
}

for _, test := range tests {
fmt.Println(test.json)
trace, err := ToDomain(getZipkinSpans(t, test.json))
require.Nil(t, err)

assert.Equal(t, 2, len(trace.Spans))
assert.Equal(t, 1, trace.Spans[0].Tags.Len())
assert.Equal(t, test.tagFirst.Key, trace.Spans[0].Tags[0].Key)
assert.Equal(t, string(test.tagFirst.Value.(ext.SpanKindEnum)), trace.Spans[0].Tags[0].VStr)

assert.Equal(t, 1, trace.Spans[1].Tags.Len())
assert.Equal(t, test.tagSecond.Key, trace.Spans[1].Tags[0].Key)
assert.Equal(t, string(test.tagSecond.Value.(ext.SpanKindEnum)), trace.Spans[1].Tags[0].VStr)
}
}

func TestInvalidAnnotationTypeError(t *testing.T) {
_, err := toDomain{}.transformBinaryAnnotation(&z.BinaryAnnotation{
AnnotationType: -1,
Expand Down

0 comments on commit 9da8b4b

Please sign in to comment.