diff --git a/processor/tailsamplingprocessor/internal/sampling/and_test.go b/processor/tailsamplingprocessor/internal/sampling/and_test.go index 49bc55808502..13c3a9e9fb91 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/and_test.go @@ -42,7 +42,7 @@ func TestAndEvaluatorNotSampled(t *testing.T) { span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) trace := &TraceData{ - ReceivedBatches: []ptrace.Traces{traces}, + ReceivedBatches: traces, } decision, err := and.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate and policy: %v", err) @@ -70,7 +70,7 @@ func TestAndEvaluatorSampled(t *testing.T) { span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) trace := &TraceData{ - ReceivedBatches: []ptrace.Traces{traces}, + ReceivedBatches: traces, } decision, err := and.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate and policy: %v", err) @@ -98,7 +98,7 @@ func TestAndEvaluatorStringInvertSampled(t *testing.T) { span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) trace := &TraceData{ - ReceivedBatches: []ptrace.Traces{traces}, + ReceivedBatches: traces, } decision, err := and.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate and policy: %v", err) @@ -126,7 +126,7 @@ func TestAndEvaluatorStringInvertNotSampled(t *testing.T) { span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) trace := &TraceData{ - ReceivedBatches: []ptrace.Traces{traces}, + ReceivedBatches: traces, } decision, err := and.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate and policy: %v", err) diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index c99c513b1b1d..d0da5aa80513 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -36,17 +36,11 @@ func (f FakeTimeProvider) getCurSecond() int64 { var traceID = pcommon.TraceID([16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x52, 0x96, 0x9A, 0x89, 0x55, 0x57, 0x1A, 0x3F}) func createTrace() *TraceData { - trace := &TraceData{SpanCount: atomic.NewInt64(1)} + trace := &TraceData{SpanCount: atomic.NewInt64(1), ReceivedBatches: ptrace.NewTraces()} return trace } -func newTraceID() pcommon.TraceID { - r := [16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x52, 0x96, 0x9A, 0x89, 0x55, 0x57, 0x1A, 0x3F} - return pcommon.TraceID(r) -} - func newTraceWithKV(traceID pcommon.TraceID, key string, val int64) *TraceData { - var traceBatches []ptrace.Traces traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() ils := rs.ScopeSpans().AppendEmpty() @@ -61,9 +55,8 @@ func newTraceWithKV(traceID pcommon.TraceID, key string, val int64) *TraceData { )) span.Attributes().PutInt(key, val) - traceBatches = append(traceBatches, traces) return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, SpanCount: atomic.NewInt64(1), } } @@ -112,29 +105,26 @@ func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) { n2 := NewAlwaysSample(zap.NewNop()) c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider) - trcID := newTraceID() - trace := newTraceWithKV(trcID, "tag", int64(10)) + trace := newTraceWithKV(traceID, "tag", int64(10)) - decision, err := c.Evaluate(trcID, trace) + decision, err := c.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate composite policy: %v", err) // The first policy is NewNumericAttributeFilter and trace tag matches criteria, so the decision should be Sampled. expected := Sampled assert.Equal(t, decision, expected) - trcID = newTraceID() - trace = newTraceWithKV(trcID, "tag", int64(11)) + trace = newTraceWithKV(traceID, "tag", int64(11)) - decision, err = c.Evaluate(trcID, trace) + decision, err = c.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate composite policy: %v", err) // The first policy is NewNumericAttributeFilter and trace tag matches criteria, so the decision should be Sampled. expected = NotSampled assert.Equal(t, decision, expected) - trcID = newTraceID() - trace = newTraceWithKV(trcID, "tag", int64(1001)) - decision, err = c.Evaluate(trcID, trace) + trace = newTraceWithKV(traceID, "tag", int64(1001)) + decision, err = c.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate composite policy: %v", err) // The first policy fails as the tag value is higher than the range set where as the second policy is AlwaysSample, so the decision should be Sampled. diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index 8642bd335ea7..2401ea24ae2b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -87,7 +87,6 @@ type spanWithTimeAndDuration struct { } func newTraceWithSpans(spans []spanWithTimeAndDuration) *TraceData { - var traceBatches []ptrace.Traces traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() ils := rs.ScopeSpans().AppendEmpty() @@ -100,8 +99,7 @@ func newTraceWithSpans(spans []spanWithTimeAndDuration) *TraceData { span.SetEndTimestamp(pcommon.NewTimestampFromTime(s.StartTime.Add(s.Duration))) } - traceBatches = append(traceBatches, traces) return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, } } diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go index 9d1d95ddcc57..b3c97d2cbc37 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go @@ -76,7 +76,6 @@ func TestNumericTagFilter(t *testing.T) { } func newTraceIntAttrs(nodeAttrs map[string]interface{}, spanAttrKey string, spanAttrValue int64) *TraceData { - var traceBatches []ptrace.Traces traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() rs.Resource().Attributes().FromRaw(nodeAttrs) @@ -85,8 +84,7 @@ func newTraceIntAttrs(nodeAttrs map[string]interface{}, spanAttrKey string, span span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) span.Attributes().PutInt(spanAttrKey, spanAttrValue) - traceBatches = append(traceBatches, traces) return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, } } diff --git a/processor/tailsamplingprocessor/internal/sampling/policy.go b/processor/tailsamplingprocessor/internal/sampling/policy.go index a85767a15c5f..6dc45442a0e0 100644 --- a/processor/tailsamplingprocessor/internal/sampling/policy.go +++ b/processor/tailsamplingprocessor/internal/sampling/policy.go @@ -35,7 +35,7 @@ type TraceData struct { // SpanCount track the number of spans on the trace. SpanCount *atomic.Int64 // ReceivedBatches stores all the batches received for the trace. - ReceivedBatches []ptrace.Traces + ReceivedBatches ptrace.Traces } // Decision gives the status of sampling decision. diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go index 1f617976f4f0..34de276e8290 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go @@ -96,13 +96,12 @@ func TestEvaluate_NumberSpans(t *testing.T) { } func newTraceWithMultipleSpans(numberSpans []int32) *TraceData { - var traceBatches []ptrace.Traces var totalNumberSpans = int32(0) - // For each trace, going to create the number of spans defined in the array + // For each resource, going to create the number of spans defined in the array + traces := ptrace.NewTraces() for i := range numberSpans { // Creates trace - traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() ils := rs.ScopeSpans().AppendEmpty() @@ -111,12 +110,11 @@ func newTraceWithMultipleSpans(numberSpans []int32) *TraceData { span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) } - traceBatches = append(traceBatches, traces) totalNumberSpans += numberSpans[i] } return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, SpanCount: atomic.NewInt64(int64(totalNumberSpans)), } } diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go index 738bf5bf135b..e36ff582b36e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go @@ -80,7 +80,7 @@ func TestStatusCodeSampling(t *testing.T) { } trace := &TraceData{ - ReceivedBatches: []ptrace.Traces{traces}, + ReceivedBatches: traces, } statusCodeFilter, err := NewStatusCodeFilter(zap.NewNop(), c.StatusCodesToFilterOn) diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go index 5b7eb61b17d2..f6319d787ebf 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go @@ -241,7 +241,6 @@ func BenchmarkStringTagFilterEvaluateRegex(b *testing.B) { } func newTraceStringAttrs(nodeAttrs map[string]interface{}, spanAttrKey string, spanAttrValue string) *TraceData { - var traceBatches []ptrace.Traces traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() rs.Resource().Attributes().FromRaw(nodeAttrs) @@ -250,8 +249,7 @@ func newTraceStringAttrs(nodeAttrs map[string]interface{}, spanAttrKey string, s span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) span.Attributes().PutString(spanAttrKey, spanAttrValue) - traceBatches = append(traceBatches, traces) return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, } } diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go index 569b38514db1..8b65069e1c13 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go @@ -98,7 +98,6 @@ func TestTraceStateFilter(t *testing.T) { } func newTraceState(traceState string) *TraceData { - var traceBatches []ptrace.Traces traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() ils := rs.ScopeSpans().AppendEmpty() @@ -106,8 +105,7 @@ func newTraceState(traceState string) *TraceData { span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) span.TraceState().FromRaw(traceState) - traceBatches = append(traceBatches, traces) return &TraceData{ - ReceivedBatches: traceBatches, + ReceivedBatches: traces, } } diff --git a/processor/tailsamplingprocessor/internal/sampling/util.go b/processor/tailsamplingprocessor/internal/sampling/util.go index a72a7bfe0a26..b45f2c5552ec 100644 --- a/processor/tailsamplingprocessor/internal/sampling/util.go +++ b/processor/tailsamplingprocessor/internal/sampling/util.go @@ -22,24 +22,20 @@ import ( // hasResourceOrSpanWithCondition iterates through all the resources and instrumentation library spans until any // callback returns true. func hasResourceOrSpanWithCondition( - batches []ptrace.Traces, + td ptrace.Traces, shouldSampleResource func(resource pcommon.Resource) bool, shouldSampleSpan func(span ptrace.Span) bool, ) Decision { - for _, batch := range batches { - rspans := batch.ResourceSpans() + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) - for i := 0; i < rspans.Len(); i++ { - rs := rspans.At(i) - - resource := rs.Resource() - if shouldSampleResource(resource) { - return Sampled - } + resource := rs.Resource() + if shouldSampleResource(resource) { + return Sampled + } - if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { - return Sampled - } + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { + return Sampled } } return NotSampled @@ -48,40 +44,32 @@ func hasResourceOrSpanWithCondition( // invertHasResourceOrSpanWithCondition iterates through all the resources and instrumentation library spans until any // callback returns false. func invertHasResourceOrSpanWithCondition( - batches []ptrace.Traces, + td ptrace.Traces, shouldSampleResource func(resource pcommon.Resource) bool, shouldSampleSpan func(span ptrace.Span) bool, ) Decision { - for _, batch := range batches { - rspans := batch.ResourceSpans() - - for i := 0; i < rspans.Len(); i++ { - rs := rspans.At(i) + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) - resource := rs.Resource() - if !shouldSampleResource(resource) { - return InvertNotSampled - } + resource := rs.Resource() + if !shouldSampleResource(resource) { + return InvertNotSampled + } - if !invertHasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { - return InvertNotSampled - } + if !invertHasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { + return InvertNotSampled } } return InvertSampled } // hasSpanWithCondition iterates through all the instrumentation library spans until any callback returns true. -func hasSpanWithCondition(batches []ptrace.Traces, shouldSample func(span ptrace.Span) bool) Decision { - for _, batch := range batches { - rspans := batch.ResourceSpans() +func hasSpanWithCondition(td ptrace.Traces, shouldSample func(span ptrace.Span) bool) Decision { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) - for i := 0; i < rspans.Len(); i++ { - rs := rspans.At(i) - - if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSample) { - return Sampled - } + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSample) { + return Sampled } } return NotSampled diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 41756b7835e4..606ea94fd7fc 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -184,20 +184,11 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { // Sampled or not, remove the batches trace.Lock() - traceBatches := trace.ReceivedBatches - trace.ReceivedBatches = nil + allSpans := ptrace.NewTraces() + trace.ReceivedBatches.MoveTo(allSpans) trace.Unlock() if decision == sampling.Sampled { - - // Combine all individual batches into a single batch so - // consumers may operate on the entire trace - allSpans := ptrace.NewTraces() - for j := 0; j < len(traceBatches); j++ { - batch := traceBatches[j] - batch.ResourceSpans().MoveAndAppendTo(allSpans.ResourceSpans()) - } - _ = tsp.nextConsumer.ConsumeTraces(policy.ctx, allSpans) } } @@ -301,7 +292,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa return finalDecision, matchingPolicy } -// ConsumeTraceData is required by the SpanProcessor interface. +// ConsumeTraces is required by the component.TracesProcessor interface. func (tsp *tailSamplingSpanProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { @@ -336,13 +327,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc for i := 0; i < lenPolicies; i++ { initialDecisions[i] = sampling.Pending } - initialTraceData := &sampling.TraceData{ - Decisions: initialDecisions, - ArrivalTime: time.Now(), - SpanCount: atomic.NewInt64(lenSpans), + d, loaded := tsp.idToTrace.Load(id) + if !loaded { + d, loaded = tsp.idToTrace.LoadOrStore(id, &sampling.TraceData{ + Decisions: initialDecisions, + ArrivalTime: time.Now(), + SpanCount: atomic.NewInt64(lenSpans), + ReceivedBatches: ptrace.NewTraces(), + }) } - d, loaded := tsp.idToTrace.LoadOrStore(id, initialTraceData) - actualData := d.(*sampling.TraceData) if loaded { actualData.SpanCount.Add(lenSpans) @@ -364,7 +357,6 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc } for i, p := range tsp.policies { - var traceTd ptrace.Traces actualData.Lock() actualDecision := actualData.Decisions[i] // If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen @@ -372,8 +364,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc if actualDecision == sampling.Pending { // Add the spans to the trace, but only once for all policy, otherwise same spans will // be duplicated in the final trace. - traceTd = prepareTraceBatch(resourceSpans, spans) - actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd) + appendToTraces(actualData.ReceivedBatches, resourceSpans, spans) actualData.Unlock() break } @@ -382,7 +373,8 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc switch actualDecision { case sampling.Sampled: // Forward the spans to the policy destinations - traceTd := prepareTraceBatch(resourceSpans, spans) + traceTd := ptrace.NewTraces() + appendToTraces(traceTd, resourceSpans, spans) if err := tsp.nextConsumer.ConsumeTraces(p.ctx, traceTd); err != nil { tsp.logger.Warn("Error sending late arrived spans to destination", zap.String("policy", p.name), @@ -441,14 +433,12 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) } -func prepareTraceBatch(rss ptrace.ResourceSpans, spans []*ptrace.Span) ptrace.Traces { - traceTd := ptrace.NewTraces() - rs := traceTd.ResourceSpans().AppendEmpty() +func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spans []*ptrace.Span) { + rs := dest.ResourceSpans().AppendEmpty() rss.Resource().CopyTo(rs.Resource()) ils := rs.ScopeSpans().AppendEmpty() for _, span := range spans { sp := ils.Spans().AppendEmpty() span.CopyTo(sp) } - return traceTd }