diff --git a/processor/groupbyattrsprocessor/attribute_groups.go b/processor/groupbyattrsprocessor/attribute_groups.go index 165896906cf9..02d982b1302c 100644 --- a/processor/groupbyattrsprocessor/attribute_groups.go +++ b/processor/groupbyattrsprocessor/attribute_groups.go @@ -73,39 +73,6 @@ func matchingScopeMetrics(rm pmetric.ResourceMetrics, library pcommon.Instrument return ilm } -// spansGroupedByAttrs keeps all found grouping attributes for spans, together with the matching records -type spansGroupedByAttrs struct { - ptrace.ResourceSpansSlice -} - -// logsGroupedByAttrs keeps all found grouping attributes for logs, together with the matching records -type logsGroupedByAttrs struct { - plog.ResourceLogsSlice -} - -// metricsGroupedByAttrs keeps all found grouping attributes for metrics, together with the matching records -type metricsGroupedByAttrs struct { - pmetric.ResourceMetricsSlice -} - -func newLogsGroupedByAttrs() *logsGroupedByAttrs { - return &logsGroupedByAttrs{ - ResourceLogsSlice: plog.NewResourceLogsSlice(), - } -} - -func newSpansGroupedByAttrs() *spansGroupedByAttrs { - return &spansGroupedByAttrs{ - ResourceSpansSlice: ptrace.NewResourceSpansSlice(), - } -} - -func newMetricsGroupedByAttrs() *metricsGroupedByAttrs { - return &metricsGroupedByAttrs{ - ResourceMetricsSlice: pmetric.NewResourceMetricsSlice(), - } -} - // Build the Attributes that we'll be looking for in existing Resources as a merge of the Attributes // of the original Resource with the requested Attributes func buildReferenceAttributes(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Map { @@ -142,44 +109,9 @@ func resourceMatches(resource pcommon.Resource, referenceAttributes pcommon.Map) return matching } -// findResource searches for an existing plog.ResourceLogs that strictly matches with the specified reference -// Attributes. Returns the matching plog.ResourceLogs and bool value which is set to true if found -func (lgba logsGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (plog.ResourceLogs, bool) { - for i := 0; i < lgba.Len(); i++ { - if resourceMatches(lgba.At(i).Resource(), referenceAttributes) { - return lgba.At(i), true - } - } - return plog.ResourceLogs{}, false -} - -// findResource searches for an existing plog.ResourceLogs that strictly matches with the specified reference -// Attributes. Returns the matching plog.ResourceLogs and bool value which is set to true if found -func (sgba spansGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (ptrace.ResourceSpans, bool) { - for i := 0; i < sgba.Len(); i++ { - if resourceMatches(sgba.At(i).Resource(), referenceAttributes) { - return sgba.At(i), true - } - } - return ptrace.ResourceSpans{}, false -} - -// findResource searches for an existing pmetric.ResourceMetrics that strictly matches with the specified reference -// Attributes. Returns the matching pmetric.ResourceMetrics and bool value which is set to true if found -func (mgba metricsGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (pmetric.ResourceMetrics, bool) { - - for i := 0; i < mgba.Len(); i++ { - if resourceMatches(mgba.At(i).Resource(), referenceAttributes) { - return mgba.At(i), true - } - } - return pmetric.ResourceMetrics{}, false -} - // Update the specified (and new) Resource with the properties of the original Resource, and with the // required Attributes func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.Resource, requiredAttributes pcommon.Map) { - originResource.CopyTo(newResource) // This prioritizes required attributes over the original resource attributes, if they overlap @@ -188,62 +120,61 @@ func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon. v.CopyTo(attrs.PutEmpty(k)) return true }) - } // findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func (sgba *spansGroupedByAttrs) findOrCreateResource(originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans { +func findOrCreateResourceSpans(traces ptrace.Traces, originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans { // Build the reference attributes that we're looking for in Resources referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - // Do we have a matching Resource? - resource, found := sgba.findResource(referenceAttributes) - if found { - return resource + rss := traces.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + if resourceMatches(rss.At(i).Resource(), referenceAttributes) { + return rss.At(i) + } } - // Not found: create a new resource - resource = sgba.AppendEmpty() - updateResourceToMatch(resource.Resource(), originResource, requiredAttributes) - return resource - + // Not found: append a new ResourceSpans. + rs := rss.AppendEmpty() + updateResourceToMatch(rs.Resource(), originResource, requiredAttributes) + return rs } -// findResourceOrElseCreate searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func (lgba *logsGroupedByAttrs) findResourceOrElseCreate(originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs { +// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created +func findOrCreateResourceLogs(logs plog.Logs, originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs { // Build the reference attributes that we're looking for in Resources referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - // Do we have a matching Resource? - resource, found := lgba.findResource(referenceAttributes) - if found { - return resource + rls := logs.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + if resourceMatches(rls.At(i).Resource(), referenceAttributes) { + return rls.At(i) + } } - // Not found: create a new resource - resource = lgba.AppendEmpty() - updateResourceToMatch(resource.Resource(), originResource, requiredAttributes) - return resource - + // Not found: append a new ResourceLogs + rl := rls.AppendEmpty() + updateResourceToMatch(rl.Resource(), originResource, requiredAttributes) + return rl } -// findResourceOrElseCreate searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func (mgba *metricsGroupedByAttrs) findResourceOrElseCreate(originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics { +// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created +func findOrCreateResourceMetrics(metrics pmetric.Metrics, originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics { // Build the reference attributes that we're looking for in Resources referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - // Do we have a matching Resource? - resource, found := mgba.findResource(referenceAttributes) - if found { - return resource + rms := metrics.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + if resourceMatches(rms.At(i).Resource(), referenceAttributes) { + return rms.At(i) + } } - // Not found: create a new resource - resource = mgba.AppendEmpty() - updateResourceToMatch(resource.Resource(), originResource, requiredAttributes) - return resource - + // Not found: append a new ResourceMetrics + rm := rms.AppendEmpty() + updateResourceToMatch(rm.Resource(), originResource, requiredAttributes) + return rm } diff --git a/processor/groupbyattrsprocessor/attribute_groups_test.go b/processor/groupbyattrsprocessor/attribute_groups_test.go index 799b27f24d97..4258b42025c2 100644 --- a/processor/groupbyattrsprocessor/attribute_groups_test.go +++ b/processor/groupbyattrsprocessor/attribute_groups_test.go @@ -57,10 +57,9 @@ func randomGroups(count int) []pcommon.Map { } var ( - count = 1000 - groups = randomGroups(count) - res = simpleResource() - lagAttrs = newLogsGroupedByAttrs() + count = 1000 + groups = randomGroups(count) + res = simpleResource() ) func TestResourceAttributeScenarios(t *testing.T) { @@ -107,6 +106,7 @@ func TestResourceAttributeScenarios(t *testing.T) { }, } + logs := plog.NewLogs() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { recordAttributeMap := pcommon.NewMap() @@ -119,7 +119,7 @@ func TestResourceAttributeScenarios(t *testing.T) { tt.fillExpectedResourceFun(tt.baseResource, expectedResource) } - rl := lagAttrs.findResourceOrElseCreate(tt.baseResource, recordAttributeMap) + rl := findOrCreateResourceLogs(logs, tt.baseResource, recordAttributeMap) assert.EqualValues(t, expectedResource.Attributes(), rl.Resource().Attributes()) }) } @@ -158,5 +158,9 @@ func TestInstrumentationLibraryMatching(t *testing.T) { } func BenchmarkAttrGrouping(b *testing.B) { - lagAttrs.findResourceOrElseCreate(res, groups[rand.Intn(count)]) + logs := plog.NewLogs() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + findOrCreateResourceLogs(logs, res, groups[rand.Intn(count)]) + } } diff --git a/processor/groupbyattrsprocessor/processor.go b/processor/groupbyattrsprocessor/processor.go index a8eef76e76f9..8dd47a4ea690 100644 --- a/processor/groupbyattrsprocessor/processor.go +++ b/processor/groupbyattrsprocessor/processor.go @@ -33,7 +33,7 @@ type groupByAttrsProcessor struct { // ProcessTraces process traces and groups traces by attribute. func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { rss := td.ResourceSpans() - groupedResourceSpans := newSpansGroupedByAttrs() + groupedTraces := ptrace.NewTraces() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) @@ -56,16 +56,14 @@ func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.T // Lets combine the base resource attributes + the extracted (grouped) attributes // and keep them in the grouping entry - groupedSpans := groupedResourceSpans.findOrCreateResource(rs.Resource(), requiredAttributes) - sp := matchingScopeSpans(groupedSpans, ils.Scope()).Spans().AppendEmpty() + groupedResourceSpans := findOrCreateResourceSpans(groupedTraces, rs.Resource(), requiredAttributes) + sp := matchingScopeSpans(groupedResourceSpans, ils.Scope()).Spans().AppendEmpty() span.CopyTo(sp) } } } // Copy the grouped data into output - groupedTraces := ptrace.NewTraces() - groupedResourceSpans.MoveAndAppendTo(groupedTraces.ResourceSpans()) stats.Record(ctx, mDistSpanGroups.M(int64(groupedTraces.ResourceSpans().Len()))) return groupedTraces, nil @@ -73,7 +71,7 @@ func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.T func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { rl := ld.ResourceLogs() - groupedResourceLogs := newLogsGroupedByAttrs() + groupedLogs := plog.NewLogs() for i := 0; i < rl.Len(); i++ { ls := rl.At(i) @@ -96,8 +94,8 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) // Lets combine the base resource attributes + the extracted (grouped) attributes // and keep them in the grouping entry - groupedLogs := groupedResourceLogs.findResourceOrElseCreate(ls.Resource(), requiredAttributes) - lr := matchingScopeLogs(groupedLogs, sl.Scope()).LogRecords().AppendEmpty() + groupedResourceLogs := findOrCreateResourceLogs(groupedLogs, ls.Resource(), requiredAttributes) + lr := matchingScopeLogs(groupedResourceLogs, sl.Scope()).LogRecords().AppendEmpty() log.CopyTo(lr) } } @@ -105,8 +103,6 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) } // Copy the grouped data into output - groupedLogs := plog.NewLogs() - groupedResourceLogs.MoveAndAppendTo(groupedLogs.ResourceLogs()) stats.Record(ctx, mDistLogGroups.M(int64(groupedLogs.ResourceLogs().Len()))) return groupedLogs, nil @@ -114,7 +110,7 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { rms := md.ResourceMetrics() - groupedResourceMetrics := newMetricsGroupedByAttrs() + groupedMetrics := pmetric.NewMetrics() for i := 0; i < rms.Len(); i++ { rm := rms.At(i) @@ -130,35 +126,35 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric case pmetric.MetricTypeGauge: for pointIndex := 0; pointIndex < metric.Gauge().DataPoints().Len(); pointIndex++ { dataPoint := metric.Gauge().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Gauge().DataPoints().AppendEmpty()) } case pmetric.MetricTypeSum: for pointIndex := 0; pointIndex < metric.Sum().DataPoints().Len(); pointIndex++ { dataPoint := metric.Sum().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Sum().DataPoints().AppendEmpty()) } case pmetric.MetricTypeSummary: for pointIndex := 0; pointIndex < metric.Summary().DataPoints().Len(); pointIndex++ { dataPoint := metric.Summary().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Summary().DataPoints().AppendEmpty()) } case pmetric.MetricTypeHistogram: for pointIndex := 0; pointIndex < metric.Histogram().DataPoints().Len(); pointIndex++ { dataPoint := metric.Histogram().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Histogram().DataPoints().AppendEmpty()) } case pmetric.MetricTypeExponentialHistogram: for pointIndex := 0; pointIndex < metric.ExponentialHistogram().DataPoints().Len(); pointIndex++ { dataPoint := metric.ExponentialHistogram().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.ExponentialHistogram().DataPoints().AppendEmpty()) } @@ -167,9 +163,6 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric } } - // Copy the grouped data into output - groupedMetrics := pmetric.NewMetrics() - groupedResourceMetrics.MoveAndAppendTo(groupedMetrics.ResourceMetrics()) stats.Record(ctx, mDistMetricGroups.M(int64(groupedMetrics.ResourceMetrics().Len()))) return groupedMetrics, nil @@ -248,7 +241,7 @@ func getMetricInInstrumentationLibrary(ilm pmetric.ScopeMetrics, searchedMetric // Returns the Metric in the appropriate Resource matching with the specified Attributes func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes( ctx context.Context, - groupedResourceMetrics *metricsGroupedByAttrs, + groupedMetrics pmetric.Metrics, originResourceMetrics pmetric.ResourceMetrics, ilm pmetric.ScopeMetrics, metric pmetric.Metric, @@ -266,10 +259,10 @@ func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes( } // Get the ResourceMetrics matching with these attributes - groupedResource := groupedResourceMetrics.findResourceOrElseCreate(originResourceMetrics.Resource(), requiredAttributes) + groupedResourceMetrics := findOrCreateResourceMetrics(groupedMetrics, originResourceMetrics.Resource(), requiredAttributes) // Get the corresponding instrumentation library - groupedInstrumentationLibrary := matchingScopeMetrics(groupedResource, ilm.Scope()) + groupedInstrumentationLibrary := matchingScopeMetrics(groupedResourceMetrics, ilm.Scope()) // Return the metric in this resource return getMetricInInstrumentationLibrary(groupedInstrumentationLibrary, metric)