diff --git a/.chloggen/fix-tailsampling-lost-instrumentation-scope-info.yaml b/.chloggen/fix-tailsampling-lost-instrumentation-scope-info.yaml new file mode 100644 index 000000000000..f26ce6d2451b --- /dev/null +++ b/.chloggen/fix-tailsampling-lost-instrumentation-scope-info.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Preserve the instrumentation library (scope) information of spans during tail sampling + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [13642] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 8d22af8b667f..42f715dab669 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -52,6 +52,14 @@ type tailSamplingSpanProcessor struct { numTracesOnMap *atomic.Uint64 } +// spanAndScope is a structure holding a span together with its instrumentation scope. +// It is required for preserving the instrumentation library information while sampling. +// Pointers are stored so that a scope can be looked up quickly when used as a map key. 
+type spanAndScope struct {
+	span                 *ptrace.Span                   // pointer to the grouped span
+	instrumentationScope *pcommon.InstrumentationScope // pointer to the scope that span was read from
+}
+
 const (
 	sourceFormat = "tail_sampling"
 )
@@ -299,16 +307,21 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace
 	return nil
 }
 
-func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]*ptrace.Span {
-	idToSpans := make(map[pcommon.TraceID][]*ptrace.Span)
+func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]spanAndScope {
+	idToSpans := make(map[pcommon.TraceID][]spanAndScope)
 	ilss := resourceSpans.ScopeSpans()
 	for j := 0; j < ilss.Len(); j++ {
-		spans := ilss.At(j).Spans()
+		scopeSpans := ilss.At(j)
+		spans := scopeSpans.Spans()
+		scope := scopeSpans.Scope() // declared once per ScopeSpans so &scope is shared by all of its spans
 		spansLen := spans.Len()
 		for k := 0; k < spansLen; k++ {
 			span := spans.At(k)
 			key := span.TraceID()
-			idToSpans[key] = append(idToSpans[key], &span)
+			idToSpans[key] = append(idToSpans[key], spanAndScope{
+				span:                 &span,
+				instrumentationScope: &scope,
+			})
 		}
 	}
 	return idToSpans
@@ -316,9 +329,9 @@ func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.
func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) { // Group spans per their traceId to minimize contention on idToTrace - idToSpans := tsp.groupSpansByTraceKey(resourceSpans) + idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans) var newTraceIDs int64 - for id, spans := range idToSpans { + for id, spans := range idToSpansAndScope { lenSpans := int64(len(spans)) lenPolicies := len(tsp.policies) initialDecisions := make([]sampling.Decision, lenPolicies) @@ -422,12 +435,23 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) } -func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spans []*ptrace.Span) { +func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) { rs := dest.ResourceSpans().AppendEmpty() rss.Resource().CopyTo(rs.Resource()) - ils := rs.ScopeSpans().AppendEmpty() - for _, span := range spans { - sp := ils.Spans().AppendEmpty() - span.CopyTo(sp) + + scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans) + for _, spanAndScope := range spanAndScopes { + // If the scope of the spanAndScope is not in the map, add it to the map and the destination. 
+ if scope, ok := scopePointerToNewScope[spanAndScope.instrumentationScope]; !ok { + is := rs.ScopeSpans().AppendEmpty() + spanAndScope.instrumentationScope.CopyTo(is.Scope()) + scopePointerToNewScope[spanAndScope.instrumentationScope] = &is + + sp := is.Spans().AppendEmpty() + spanAndScope.span.CopyTo(sp) + } else { + sp := scope.Spans().AppendEmpty() + spanAndScope.span.CopyTo(sp) + } } } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 6e0c7b65233c..0f567fef5e55 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -54,6 +54,111 @@ func (t *TestPolicyEvaluator) Evaluate(ctx context.Context, traceID pcommon.Trac return t.pe.Evaluate(ctx, traceID, trace) } +type spanInfo struct { + span ptrace.Span + resource pcommon.Resource + scope pcommon.InstrumentationScope +} + +func TestTraceIntegrity(t *testing.T) { + const spanCount = 4 + // Generate trace with several spans with different scopes + traces := ptrace.NewTraces() + spans := make(map[pcommon.SpanID]spanInfo, 0) + + // Fill resource + resourceSpans := traces.ResourceSpans().AppendEmpty() + resource := resourceSpans.Resource() + resourceSpans.Resource().Attributes().PutStr("key1", "value1") + resourceSpans.Resource().Attributes().PutInt("key2", 0) + + // Fill scopeSpans 1 + scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() + scope := scopeSpans.Scope() + scopeSpans.Scope().SetName("scope1") + scopeSpans.Scope().Attributes().PutStr("key1", "value1") + scopeSpans.Scope().Attributes().PutInt("key2", 0) + + // Add spans to scopeSpans 1 + span := scopeSpans.Spans().AppendEmpty() + spanID := [8]byte{1, 2, 3, 4, 5, 6, 7, 8} + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4})) + spans[spanID] = spanInfo{span: span, resource: resource, scope: scope} + + span = scopeSpans.Spans().AppendEmpty() + spanID = [8]byte{9, 10, 11, 12, 13, 14, 
15, 16} + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetTraceID(pcommon.TraceID([16]byte{5, 6, 7, 8})) + spans[spanID] = spanInfo{span: span, resource: resource, scope: scope} + + // Fill scopeSpans 2 + scopeSpans = resourceSpans.ScopeSpans().AppendEmpty() + scope = scopeSpans.Scope() + scopeSpans.Scope().SetName("scope2") + scopeSpans.Scope().Attributes().PutStr("key1", "value1") + scopeSpans.Scope().Attributes().PutInt("key2", 0) + + // Add spans to scopeSpans 2 + span = scopeSpans.Spans().AppendEmpty() + spanID = [8]byte{17, 18, 19, 20, 21, 22, 23, 24} + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetTraceID(pcommon.TraceID([16]byte{9, 10, 11, 12})) + spans[spanID] = spanInfo{span: span, resource: resource, scope: scope} + + span = scopeSpans.Spans().AppendEmpty() + spanID = [8]byte{25, 26, 27, 28, 29, 30, 31, 32} + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetTraceID(pcommon.TraceID([16]byte{13, 14, 15, 16})) + spans[spanID] = spanInfo{span: span, resource: resource, scope: scope} + + require.Equal(t, spanCount, len(spans)) + + msp := new(consumertest.TracesSink) + mpe := &mockPolicyEvaluator{} + mtt := &manualTTicker{} + tsp := &tailSamplingSpanProcessor{ + ctx: context.Background(), + nextConsumer: msp, + maxNumTraces: spanCount, + logger: zap.NewNop(), + decisionBatcher: newSyncIDBatcher(1), + policies: []*policy{{name: "mock-policy", evaluator: mpe, ctx: context.TODO()}}, + deleteChan: make(chan pcommon.TraceID, spanCount), + policyTicker: mtt, + tickerFrequency: 100 * time.Millisecond, + numTracesOnMap: &atomic.Uint64{}, + } + require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, tsp.Shutdown(context.Background())) + }() + + require.NoError(t, tsp.ConsumeTraces(context.Background(), traces)) + + tsp.samplingPolicyOnTick() + mpe.NextDecision = sampling.Sampled + tsp.samplingPolicyOnTick() + + consumed := msp.AllTraces() + require.Equal(t, 4, len(consumed)) + for _, trace := range 
consumed { + require.Equal(t, 1, trace.SpanCount()) + require.Equal(t, 1, trace.ResourceSpans().Len()) + require.Equal(t, 1, trace.ResourceSpans().At(0).ScopeSpans().Len()) + require.Equal(t, 1, trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len()) + + span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + if spanInfo, ok := spans[span.SpanID()]; ok { + require.Equal(t, spanInfo.span, span) + require.Equal(t, spanInfo.resource, trace.ResourceSpans().At(0).Resource()) + require.Equal(t, spanInfo.scope, trace.ResourceSpans().At(0).ScopeSpans().At(0).Scope()) + } else { + require.Fail(t, "Span not found") + } + } +} + func TestSequentialTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) cfg := Config{