From 17d16e89c4610e9da423448cf363e9429d7147f4 Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Tue, 12 Oct 2021 09:47:46 +0200 Subject: [PATCH] tail sampling: Change monitoring metrics (#6273) Introduce sampled and head_unsampled; and adapt logic for dropped to only contain trace events. implements #6269 --- changelogs/head.asciidoc | 3 ++- x-pack/apm-server/sampling/processor.go | 19 ++++++++++++++++--- x-pack/apm-server/sampling/processor_test.go | 14 +++++++++++--- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 43fd958a50d..de6e4968a08 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -13,8 +13,9 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits] - Removed `metricset.period` from service_destination metrics {pull}6111[6111] - Removed `http.request.socket` fields {pull}6152[6152] - Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174] -- Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236] +- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236] - Removed `ProcessPending` self-instrumentation events {pull}6243[6243] +- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Changed `apm-server.sampling.tail.events.*` metrics semantics {pull}6273[6273] - Removed warm phase from default ILM policy {pull}6322[6322] [float] diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index f61c359d09c..811256801a5 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -56,9 +56,11 @@ type Processor struct { } type eventMetrics struct { - processed int64 - dropped int64 - stored int64 + processed int64 + dropped int64 + stored int64 + sampled int64 + headUnsampled int64 } // NewProcessor returns a new Processor, for tail-sampling trace events. @@ -117,6 +119,8 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) { monitoring.ReportInt(V, "processed", atomic.LoadInt64(&p.eventMetrics.processed)) monitoring.ReportInt(V, "dropped", atomic.LoadInt64(&p.eventMetrics.dropped)) monitoring.ReportInt(V, "stored", atomic.LoadInt64(&p.eventMetrics.stored)) + monitoring.ReportInt(V, "sampled", atomic.LoadInt64(&p.eventMetrics.sampled)) + monitoring.ReportInt(V, "head_unsampled", atomic.LoadInt64(&p.eventMetrics.headUnsampled)) }) } @@ -156,6 +160,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error if err != nil { return err } + default: + continue } if !report { // We shouldn't report this event, so remove it from the slice. @@ -164,6 +170,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error events = events[:n-1] i-- } + p.updateProcessorMetrics(report, stored) } *batch = events @@ -191,6 +198,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo if !event.Transaction.Sampled { // (Head-based) unsampled transactions are passed through // by the tail sampler. + atomic.AddInt64(&p.eventMetrics.headUnsampled, 1) return true, false, nil } @@ -200,6 +208,9 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo // Tail-sampling decision has been made: report the transaction // if it was sampled. report := traceSampled + if report { + atomic.AddInt64(&p.eventMetrics.sampled, 1) + } return report, false, nil case eventstorage.ErrNotFound: // Tail-sampling decision has not yet been made. @@ -263,6 +274,7 @@ func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ e if !traceSampled { return false, false, nil } + atomic.AddInt64(&p.eventMetrics.sampled, 1) return true, false, nil } @@ -460,6 +472,7 @@ func (p *Processor) Run() error { } } } + atomic.AddInt64(&p.eventMetrics.sampled, int64(len(events))) if err := p.config.BatchProcessor.ProcessBatch(ctx, &events); err != nil { p.logger.With(logp.Error(err)).Warn("failed to report events") } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 50ec90162a4..b19fbd000c0 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -119,7 +119,9 @@ func TestProcessAlreadyTailSampled(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.stored"] = 2 + expectedMonitoring.Ints["sampling.events.sampled"] = 2 expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -223,6 +225,8 @@ func TestProcessLocalTailSampling(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 expectedMonitoring.Ints["sampling.events.stored"] = 4 + expectedMonitoring.Ints["sampling.events.sampled"] = 2 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -440,6 +444,8 @@ func TestProcessRemoteTailSampling(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 1 expectedMonitoring.Ints["sampling.events.stored"] = 1 + expectedMonitoring.Ints["sampling.events.sampled"] = 1 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0 expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) @@ -479,7 +485,7 @@ func TestGroupsMonitoring(t *testing.T) { go processor.Run() defer processor.Stop(context.Background()) - for i := 0; i < config.MaxDynamicServices+1; i++ { + for i := 0; i < config.MaxDynamicServices+2; i++ { err := processor.ProcessBatch(context.Background(), &model.Batch{{ Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, Processor: model.TransactionProcessor, @@ -487,7 +493,7 @@ func TestGroupsMonitoring(t *testing.T) { Event: model.Event{Duration: 123 * time.Millisecond}, Transaction: &model.Transaction{ ID: "0102030405060709", - Sampled: true, + Sampled: i < config.MaxDynamicServices+1, }, }}) require.NoError(t, err) @@ -495,9 +501,11 @@ func TestGroupsMonitoring(t *testing.T) { expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.dynamic_service_groups"] = int64(config.MaxDynamicServices) - expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 1 + expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 2 expectedMonitoring.Ints["sampling.events.stored"] = int64(config.MaxDynamicServices) expectedMonitoring.Ints["sampling.events.dropped"] = 1 // final event dropped, after service limit reached + expectedMonitoring.Ints["sampling.events.sampled"] = 0 + expectedMonitoring.Ints["sampling.events.head_unsampled"] = 1 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`, `sampling.dynamic_service_groups`) }