Skip to content

Commit

Permalink
[processor/tailsampling] fix tailsampling lost instrumentation scope …
Browse files Browse the repository at this point in the history
…info
  • Loading branch information
siriusfreak committed Aug 11, 2023
1 parent ffd4eeb commit 213a300
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 11 deletions.
46 changes: 35 additions & 11 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ type tailSamplingSpanProcessor struct {
numTracesOnMap *atomic.Uint64
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
// required for preserving the instrumentation library information while sampling.
// We use pointers there to fast find the span in the map.
type spanAndScope struct {
span *ptrace.Span
instrumentationScope *pcommon.InstrumentationScope
}

const (
sourceFormat = "tail_sampling"
)
Expand Down Expand Up @@ -299,26 +307,31 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace
return nil
}

func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]*ptrace.Span {
idToSpans := make(map[pcommon.TraceID][]*ptrace.Span)
func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]spanAndScope {
idToSpans := make(map[pcommon.TraceID][]spanAndScope)
ilss := resourceSpans.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
spans := ilss.At(j).Spans()
scope := ilss.At(j)
spans := scope.Spans()
is := scope.Scope()
spansLen := spans.Len()
for k := 0; k < spansLen; k++ {
span := spans.At(k)
key := span.TraceID()
idToSpans[key] = append(idToSpans[key], &span)
idToSpans[key] = append(idToSpans[key], spanAndScope{
span: &span,
instrumentationScope: &is,
})
}
}
return idToSpans
}

func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
// Group spans per their traceId to minimize contention on idToTrace
idToSpans := tsp.groupSpansByTraceKey(resourceSpans)
idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans)
var newTraceIDs int64
for id, spans := range idToSpans {
for id, spans := range idToSpansAndScope {
lenSpans := int64(len(spans))
lenPolicies := len(tsp.policies)
initialDecisions := make([]sampling.Decision, lenPolicies)
Expand Down Expand Up @@ -422,12 +435,23 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)))
}

func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spans []*ptrace.Span) {
func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) {
rs := dest.ResourceSpans().AppendEmpty()
rss.Resource().CopyTo(rs.Resource())
ils := rs.ScopeSpans().AppendEmpty()
for _, span := range spans {
sp := ils.Spans().AppendEmpty()
span.CopyTo(sp)

scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans)
for _, spanAndScope := range spanAndScopes {
// If the scope of the spanAndScope is not in the map, add it to the map and the destination.
if scope, ok := scopePointerToNewScope[spanAndScope.instrumentationScope]; !ok {
is := rs.ScopeSpans().AppendEmpty()
spanAndScope.instrumentationScope.CopyTo(is.Scope())
scopePointerToNewScope[spanAndScope.instrumentationScope] = &is

sp := is.Spans().AppendEmpty()
spanAndScope.span.CopyTo(sp)
} else {
sp := scope.Spans().AppendEmpty()
spanAndScope.span.CopyTo(sp)
}
}
}
105 changes: 105 additions & 0 deletions processor/tailsamplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,111 @@ func (t *TestPolicyEvaluator) Evaluate(ctx context.Context, traceID pcommon.Trac
return t.pe.Evaluate(ctx, traceID, trace)
}

type spanInfo struct {
span ptrace.Span
resource pcommon.Resource
scope pcommon.InstrumentationScope
}

func TestTraceIntegrity(t *testing.T) {
const spanCount = 4
// Generate trace with several spans with different scopes
traces := ptrace.NewTraces()
spans := make(map[pcommon.SpanID]spanInfo, 0)

// Fill resource
resourceSpans := traces.ResourceSpans().AppendEmpty()
resource := resourceSpans.Resource()
resourceSpans.Resource().Attributes().PutStr("key1", "value1")
resourceSpans.Resource().Attributes().PutInt("key2", 0)

// Fill scopeSpans 1
scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
scope := scopeSpans.Scope()
scopeSpans.Scope().SetName("scope1")
scopeSpans.Scope().Attributes().PutStr("key1", "value1")
scopeSpans.Scope().Attributes().PutInt("key2", 0)

// Add spans to scopeSpans 1
span := scopeSpans.Spans().AppendEmpty()
spanID := [8]byte{1, 2, 3, 4, 5, 6, 7, 8}
span.SetSpanID(pcommon.SpanID(spanID))
span.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4}))
spans[spanID] = spanInfo{span: span, resource: resource, scope: scope}

span = scopeSpans.Spans().AppendEmpty()
spanID = [8]byte{9, 10, 11, 12, 13, 14, 15, 16}
span.SetSpanID(pcommon.SpanID(spanID))
span.SetTraceID(pcommon.TraceID([16]byte{5, 6, 7, 8}))
spans[spanID] = spanInfo{span: span, resource: resource, scope: scope}

// Fill scopeSpans 2
scopeSpans = resourceSpans.ScopeSpans().AppendEmpty()
scope = scopeSpans.Scope()
scopeSpans.Scope().SetName("scope2")
scopeSpans.Scope().Attributes().PutStr("key1", "value1")
scopeSpans.Scope().Attributes().PutInt("key2", 0)

// Add spans to scopeSpans 2
span = scopeSpans.Spans().AppendEmpty()
spanID = [8]byte{17, 18, 19, 20, 21, 22, 23, 24}
span.SetSpanID(pcommon.SpanID(spanID))
span.SetTraceID(pcommon.TraceID([16]byte{9, 10, 11, 12}))
spans[spanID] = spanInfo{span: span, resource: resource, scope: scope}

span = scopeSpans.Spans().AppendEmpty()
spanID = [8]byte{25, 26, 27, 28, 29, 30, 31, 32}
span.SetSpanID(pcommon.SpanID(spanID))
span.SetTraceID(pcommon.TraceID([16]byte{13, 14, 15, 16}))
spans[spanID] = spanInfo{span: span, resource: resource, scope: scope}

require.Equal(t, spanCount, len(spans))

msp := new(consumertest.TracesSink)
mpe := &mockPolicyEvaluator{}
mtt := &manualTTicker{}
tsp := &tailSamplingSpanProcessor{
ctx: context.Background(),
nextConsumer: msp,
maxNumTraces: spanCount,
logger: zap.NewNop(),
decisionBatcher: newSyncIDBatcher(1),
policies: []*policy{{name: "mock-policy", evaluator: mpe, ctx: context.TODO()}},
deleteChan: make(chan pcommon.TraceID, spanCount),
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, tsp.Shutdown(context.Background()))
}()

require.NoError(t, tsp.ConsumeTraces(context.Background(), traces))

tsp.samplingPolicyOnTick()
mpe.NextDecision = sampling.Sampled
tsp.samplingPolicyOnTick()

consumed := msp.AllTraces()
require.Equal(t, 4, len(consumed))
for _, trace := range consumed {
require.Equal(t, 1, trace.SpanCount())
require.Equal(t, 1, trace.ResourceSpans().Len())
require.Equal(t, 1, trace.ResourceSpans().At(0).ScopeSpans().Len())
require.Equal(t, 1, trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len())

span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
if spanInfo, ok := spans[span.SpanID()]; ok {
require.Equal(t, spanInfo.span, span)
require.Equal(t, spanInfo.resource, trace.ResourceSpans().At(0).Resource())
require.Equal(t, spanInfo.scope, trace.ResourceSpans().At(0).ScopeSpans().At(0).Scope())
} else {
require.Fail(t, "Span not found")
}
}
}

func TestSequentialTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)
cfg := Config{
Expand Down

0 comments on commit 213a300

Please sign in to comment.