From b2099c5b4ef281adafdaabbc893a4c6f171119ff Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 5 Aug 2021 17:20:59 +0800 Subject: [PATCH] aggregation/spanmetrics: handle composite spans Update spanmetrics to take into account composite spans: multiply the span count by the composite count, and use the composite sum duration (the sum of the pre-aggregated spans, excluding the time gaps between them) instead of the "gross" reported span duration. --- changelogs/head.asciidoc | 1 + .../aggregation/spanmetrics/aggregator.go | 14 ++++++- .../spanmetrics/aggregator_test.go | 39 +++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 7e3fcd27b8c..bf2412656f4 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -19,6 +19,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] [float] ==== Added +- `service_destination` span metrics now take into account composite spans {pull}5896[5896] [float] ==== Deprecated diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go index efa99516e4f..8eef44523fe 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go @@ -198,6 +198,17 @@ func (a *Aggregator) processSpan(event *model.APMEvent) model.APMEvent { return model.APMEvent{} } + // For composite spans we use the composite sum duration, which is the sum of + // pre-aggregated spans and excludes time gaps that are counted in the reported + // span duration. For non-composite spans we just use the reported span duration. 
+ count := 1 + durationMillis := event.Span.Duration + if event.Span.Composite != nil { + count = event.Span.Composite.Count + durationMillis = event.Span.Composite.Sum + } + duration := time.Duration(durationMillis * float64(time.Millisecond)) + key := aggregationKey{ serviceEnvironment: event.Service.Environment, serviceName: event.Service.Name, @@ -205,9 +216,8 @@ func (a *Aggregator) processSpan(event *model.APMEvent) model.APMEvent { outcome: event.Event.Outcome, resource: event.Span.DestinationService.Resource, } - duration := time.Duration(event.Span.Duration * float64(time.Millisecond)) metrics := spanMetrics{ - count: event.Span.RepresentativeCount, + count: float64(count) * event.Span.RepresentativeCount, sum: float64(duration.Microseconds()) * event.Span.RepresentativeCount, } if a.active.storeOrUpdate(key, metrics) { diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go index 2f4582b1819..e0a750fce1f 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go @@ -183,6 +183,45 @@ func TestAggregatorRun(t *testing.T) { } } +func TestAggregateCompositeSpan(t *testing.T) { + batches := make(chan model.Batch, 1) + agg, err := NewAggregator(AggregatorConfig{ + BatchProcessor: makeChanBatchProcessor(batches), + Interval: 10 * time.Millisecond, + MaxGroups: 1000, + }) + require.NoError(t, err) + + span := makeSpan("service-A", "java", "final_destination", "success", time.Second, 2) + span.Span.Composite = &model.Composite{Count: 25, Sum: 700 /* milliseconds */} + err = agg.ProcessBatch(context.Background(), &model.Batch{span}) + require.NoError(t, err) + + // Start the aggregator after processing to ensure metrics are aggregated deterministically. 
+ go agg.Run() + defer agg.Stop(context.Background()) + + batch := expectBatch(t, batches) + metricsets := batchMetricsets(t, batch) + + assert.Equal(t, []model.APMEvent{{ + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Metricset: &model.Metricset{ + Name: "service_destination", + Span: model.MetricsetSpan{ + DestinationService: model.DestinationService{Resource: "final_destination"}, + }, + Samples: map[string]model.MetricsetSample{ + "span.destination.service.response_time.count": {Value: 50.0}, + "span.destination.service.response_time.sum.us": {Value: 1400000}, + "metricset.period": {Value: 10}, + }, + }, + }}, metricsets) +} + func TestAggregatorOverflow(t *testing.T) { batches := make(chan model.Batch, 1) agg, err := NewAggregator(AggregatorConfig{