tail sampling: Change monitoring metrics (elastic#6273)
Introduce sampled and head_unsampled metrics, and adapt the dropped logic so that it only counts trace events.

implements elastic#6269
simitt authored Oct 12, 2021
1 parent c3d27d4 commit 17d16e8
Showing 3 changed files with 29 additions and 7 deletions.
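
Note for readers: the diff below calls p.updateProcessorMetrics(report, stored) but does not show that helper, so the "dropped" half of the description is easy to miss. The following is a hypothetical sketch, not code from this commit, illustrating the accounting the commit message describes: an event counts as dropped only if it was neither reported nor stored, while head-unsampled transactions are reported and counted separately.

// Hypothetical sketch only; the real updateProcessorMetrics is not part of this diff.
package sampling

import "sync/atomic"

// eventMetrics mirrors the struct changed in processor.go below.
type eventMetrics struct {
	processed     int64
	dropped       int64
	stored        int64
	sampled       int64
	headUnsampled int64
}

type Processor struct {
	eventMetrics eventMetrics
}

// updateProcessorMetrics counts every event as processed, and counts it as
// dropped only when it was neither reported nor stored. Head-unsampled
// transactions are reported (report == true), so under this change they are
// tracked via headUnsampled in processTransaction instead of inflating dropped.
func (p *Processor) updateProcessorMetrics(report, stored bool) {
	atomic.AddInt64(&p.eventMetrics.processed, 1)
	if stored {
		atomic.AddInt64(&p.eventMetrics.stored, 1)
	} else if !report {
		atomic.AddInt64(&p.eventMetrics.dropped, 1)
	}
}

With that accounting, the updated TestGroupsMonitoring expectations further down add up: the extra head-unsampled transaction raises head_unsampled to 1 without touching dropped.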
changelogs/head.asciidoc (3 changes: 2 additions & 1 deletion)

@@ -13,8 +13,9 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
 - Removed `metricset.period` from service_destination metrics {pull}6111[6111]
 - Removed `http.request.socket` fields {pull}6152[6152]
 - Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174]
-- Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236]
+- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236]
 - Removed `ProcessPending` self-instrumentation events {pull}6243[6243]
+- experimental:["This breaking change applies to the experimental tail-based sampling feature."] Changed `apm-server.sampling.tail.events.*` metrics semantics {pull}6273[6273]
 - Removed warm phase from default ILM policy {pull}6322[6322]
 
 [float]
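
For quick reference, these are the metric keys covered by the changelog entry above, following its `apm-server.sampling.tail.events.*` naming; the Go constant names below are illustrative only and do not exist in the repository, which reports the keys from CollectMonitoring instead.

package sampling

// Illustrative names only; apm-server reports these keys from CollectMonitoring
// rather than declaring constants for them.
const (
	metricProcessed     = "apm-server.sampling.tail.events.processed"      // every trace event the tail-sampling processor inspects
	metricStored        = "apm-server.sampling.tail.events.stored"         // events buffered locally while a sampling decision is pending
	metricDropped       = "apm-server.sampling.tail.events.dropped"        // trace events discarded: neither reported nor stored
	metricSampled       = "apm-server.sampling.tail.events.sampled"        // new: events reported because their trace was tail-sampled
	metricHeadUnsampled = "apm-server.sampling.tail.events.head_unsampled" // new: head-unsampled transactions passed through unchanged
)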
x-pack/apm-server/sampling/processor.go (19 changes: 16 additions & 3 deletions)

@@ -56,9 +56,11 @@ type Processor struct {
 }
 
 type eventMetrics struct {
-	processed int64
-	dropped   int64
-	stored    int64
+	processed     int64
+	dropped       int64
+	stored        int64
+	sampled       int64
+	headUnsampled int64
 }
 
 // NewProcessor returns a new Processor, for tail-sampling trace events.
@@ -117,6 +119,8 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
 		monitoring.ReportInt(V, "processed", atomic.LoadInt64(&p.eventMetrics.processed))
 		monitoring.ReportInt(V, "dropped", atomic.LoadInt64(&p.eventMetrics.dropped))
 		monitoring.ReportInt(V, "stored", atomic.LoadInt64(&p.eventMetrics.stored))
+		monitoring.ReportInt(V, "sampled", atomic.LoadInt64(&p.eventMetrics.sampled))
+		monitoring.ReportInt(V, "head_unsampled", atomic.LoadInt64(&p.eventMetrics.headUnsampled))
 	})
 }
 
@@ -156,6 +160,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error
 			if err != nil {
 				return err
 			}
+		default:
+			continue
 		}
 		if !report {
 			// We shouldn't report this event, so remove it from the slice.
@@ -164,6 +170,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error
 			events = events[:n-1]
 			i--
 		}
+
 		p.updateProcessorMetrics(report, stored)
 	}
 	*batch = events
@@ -191,6 +198,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bool, _ error) {
 	if !event.Transaction.Sampled {
 		// (Head-based) unsampled transactions are passed through
 		// by the tail sampler.
+		atomic.AddInt64(&p.eventMetrics.headUnsampled, 1)
 		return true, false, nil
 	}
 
@@ -200,6 +208,9 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bool, _ error) {
 		// Tail-sampling decision has been made: report the transaction
 		// if it was sampled.
 		report := traceSampled
+		if report {
+			atomic.AddInt64(&p.eventMetrics.sampled, 1)
+		}
 		return report, false, nil
 	case eventstorage.ErrNotFound:
 		// Tail-sampling decision has not yet been made.
@@ -263,6 +274,7 @@ func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ error) {
 	if !traceSampled {
 		return false, false, nil
 	}
+	atomic.AddInt64(&p.eventMetrics.sampled, 1)
 	return true, false, nil
 }
 
@@ -460,6 +472,7 @@ func (p *Processor) Run() error {
 				}
 			}
 		}
+		atomic.AddInt64(&p.eventMetrics.sampled, int64(len(events)))
 		if err := p.config.BatchProcessor.ProcessBatch(ctx, &events); err != nil {
 			p.logger.With(logp.Error(err)).Warn("failed to report events")
 		}
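
A side note on the pattern above: the counters are written with atomic.AddInt64 and read with atomic.LoadInt64 because monitoring collection can run concurrently with event processing. The standalone sketch below is not apm-server code; it only demonstrates that this combination lets a reader snapshot the counters mid-flight without a data race.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var sampled, headUnsampled int64
	var wg sync.WaitGroup

	// Plays the role of ProcessBatch/Run: increments counters as events flow through.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100000; i++ {
			if i%10 == 0 {
				atomic.AddInt64(&headUnsampled, 1)
			} else {
				atomic.AddInt64(&sampled, 1)
			}
		}
	}()

	// Plays the role of CollectMonitoring: a concurrent snapshot is race-free
	// because reads use atomic.LoadInt64 rather than plain loads.
	fmt.Println("in-flight sampled:", atomic.LoadInt64(&sampled))

	wg.Wait()
	fmt.Println("final sampled:", atomic.LoadInt64(&sampled))
	fmt.Println("final head_unsampled:", atomic.LoadInt64(&headUnsampled))
}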
x-pack/apm-server/sampling/processor_test.go (14 changes: 11 additions & 3 deletions)

@@ -119,7 +119,9 @@ func TestProcessAlreadyTailSampled(t *testing.T) {
 
 	expectedMonitoring := monitoring.MakeFlatSnapshot()
 	expectedMonitoring.Ints["sampling.events.processed"] = 4
+	expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0
 	expectedMonitoring.Ints["sampling.events.stored"] = 2
+	expectedMonitoring.Ints["sampling.events.sampled"] = 2
 	expectedMonitoring.Ints["sampling.events.dropped"] = 0
 	assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`)
 
@@ -223,6 +225,8 @@ func TestProcessLocalTailSampling(t *testing.T) {
 	expectedMonitoring := monitoring.MakeFlatSnapshot()
 	expectedMonitoring.Ints["sampling.events.processed"] = 4
 	expectedMonitoring.Ints["sampling.events.stored"] = 4
+	expectedMonitoring.Ints["sampling.events.sampled"] = 2
+	expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0
 	expectedMonitoring.Ints["sampling.events.dropped"] = 0
 	assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`)
 
@@ -440,6 +444,8 @@ func TestProcessRemoteTailSampling(t *testing.T) {
 	expectedMonitoring := monitoring.MakeFlatSnapshot()
 	expectedMonitoring.Ints["sampling.events.processed"] = 1
 	expectedMonitoring.Ints["sampling.events.stored"] = 1
+	expectedMonitoring.Ints["sampling.events.sampled"] = 1
+	expectedMonitoring.Ints["sampling.events.head_unsampled"] = 0
 	expectedMonitoring.Ints["sampling.events.dropped"] = 0
 	assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`)
 
@@ -479,25 +485,27 @@ func TestGroupsMonitoring(t *testing.T) {
 	go processor.Run()
 	defer processor.Stop(context.Background())
 
-	for i := 0; i < config.MaxDynamicServices+1; i++ {
+	for i := 0; i < config.MaxDynamicServices+2; i++ {
 		err := processor.ProcessBatch(context.Background(), &model.Batch{{
 			Service:     model.Service{Name: fmt.Sprintf("service_%d", i)},
 			Processor:   model.TransactionProcessor,
 			Trace:       model.Trace{ID: uuid.Must(uuid.NewV4()).String()},
 			Event:       model.Event{Duration: 123 * time.Millisecond},
 			Transaction: &model.Transaction{
				ID:      "0102030405060709",
-				Sampled: true,
+				Sampled: i < config.MaxDynamicServices+1,
 			},
 		}})
 		require.NoError(t, err)
 	}
 
 	expectedMonitoring := monitoring.MakeFlatSnapshot()
 	expectedMonitoring.Ints["sampling.dynamic_service_groups"] = int64(config.MaxDynamicServices)
-	expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 1
+	expectedMonitoring.Ints["sampling.events.processed"] = int64(config.MaxDynamicServices) + 2
 	expectedMonitoring.Ints["sampling.events.stored"] = int64(config.MaxDynamicServices)
 	expectedMonitoring.Ints["sampling.events.dropped"] = 1 // final event dropped, after service limit reached
+	expectedMonitoring.Ints["sampling.events.sampled"] = 0
+	expectedMonitoring.Ints["sampling.events.head_unsampled"] = 1
 	assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`, `sampling.dynamic_service_groups`)
 }
 
