From 88ce450b282ecaf005b7c1cb0f6f7153600a03e8 Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Tue, 12 Oct 2021 09:47:46 +0200 Subject: [PATCH] tail sampling: Change monitoring metrics (#6273) Introduce the sampled and head_unsampled metrics, and adapt the logic for dropped so that it only counts trace events. Implements #6269 (cherry picked from commit 17d16e89c4610e9da423448cf363e9429d7147f4) # Conflicts: # changelogs/head.asciidoc --- x-pack/apm-server/sampling/processor.go | 19 ++++++++++++++++--- x-pack/apm-server/sampling/processor_test.go | 14 +++++++++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index f61c359d09c..811256801a5 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -56,9 +56,11 @@ type Processor struct { } type eventMetrics struct { - processed int64 - dropped int64 - stored int64 + processed int64 + dropped int64 + stored int64 + sampled int64 + headUnsampled int64 } // NewProcessor returns a new Processor, for tail-sampling trace events. @@ -117,6 +119,8 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) { monitoring.ReportInt(V, "processed", atomic.LoadInt64(&p.eventMetrics.processed)) monitoring.ReportInt(V, "dropped", atomic.LoadInt64(&p.eventMetrics.dropped)) monitoring.ReportInt(V, "stored", atomic.LoadInt64(&p.eventMetrics.stored)) + monitoring.ReportInt(V, "sampled", atomic.LoadInt64(&p.eventMetrics.sampled)) + monitoring.ReportInt(V, "head_unsampled", atomic.LoadInt64(&p.eventMetrics.headUnsampled)) }) } @@ -156,6 +160,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error if err != nil { return err } + default: + continue } if !report { // We shouldn't report this event, so remove it from the slice. 
@@ -164,6 +170,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error events = events[:n-1] i-- } + p.updateProcessorMetrics(report, stored) } *batch = events @@ -191,6 +198,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo if !event.Transaction.Sampled { // (Head-based) unsampled transactions are passed through // by the tail sampler. + atomic.AddInt64(&p.eventMetrics.headUnsampled, 1) return true, false, nil } @@ -200,6 +208,9 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo // Tail-sampling decision has been made: report the transaction // if it was sampled. report := traceSampled + if report { + atomic.AddInt64(&p.eventMetrics.sampled, 1) + } return report, false, nil case eventstorage.ErrNotFound: // Tail-sampling decision has not yet been made. @@ -263,6 +274,7 @@ func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ e if !traceSampled { return false, false, nil } + atomic.AddInt64(&p.eventMetrics.sampled, 1) return true, false, nil } @@ -460,6 +472,7 @@ func (p *Processor) Run() error { } } } + atomic.AddInt64(&p.eventMetrics.sampled, int64(len(events))) if err := p.config.BatchProcessor.ProcessBatch(ctx, &events); err != nil { p.logger.With(logp.Error(err)).Warn("failed to report events") } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 4570b224720..1b3c7921132 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -119,7 +119,9 @@ func TestProcessAlreadyTailSampled(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.stored"] = 2 + expectedMonitoring.Ints["sampling.events.sampled"] = 2 expectedMonitoring.Ints["sampling.events.dropped"] = 0 
assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -223,6 +225,8 @@ func TestProcessLocalTailSampling(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 expectedMonitoring.Ints["sampling.events.stored"] = 4 + expectedMonitoring.Ints["sampling.events.sampled"] = 2 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -440,6 +444,8 @@ func TestProcessRemoteTailSampling(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 1 expectedMonitoring.Ints["sampling.events.stored"] = 1 + expectedMonitoring.Ints["sampling.events.sampled"] = 1 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -479,7 +485,7 @@ func TestGroupsMonitoring(t *testing.T) { go processor.Run() defer processor.Stop(context.Background()) - for i := 0; i < config.MaxDynamicServices+1; i++ { + for i := 0; i < config.MaxDynamicServices+2; i++ { err := processor.ProcessBatch(context.Background(), &model.Batch{{ Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, Processor: model.TransactionProcessor, @@ -487,7 +493,7 @@ func TestGroupsMonitoring(t *testing.T) { Event: model.Event{Duration: 123 * time.Millisecond}, Transaction: &model.Transaction{ ID: "0102030405060709", - Sampled: true, + Sampled: i < config.MaxDynamicServices+1, }, }}) require.NoError(t, err) @@ -495,9 +501,11 @@ func TestGroupsMonitoring(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.dynamic_service_groups"] = int64(config.MaxDynamicServices) - expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 1 
+ expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 2 expectedMonitoring.Ints["sampling.events.stored"] = int64(config.MaxDynamicServices) expectedMonitoring.Ints["sampling.events.dropped"] = 1 // final event dropped, after service limit reached + expectedMonitoring.Ints["sampling.events.sampled"] = 0 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 1 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`, `sampling.dynamic_service_groups`) }