Skip to content

Commit

Permalink
tail sampling: Change monitoring metrics
Browse files Browse the repository at this point in the history
Introduce sampled, head_unsampled and unprocessed.
Adapt logic for dropped to only contain trace events.

implements elastic#6269
  • Loading branch information
simitt committed Oct 4, 2021
1 parent 79b43c1 commit d8facd8
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ type Processor struct {
}

type eventMetrics struct {
processed int64
dropped int64
stored int64
processed int64
dropped int64
stored int64
sampled int64
head_unsampled int64
unprocessed int64
}

// NewProcessor returns a new Processor, for tail-sampling trace events.
Expand Down Expand Up @@ -117,6 +120,9 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
monitoring.ReportInt(V, "processed", atomic.LoadInt64(&p.eventMetrics.processed))
monitoring.ReportInt(V, "dropped", atomic.LoadInt64(&p.eventMetrics.dropped))
monitoring.ReportInt(V, "stored", atomic.LoadInt64(&p.eventMetrics.stored))
monitoring.ReportInt(V, "sampled", atomic.LoadInt64(&p.eventMetrics.sampled))
monitoring.ReportInt(V, "head_unsampled", atomic.LoadInt64(&p.eventMetrics.head_unsampled))
monitoring.ReportInt(V, "unprocessed", atomic.LoadInt64(&p.eventMetrics.unprocessed))
})
}

Expand Down Expand Up @@ -149,13 +155,17 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error
if err != nil {
return err
}
p.updateProcessorMetrics(report, stored)
case model.SpanProcessor:
var err error
atomic.AddInt64(&p.eventMetrics.processed, 1)
report, stored, err = p.processSpan(event)
if err != nil {
return err
}
p.updateProcessorMetrics(report, stored)
default:
atomic.AddInt64(&p.eventMetrics.unprocessed, 1)
}
if !report {
// We shouldn't report this event, so remove it from the slice.
Expand All @@ -164,7 +174,6 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error
events = events[:n-1]
i--
}
p.updateProcessorMetrics(report, stored)
}
*batch = events
return nil
Expand All @@ -191,6 +200,7 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo
if !event.Transaction.Sampled {
// (Head-based) unsampled transactions are passed through
// by the tail sampler.
atomic.AddInt64(&p.eventMetrics.head_unsampled, 1)
return true, false, nil
}

Expand All @@ -200,6 +210,9 @@ func (p *Processor) processTransaction(event *model.APMEvent) (report, stored bo
// Tail-sampling decision has been made: report the transaction
// if it was sampled.
report := traceSampled
if report {
atomic.AddInt64(&p.eventMetrics.sampled, 1)
}
return report, false, nil
case eventstorage.ErrNotFound:
// Tail-sampling decision has not yet been made.
Expand Down Expand Up @@ -263,6 +276,7 @@ func (p *Processor) processSpan(event *model.APMEvent) (report, stored bool, _ e
if !traceSampled {
return false, false, nil
}
atomic.AddInt64(&p.eventMetrics.sampled, 1)
return true, false, nil
}

Expand Down Expand Up @@ -460,6 +474,7 @@ func (p *Processor) Run() error {
}
}
}
atomic.AddInt64(&p.eventMetrics.processed, int64(len(events)))
if err := p.config.BatchProcessor.ProcessBatch(ctx, &events); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to report events")
}
Expand Down

0 comments on commit d8facd8

Please sign in to comment.