Skip to content

Commit

Permalink
[chore] remove unnecessary allocations in groupbyattrsprocessor (#14547)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Sep 27, 2022
1 parent 1c6855d commit b60681b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 129 deletions.
133 changes: 32 additions & 101 deletions processor/groupbyattrsprocessor/attribute_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,6 @@ func matchingScopeMetrics(rm pmetric.ResourceMetrics, library pcommon.Instrument
return ilm
}

// spansGroupedByAttrs keeps all found grouping attributes for spans, together with the matching records
type spansGroupedByAttrs struct {
ptrace.ResourceSpansSlice
}

// logsGroupedByAttrs keeps all found grouping attributes for logs, together with the matching records
type logsGroupedByAttrs struct {
plog.ResourceLogsSlice
}

// metricsGroupedByAttrs keeps all found grouping attributes for metrics, together with the matching records
type metricsGroupedByAttrs struct {
pmetric.ResourceMetricsSlice
}

func newLogsGroupedByAttrs() *logsGroupedByAttrs {
return &logsGroupedByAttrs{
ResourceLogsSlice: plog.NewResourceLogsSlice(),
}
}

func newSpansGroupedByAttrs() *spansGroupedByAttrs {
return &spansGroupedByAttrs{
ResourceSpansSlice: ptrace.NewResourceSpansSlice(),
}
}

func newMetricsGroupedByAttrs() *metricsGroupedByAttrs {
return &metricsGroupedByAttrs{
ResourceMetricsSlice: pmetric.NewResourceMetricsSlice(),
}
}

// Build the Attributes that we'll be looking for in existing Resources as a merge of the Attributes
// of the original Resource with the requested Attributes
func buildReferenceAttributes(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Map {
Expand Down Expand Up @@ -142,44 +109,9 @@ func resourceMatches(resource pcommon.Resource, referenceAttributes pcommon.Map)
return matching
}

// findResource searches for an existing plog.ResourceLogs that strictly matches with the specified reference
// Attributes. Returns the matching plog.ResourceLogs and bool value which is set to true if found
func (lgba logsGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (plog.ResourceLogs, bool) {
for i := 0; i < lgba.Len(); i++ {
if resourceMatches(lgba.At(i).Resource(), referenceAttributes) {
return lgba.At(i), true
}
}
return plog.ResourceLogs{}, false
}

// findResource searches for an existing plog.ResourceLogs that strictly matches with the specified reference
// Attributes. Returns the matching plog.ResourceLogs and bool value which is set to true if found
func (sgba spansGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (ptrace.ResourceSpans, bool) {
for i := 0; i < sgba.Len(); i++ {
if resourceMatches(sgba.At(i).Resource(), referenceAttributes) {
return sgba.At(i), true
}
}
return ptrace.ResourceSpans{}, false
}

// findResource searches for an existing pmetric.ResourceMetrics that strictly matches with the specified reference
// Attributes. Returns the matching pmetric.ResourceMetrics and bool value which is set to true if found
func (mgba metricsGroupedByAttrs) findResource(referenceAttributes pcommon.Map) (pmetric.ResourceMetrics, bool) {

for i := 0; i < mgba.Len(); i++ {
if resourceMatches(mgba.At(i).Resource(), referenceAttributes) {
return mgba.At(i), true
}
}
return pmetric.ResourceMetrics{}, false
}

// Update the specified (and new) Resource with the properties of the original Resource, and with the
// required Attributes
func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.Resource, requiredAttributes pcommon.Map) {

originResource.CopyTo(newResource)

// This prioritizes required attributes over the original resource attributes, if they overlap
Expand All @@ -188,62 +120,61 @@ func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.
v.CopyTo(attrs.PutEmpty(k))
return true
})

}

// findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (sgba *spansGroupedByAttrs) findOrCreateResource(originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {
func findOrCreateResourceSpans(traces ptrace.Traces, originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

// Do we have a matching Resource?
resource, found := sgba.findResource(referenceAttributes)
if found {
return resource
rss := traces.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
if resourceMatches(rss.At(i).Resource(), referenceAttributes) {
return rss.At(i)
}
}

// Not found: create a new resource
resource = sgba.AppendEmpty()
updateResourceToMatch(resource.Resource(), originResource, requiredAttributes)
return resource

// Not found: append a new ResourceSpans.
rs := rss.AppendEmpty()
updateResourceToMatch(rs.Resource(), originResource, requiredAttributes)
return rs
}

// findResourceOrElseCreate searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (lgba *logsGroupedByAttrs) findResourceOrElseCreate(originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {
// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceLogs(logs plog.Logs, originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

// Do we have a matching Resource?
resource, found := lgba.findResource(referenceAttributes)
if found {
return resource
rls := logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
if resourceMatches(rls.At(i).Resource(), referenceAttributes) {
return rls.At(i)
}
}

// Not found: create a new resource
resource = lgba.AppendEmpty()
updateResourceToMatch(resource.Resource(), originResource, requiredAttributes)
return resource

// Not found: append a new ResourceLogs
rl := rls.AppendEmpty()
updateResourceToMatch(rl.Resource(), originResource, requiredAttributes)
return rl
}

// findResourceOrElseCreate searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (mgba *metricsGroupedByAttrs) findResourceOrElseCreate(originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {
// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceMetrics(metrics pmetric.Metrics, originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

// Do we have a matching Resource?
resource, found := mgba.findResource(referenceAttributes)
if found {
return resource
rms := metrics.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
if resourceMatches(rms.At(i).Resource(), referenceAttributes) {
return rms.At(i)
}
}

// Not found: create a new resource
resource = mgba.AppendEmpty()
updateResourceToMatch(resource.Resource(), originResource, requiredAttributes)
return resource

// Not found: append a new ResourceMetrics
rm := rms.AppendEmpty()
updateResourceToMatch(rm.Resource(), originResource, requiredAttributes)
return rm
}
16 changes: 10 additions & 6 deletions processor/groupbyattrsprocessor/attribute_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ func randomGroups(count int) []pcommon.Map {
}

var (
count = 1000
groups = randomGroups(count)
res = simpleResource()
lagAttrs = newLogsGroupedByAttrs()
count = 1000
groups = randomGroups(count)
res = simpleResource()
)

func TestResourceAttributeScenarios(t *testing.T) {
Expand Down Expand Up @@ -107,6 +106,7 @@ func TestResourceAttributeScenarios(t *testing.T) {
},
}

logs := plog.NewLogs()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recordAttributeMap := pcommon.NewMap()
Expand All @@ -119,7 +119,7 @@ func TestResourceAttributeScenarios(t *testing.T) {
tt.fillExpectedResourceFun(tt.baseResource, expectedResource)
}

rl := lagAttrs.findResourceOrElseCreate(tt.baseResource, recordAttributeMap)
rl := findOrCreateResourceLogs(logs, tt.baseResource, recordAttributeMap)
assert.EqualValues(t, expectedResource.Attributes(), rl.Resource().Attributes())
})
}
Expand Down Expand Up @@ -158,5 +158,9 @@ func TestInstrumentationLibraryMatching(t *testing.T) {
}

func BenchmarkAttrGrouping(b *testing.B) {
lagAttrs.findResourceOrElseCreate(res, groups[rand.Intn(count)])
logs := plog.NewLogs()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
findOrCreateResourceLogs(logs, res, groups[rand.Intn(count)])
}
}
37 changes: 15 additions & 22 deletions processor/groupbyattrsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type groupByAttrsProcessor struct {
// ProcessTraces process traces and groups traces by attribute.
func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
rss := td.ResourceSpans()
groupedResourceSpans := newSpansGroupedByAttrs()
groupedTraces := ptrace.NewTraces()

for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
Expand All @@ -56,24 +56,22 @@ func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.T

// Lets combine the base resource attributes + the extracted (grouped) attributes
// and keep them in the grouping entry
groupedSpans := groupedResourceSpans.findOrCreateResource(rs.Resource(), requiredAttributes)
sp := matchingScopeSpans(groupedSpans, ils.Scope()).Spans().AppendEmpty()
groupedResourceSpans := findOrCreateResourceSpans(groupedTraces, rs.Resource(), requiredAttributes)
sp := matchingScopeSpans(groupedResourceSpans, ils.Scope()).Spans().AppendEmpty()
span.CopyTo(sp)
}
}
}

// Copy the grouped data into output
groupedTraces := ptrace.NewTraces()
groupedResourceSpans.MoveAndAppendTo(groupedTraces.ResourceSpans())
stats.Record(ctx, mDistSpanGroups.M(int64(groupedTraces.ResourceSpans().Len())))

return groupedTraces, nil
}

func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
rl := ld.ResourceLogs()
groupedResourceLogs := newLogsGroupedByAttrs()
groupedLogs := plog.NewLogs()

for i := 0; i < rl.Len(); i++ {
ls := rl.At(i)
Expand All @@ -96,25 +94,23 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs)

// Lets combine the base resource attributes + the extracted (grouped) attributes
// and keep them in the grouping entry
groupedLogs := groupedResourceLogs.findResourceOrElseCreate(ls.Resource(), requiredAttributes)
lr := matchingScopeLogs(groupedLogs, sl.Scope()).LogRecords().AppendEmpty()
groupedResourceLogs := findOrCreateResourceLogs(groupedLogs, ls.Resource(), requiredAttributes)
lr := matchingScopeLogs(groupedResourceLogs, sl.Scope()).LogRecords().AppendEmpty()
log.CopyTo(lr)
}
}

}

// Copy the grouped data into output
groupedLogs := plog.NewLogs()
groupedResourceLogs.MoveAndAppendTo(groupedLogs.ResourceLogs())
stats.Record(ctx, mDistLogGroups.M(int64(groupedLogs.ResourceLogs().Len())))

return groupedLogs, nil
}

func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
rms := md.ResourceMetrics()
groupedResourceMetrics := newMetricsGroupedByAttrs()
groupedMetrics := pmetric.NewMetrics()

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -130,35 +126,35 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric
case pmetric.MetricTypeGauge:
for pointIndex := 0; pointIndex < metric.Gauge().DataPoints().Len(); pointIndex++ {
dataPoint := metric.Gauge().DataPoints().At(pointIndex)
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes())
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes())
dataPoint.CopyTo(groupedMetric.Gauge().DataPoints().AppendEmpty())
}

case pmetric.MetricTypeSum:
for pointIndex := 0; pointIndex < metric.Sum().DataPoints().Len(); pointIndex++ {
dataPoint := metric.Sum().DataPoints().At(pointIndex)
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes())
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes())
dataPoint.CopyTo(groupedMetric.Sum().DataPoints().AppendEmpty())
}

case pmetric.MetricTypeSummary:
for pointIndex := 0; pointIndex < metric.Summary().DataPoints().Len(); pointIndex++ {
dataPoint := metric.Summary().DataPoints().At(pointIndex)
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes())
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes())
dataPoint.CopyTo(groupedMetric.Summary().DataPoints().AppendEmpty())
}

case pmetric.MetricTypeHistogram:
for pointIndex := 0; pointIndex < metric.Histogram().DataPoints().Len(); pointIndex++ {
dataPoint := metric.Histogram().DataPoints().At(pointIndex)
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes())
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes())
dataPoint.CopyTo(groupedMetric.Histogram().DataPoints().AppendEmpty())
}

case pmetric.MetricTypeExponentialHistogram:
for pointIndex := 0; pointIndex < metric.ExponentialHistogram().DataPoints().Len(); pointIndex++ {
dataPoint := metric.ExponentialHistogram().DataPoints().At(pointIndex)
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedResourceMetrics, rm, ilm, metric, dataPoint.Attributes())
groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes())
dataPoint.CopyTo(groupedMetric.ExponentialHistogram().DataPoints().AppendEmpty())
}

Expand All @@ -167,9 +163,6 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric
}
}

// Copy the grouped data into output
groupedMetrics := pmetric.NewMetrics()
groupedResourceMetrics.MoveAndAppendTo(groupedMetrics.ResourceMetrics())
stats.Record(ctx, mDistMetricGroups.M(int64(groupedMetrics.ResourceMetrics().Len())))

return groupedMetrics, nil
Expand Down Expand Up @@ -248,7 +241,7 @@ func getMetricInInstrumentationLibrary(ilm pmetric.ScopeMetrics, searchedMetric
// Returns the Metric in the appropriate Resource matching with the specified Attributes
func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes(
ctx context.Context,
groupedResourceMetrics *metricsGroupedByAttrs,
groupedMetrics pmetric.Metrics,
originResourceMetrics pmetric.ResourceMetrics,
ilm pmetric.ScopeMetrics,
metric pmetric.Metric,
Expand All @@ -266,10 +259,10 @@ func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes(
}

// Get the ResourceMetrics matching with these attributes
groupedResource := groupedResourceMetrics.findResourceOrElseCreate(originResourceMetrics.Resource(), requiredAttributes)
groupedResourceMetrics := findOrCreateResourceMetrics(groupedMetrics, originResourceMetrics.Resource(), requiredAttributes)

// Get the corresponding instrumentation library
groupedInstrumentationLibrary := matchingScopeMetrics(groupedResource, ilm.Scope())
groupedInstrumentationLibrary := matchingScopeMetrics(groupedResourceMetrics, ilm.Scope())

// Return the metric in this resource
return getMetricInInstrumentationLibrary(groupedInstrumentationLibrary, metric)
Expand Down

0 comments on commit b60681b

Please sign in to comment.