From 6236d555756a73908c80998014e09dada10921c1 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 19 Sep 2024 20:08:56 +0000 Subject: [PATCH] store minimal data in normalization cache --- .../datapointstorage/datapointcache.go | 87 ++++++++++++++++++- .../datapointstorage/datapointcache_test.go | 2 +- .../normalization/standard_normalizer.go | 4 +- 3 files changed, 86 insertions(+), 7 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index 7b01c5789..0d97fe8e9 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -93,7 +93,14 @@ func (c *Cache) GetNumberDataPoint(identifier uint64) (pmetric.NumberDataPoint, func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) { c.numberLock.Lock() defer c.numberLock.Unlock() - c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)} + if existing, ok := c.numberCache[identifier]; ok { + existing.used.Store(true) + MinimalNumberDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewNumberDataPoint() + MinimalNumberDataPointCopyTo(point, newPoint) + c.numberCache[identifier] = usedNumberPoint{newPoint, atomic.NewBool(true)} } // GetSummaryDataPoint retrieves the point associated with the identifier, and whether @@ -112,7 +119,14 @@ func (c *Cache) GetSummaryDataPoint(identifier uint64) (pmetric.SummaryDataPoint func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) { c.summaryLock.Lock() defer c.summaryLock.Unlock() - c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)} + if existing, ok := c.summaryCache[identifier]; ok { + existing.used.Store(true) + MinimalSummaryDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewSummaryDataPoint() + MinimalSummaryDataPointCopyTo(point, newPoint) + 
c.summaryCache[identifier] = usedSummaryPoint{newPoint, atomic.NewBool(true)} } // GetHistogramDataPoint retrieves the point associated with the identifier, and whether @@ -131,7 +145,14 @@ func (c *Cache) GetHistogramDataPoint(identifier uint64) (pmetric.HistogramDataP func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) { c.histogramLock.Lock() defer c.histogramLock.Unlock() - c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)} + if existing, ok := c.histogramCache[identifier]; ok { + existing.used.Store(true) + MinimalHistogramDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewHistogramDataPoint() + MinimalHistogramDataPointCopyTo(point, newPoint) + c.histogramCache[identifier] = usedHistogramPoint{newPoint, atomic.NewBool(true)} } // GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether @@ -150,7 +171,14 @@ func (c *Cache) GetExponentialHistogramDataPoint(identifier uint64) (pmetric.Exp func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) { c.exponentialHistogramLock.Lock() defer c.exponentialHistogramLock.Unlock() - c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)} + if existing, ok := c.exponentialHistogramCache[identifier]; ok { + existing.used.Store(true) + MinimalExponentialHistogramDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewExponentialHistogramDataPoint() + MinimalExponentialHistogramDataPointCopyTo(point, newPoint) + c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{newPoint, atomic.NewBool(true)} } // gc garbage collects the cache after the ticker ticks. 
@@ -251,3 +279,54 @@ func hashOfMap(h hash.Hash64, m map[string]string) { h.Write(kvSep) } } + +// MinimalSummaryDataPointCopyTo is the same as CopyTo for SummaryDataPoint, +// but only copies values required for normalization. +func MinimalSummaryDataPointCopyTo(src, dest pmetric.SummaryDataPoint) { + // We do not copy attributes, start timestamp, flags, quantiles + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + dest.SetSum(src.Sum()) +} + +// MinimalHistogramDataPointCopyTo is the same as CopyTo for +// HistogramDataPoint, but only copies values required for normalization. +func MinimalHistogramDataPointCopyTo(src, dest pmetric.HistogramDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars, min, max + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + src.BucketCounts().CopyTo(dest.BucketCounts()) + src.ExplicitBounds().CopyTo(dest.ExplicitBounds()) + if src.HasSum() { + dest.SetSum(src.Sum()) + } +} + +// MinimalExponentialHistogramDataPointCopyTo is the same as CopyTo for +// ExponentialHistogramDataPoint, but only copies values required for normalization. 
+func MinimalExponentialHistogramDataPointCopyTo(src, dest pmetric.ExponentialHistogramDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars, min, max + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + dest.SetZeroCount(src.ZeroCount()) + dest.SetScale(src.Scale()) + src.Positive().CopyTo(dest.Positive()) + src.Negative().CopyTo(dest.Negative()) + if src.HasSum() { + dest.SetSum(src.Sum()) + } + dest.SetZeroThreshold(src.ZeroThreshold()) +} + +// MinimalNumberDataPointCopyTo is the same as CopyTo for NumberDataPoint, +// but only copies values required for normalization. +func MinimalNumberDataPointCopyTo(src, dest pmetric.NumberDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars + dest.SetTimestamp(src.Timestamp()) + switch src.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + dest.SetDoubleValue(src.DoubleValue()) + case pmetric.NumberDataPointValueTypeInt: + dest.SetIntValue(src.IntValue()) + } +} diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go index 22415da39..2edf223a1 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache_test.go +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -67,7 +67,7 @@ func TestGC(t *testing.T) { fakeTicker := make(chan time.Time) id := uint64(12345) - c.SetNumberDataPoint(id, pmetric.NumberDataPoint{}) + c.SetNumberDataPoint(id, pmetric.NewNumberDataPoint()) // bar exists since we just set it usedPoint, found := c.numberCache[id] diff --git a/exporter/collector/internal/normalization/standard_normalizer.go b/exporter/collector/internal/normalization/standard_normalizer.go index e249aa9c0..9db7d3302 100644 --- a/exporter/collector/internal/normalization/standard_normalizer.go +++ b/exporter/collector/internal/normalization/standard_normalizer.go @@ -129,7 +129,7 @@ func 
subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramData a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative())) } -// subtractExponentialBuckets returns a - b. +// subtractExponentialBuckets subtracts b from a. func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBuckets) []uint64 { newBuckets := make([]uint64, a.BucketCounts().Len()) offsetDiff := int(a.Offset() - b.Offset()) @@ -213,7 +213,7 @@ func lessThanHistogramDataPoint(a, b pmetric.HistogramDataPoint) bool { return a.Count() < b.Count() || a.Sum() < b.Sum() } -// subtractHistogramDataPoint returns a - b. +// subtractHistogramDataPoint subtracts b from a. func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) { // Use the timestamp from the normalization point a.SetStartTimestamp(b.Timestamp())