From 1f3cec4b2496f56ffca2cb54e18d4ef052aa12f7 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Wed, 21 Sep 2022 19:02:34 +0100 Subject: [PATCH] [processor/cumulativetodelta] Reduce memory consumption for histograms (#13751) (#14007) Currently a MetricIdentity struct is allocated for each count/sum/bucket of a histogram which means that all of the attributes for the datapoint are duplicated across each component within the datapoint. This change bundles the histogram values into one unit (HistogramValue) to be converted, which requires only one MetricIdentity, reducing memory footprint. --- .../internal/tracking/identity.go | 12 -- .../internal/tracking/identity_test.go | 7 +- .../internal/tracking/tracker.go | 61 +++++++--- .../internal/tracking/value.go | 18 +++ .../cumulativetodeltaprocessor/processor.go | 105 ++++-------------- unreleased/issue_13751.yaml | 11 ++ 6 files changed, 100 insertions(+), 114 deletions(-) create mode 100644 unreleased/issue_13751.yaml diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go index 0c7caa7cd332..7945fecd1c59 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go @@ -32,13 +32,6 @@ type MetricIdentity struct { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType - MetricField string -} - -type HistogramIdentities struct { - CountIdentity MetricIdentity - SumIdentity MetricIdentity - BucketIdentities []MetricIdentity } const A = int32('A') @@ -82,11 +75,6 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { }) b.WriteByte(SEP) b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36)) - - if mi.MetricField != "" { - b.WriteByte(SEP) - b.WriteString(mi.MetricField) - } } func (mi *MetricIdentity) IsFloatVal() bool { diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go index 43e19a4fdd76..a1f11b49d4e7 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go @@ -43,7 +43,6 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType - MetricField string } tests := []struct { name string @@ -74,16 +73,15 @@ func TestMetricIdentity_Write(t *testing.T) { want: []string{"C" + SEPSTR + "B", "Y"}, }, { - name: "histogram sum", + name: "histogram", fields: fields{ Resource: resource, InstrumentationLibrary: il, Attributes: attributes, MetricDataType: pmetric.MetricDataTypeHistogram, MetricValueType: pmetric.NumberDataPointValueTypeInt, - MetricField: "bound_100", }, - want: []string{"D" + SEPSTR + "B", "bound_100"}, + want: []string{"D" + SEPSTR + "B"}, }, } for _, tt := range tests { @@ -98,7 +96,6 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp: tt.fields.StartTimestamp, Attributes: tt.fields.Attributes, MetricValueType: tt.fields.MetricValueType, - MetricField: tt.fields.MetricField, } b := &bytes.Buffer{} mi.Write(b) diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/tracker.go b/processor/cumulativetodeltaprocessor/internal/tracking/tracker.go index 36b81c595485..17786bd4c20d 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/tracker.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/tracker.go @@ -22,6 +22,7 @@ import ( "time" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -44,6 +45,7 @@ type DeltaValue struct { StartTimestamp pcommon.Timestamp FloatValue float64 IntValue int64 + HistogramValue *HistogramPoint } func NewMetricTracker(ctx context.Context, logger *zap.Logger, maxStaleness time.Duration) *MetricTracker { @@ -95,6 +97,7 @@ func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) { StartTimestamp: metricPoint.ObservedTimestamp, FloatValue: metricPoint.FloatValue, IntValue: metricPoint.IntValue, + HistogramValue: metricPoint.HistogramValue, } valid = true } @@ -108,28 +111,54 @@ func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) { out.StartTimestamp = state.PrevPoint.ObservedTimestamp - if metricID.IsFloatVal() { - value := metricPoint.FloatValue - prevValue := state.PrevPoint.FloatValue - delta := value - prevValue + switch metricID.MetricDataType { + case pmetric.MetricDataTypeHistogram: + value := metricPoint.HistogramValue + prevValue := state.PrevPoint.HistogramValue + if math.IsNaN(value.Sum) { + value.Sum = prevValue.Sum + } - // Detect reset on a monotonic counter - if metricID.MetricIsMonotonic && value < prevValue { - delta = value + if len(value.Buckets) != len(prevValue.Buckets) { + valid = false } - out.FloatValue = delta - } else { - value := metricPoint.IntValue - prevValue := state.PrevPoint.IntValue - delta := value - prevValue + delta := value.Clone() - // Detect reset on a monotonic counter - if metricID.MetricIsMonotonic && value < prevValue { - delta = value + // Calculate deltas unless histogram count was reset + if valid && delta.Count >= prevValue.Count { + delta.Count -= prevValue.Count + delta.Sum -= prevValue.Sum + for index, prevBucket := range prevValue.Buckets { + delta.Buckets[index] -= prevBucket + } } - out.IntValue = delta + out.HistogramValue = &delta + case pmetric.MetricDataTypeSum: + if metricID.IsFloatVal() { + value := metricPoint.FloatValue + prevValue := state.PrevPoint.FloatValue + delta := value - prevValue + + // Detect reset on a monotonic counter + if metricID.MetricIsMonotonic && value < prevValue { + delta = value + } + + out.FloatValue = delta + } else { + value := metricPoint.IntValue + prevValue := state.PrevPoint.IntValue + delta := value - prevValue + + // Detect reset on a monotonic counter + if metricID.MetricIsMonotonic && value < prevValue { + delta = value + } + + out.IntValue = delta + } } state.PrevPoint = metricPoint diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/value.go b/processor/cumulativetodeltaprocessor/internal/tracking/value.go index 94f55907c243..adafcda01b98 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/value.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/value.go @@ -20,4 +20,22 @@ type ValuePoint struct { ObservedTimestamp pcommon.Timestamp FloatValue float64 IntValue int64 + HistogramValue *HistogramPoint +} + +type HistogramPoint struct { + Count uint64 + Sum float64 + Buckets []uint64 +} + +func (point *HistogramPoint) Clone() HistogramPoint { + bucketValues := make([]uint64, len(point.Buckets)) + copy(bucketValues, point.Buckets) + + return HistogramPoint{ + Count: point.Count, + Sum: point.Sum, + Buckets: bucketValues, + } } diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 978cc4e2bcbd..c418dd61cdf8 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -16,10 +16,8 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" - "fmt" "math" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/service/featuregate" "go.uber.org/zap" @@ -114,7 +112,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return false } - countIdentity := tracking.MetricIdentity{ + baseIdentity := tracking.MetricIdentity{ Resource: rm.Resource(), InstrumentationLibrary: ilm.Scope(), MetricDataType: m.DataType(), @@ -122,22 +120,9 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme MetricUnit: m.Unit(), MetricIsMonotonic: true, MetricValueType: pmetric.NumberDataPointValueTypeInt, - MetricField: "count", } - sumIdentity := countIdentity - sumIdentity.MetricField = "sum" - sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble - - bucketIdentities := makeBucketIdentities(countIdentity, ms.DataPoints().At(0)) - - histogramIdentities := tracking.HistogramIdentities{ - CountIdentity: countIdentity, - SumIdentity: sumIdentity, - BucketIdentities: bucketIdentities, - } - - ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities) + ctdp.convertHistogramDataPoints(ms.DataPoints(), baseIdentity) ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 @@ -152,19 +137,6 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return md, nil } -func makeBucketIdentities(baseIdentity tracking.MetricIdentity, dp pmetric.HistogramDataPoint) []tracking.MetricIdentity { - numBuckets := dp.BucketCounts().Len() - bucketIdentities := make([]tracking.MetricIdentity, numBuckets) - - for index := 0; index < numBuckets; index++ { - bucketIdentity := baseIdentity - bucketIdentity.MetricField = fmt.Sprintf("bucket_%d", index) - bucketIdentities[index] = bucketIdentity - } - - return bucketIdentities -} - func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { ctdp.cancelFunc() return nil @@ -175,32 +147,6 @@ func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) b (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) } -func (ctdp *cumulativeToDeltaProcessor) convertHistogramFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) { - id.StartTimestamp = dp.StartTimestamp() - id.Attributes = dp.Attributes() - trackingPoint := tracking.MetricPoint{ - Identity: id, - Value: tracking.ValuePoint{ - ObservedTimestamp: dp.Timestamp(), - FloatValue: value, - }, - } - return ctdp.deltaCalculator.Convert(trackingPoint) -} - -func (ctdp *cumulativeToDeltaProcessor) convertHistogramIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) { - id.StartTimestamp = dp.StartTimestamp() - id.Attributes = dp.Attributes() - trackingPoint := tracking.MetricPoint{ - Identity: id, - Value: tracking.ValuePoint{ - ObservedTimestamp: dp.Timestamp(), - IntValue: value, - }, - } - return ctdp.deltaCalculator.Convert(trackingPoint) -} - func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { if dps, ok := in.(pmetric.NumberDataPointSlice); ok { @@ -244,43 +190,40 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId } } -func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *tracking.HistogramIdentities) { +func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { - countID := baseIdentities.CountIdentity - countDelta, countValid := ctdp.convertHistogramIntValue(countID, dp, int64(dp.Count())) - - hasSum := dp.HasSum() && !math.IsNaN(dp.Sum()) - sumDelta, sumValid := tracking.DeltaValue{}, true + id := baseIdentity + id.StartTimestamp = dp.StartTimestamp() + id.Attributes = dp.Attributes() - if hasSum { - sumID := baseIdentities.SumIdentity - sumDelta, sumValid = ctdp.convertHistogramFloatValue(sumID, dp, dp.Sum()) + point := tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + HistogramValue: &tracking.HistogramPoint{ + Count: dp.Count(), + Sum: dp.Sum(), + Buckets: dp.BucketCounts().AsRaw(), + }, } - bucketsValid := true - updatedBucketCounts := pcommon.NewUInt64Slice() - dp.BucketCounts().CopyTo(updatedBucketCounts) - for index := 0; index < updatedBucketCounts.Len(); index++ { - bucketID := baseIdentities.BucketIdentities[index] - bucketDelta, bucketValid := ctdp.convertHistogramIntValue(bucketID, dp, - int64(updatedBucketCounts.At(index))) - updatedBucketCounts.SetAt(index, uint64(bucketDelta.IntValue)) - bucketsValid = bucketsValid && bucketValid + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: point, } + delta, valid := ctdp.deltaCalculator.Convert(trackingPoint) - if countValid && sumValid && bucketsValid { - dp.SetStartTimestamp(countDelta.StartTimestamp) - dp.SetCount(uint64(countDelta.IntValue)) - if hasSum { - dp.SetSum(sumDelta.FloatValue) + if valid { + dp.SetStartTimestamp(delta.StartTimestamp) + dp.SetCount(delta.HistogramValue.Count) + if dp.HasSum() && !math.IsNaN(dp.Sum()) { + dp.SetSum(delta.HistogramValue.Sum) } - updatedBucketCounts.MoveTo(dp.BucketCounts()) + dp.BucketCounts().FromRaw(delta.HistogramValue.Buckets) return false } - return true + return !valid }) } } diff --git a/unreleased/issue_13751.yaml b/unreleased/issue_13751.yaml new file mode 100644 index 000000000000..b8673b9fd7b0 --- /dev/null +++ b/unreleased/issue_13751.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cumulativetodeltaprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reduce memory consumption of histograms in cumulativetodeltaprocessor by allocating only a single identity per datapoint. + +# One or more tracking issues related to the change +issues: [13751]