From 79b9a1af605a6a90be069c4eb734ebf510f489da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Sat, 8 Jul 2023 01:41:55 +0200 Subject: [PATCH] Implement scaledown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- .../prometheusremotewrite/histograms.go | 69 ++++++++++++++----- .../prometheusremotewrite/histograms_test.go | 64 ++++++++--------- 2 files changed, 84 insertions(+), 49 deletions(-) diff --git a/pkg/translator/prometheusremotewrite/histograms.go b/pkg/translator/prometheusremotewrite/histograms.go index 34ce65729191..e2c1906afcea 100644 --- a/pkg/translator/prometheusremotewrite/histograms.go +++ b/pkg/translator/prometheusremotewrite/histograms.go @@ -58,15 +58,20 @@ func addSingleExponentialHistogramDataPoint( // to Prometheus Native Histogram. func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) { scale := p.Scale() - if scale < -4 || scale > 8 { + if scale < -4 { return prompb.Histogram{}, fmt.Errorf("cannot convert exponential to native histogram."+ - " Scale must be <= 8 and >= -4, was %d", scale) - // TODO: downscale to 8 if scale > 8 + " Scale must be >= -4, was %d", scale) } - pSpans, pDeltas := convertBucketsLayout(p.Positive(), 1) - nSpans, nDeltas := convertBucketsLayout(p.Negative(), 1) + var scaleDown int32 + if scale > 8 { + scaleDown = scale - 8 + scale = 8 + } + + pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown) + nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) h := prompb.Histogram{ Schema: scale, @@ -105,8 +110,8 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom // index 0 corresponds to the range (1, base] while Prometheus bucket index 0 // to the range (base 1]. // -// scaleMerge is the number of buckets to merge into a single bucket - must be power of 2 -func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, _ int32) ([]prompb.BucketSpan, []int64) { +// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. +func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) { bucketCounts := buckets.BucketCounts() if bucketCounts.Len() == 0 { return nil, nil @@ -115,6 +120,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, var ( spans []prompb.BucketSpan deltas []int64 + count int64 prevCount int64 nextBucketIdx int32 ) @@ -125,34 +131,63 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, prevCount = count } + // The offset is scaled and adjusted by 1 as described above. + bucketIdx := buckets.Offset()>>scaleDown + 1 + spans = append(spans, prompb.BucketSpan{ + Offset: bucketIdx, + Length: 0, + }) + for i := 0; i < bucketCounts.Len(); i++ { - count := int64(bucketCounts.At(i)) + // The offset is scaled and adjusted by 1 as described above. + nextBucketIdx = (int32(i)+buckets.Offset())>>scaleDown + 1 + if bucketIdx == nextBucketIdx { // we have not collected enough buckets to merge yet + count += int64(bucketCounts.At(i)) + continue + } if count == 0 { + count = int64(bucketCounts.At(i)) continue } - // The offset is adjusted by 1 as described above. - bucketIdx := int32(i) + buckets.Offset() + 1 - delta := bucketIdx - nextBucketIdx - if i == 0 || delta > 2 { - // We have to create a new span, either because we are - // at the very beginning, or because we have found a gap + gap := nextBucketIdx - bucketIdx - 1 + if gap > 2 { + // We have to create a new span, either because we have found a gap // of more than two buckets. The constant 2 is copied from the logic in // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 spans = append(spans, prompb.BucketSpan{ - Offset: delta, + Offset: gap, Length: 0, }) } else { // We have found a small gap (or no gap at all). // Insert empty buckets as needed. - for j := int32(0); j < delta; j++ { + for j := int32(0); j < gap; j++ { appendDelta(0) } } appendDelta(count) - nextBucketIdx = bucketIdx + 1 + count = int64(bucketCounts.At(i)) + bucketIdx = nextBucketIdx + } + // nextBucketIdx is the last index, not the next one, hence no need to deduct 1 + gap := nextBucketIdx - bucketIdx + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, prompb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } } + appendDelta(count) return spans, deltas } diff --git a/pkg/translator/prometheusremotewrite/histograms_test.go b/pkg/translator/prometheusremotewrite/histograms_test.go index 94335b734bfd..31a4ee581fca 100644 --- a/pkg/translator/prometheusremotewrite/histograms_test.go +++ b/pkg/translator/prometheusremotewrite/histograms_test.go @@ -38,7 +38,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: 1, @@ -47,7 +47,7 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{4, -1, -1, -1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { Offset: 1, @@ -55,9 +55,9 @@ func TestConvertBucketsLayout(t *testing.T) { }, }, // 4+3, 2+1 = 7, 3 =delta= 7, -4 - wantDeltas: []int64{4, -4}, + wantDeltas: []int64{7, -4}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { Offset: 1, @@ -78,7 +78,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: 2, @@ -87,7 +87,7 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{4, -1, -1, -1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { Offset: 1, @@ -96,7 +96,7 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{4, 1, -4}, // 0+4, 3+2, 1+0 = 4, 5, 1 }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { Offset: 1, @@ -116,7 +116,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: 5, @@ -129,7 +129,7 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{4, -2, -2, 2, -1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { Offset: 3, @@ -144,7 +144,7 @@ func TestConvertBucketsLayout(t *testing.T) { // 4+2, 0+2, 0+0, 0+0, 0+0, 0+0, 0+0, 0+0, 1+0 = 6, 2, 0, 0, 0, 0, 0, 0, 1 wantDeltas: []int64{6, -4, -1}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { Offset: 2, @@ -171,7 +171,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: 5, @@ -184,10 +184,10 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{4, -2, -2, 2, -1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { - Offset: 2, + Offset: 3, Length: 2, }, { @@ -199,7 +199,7 @@ func TestConvertBucketsLayout(t *testing.T) { // 4+2, 0+2, 0+0, 0+0, 0+0, 0+0, 1+0 = 6, 2, 0, 0, 0, 0, 1 wantDeltas: []int64{6, -4, -1}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { Offset: 2, @@ -222,7 +222,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: -1, @@ -235,10 +235,10 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{3, -2, 0}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 3, }, }, @@ -246,10 +246,10 @@ func TestConvertBucketsLayout(t *testing.T) { // 3+1, 0+0, 0+1 = 4, 0, 1 wantDeltas: []int64{4, -4, 1}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 2, }, }, @@ -268,7 +268,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: -1, @@ -277,10 +277,10 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{3, -2, -1, 1, -1, 1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 3, }, }, @@ -288,10 +288,10 @@ func TestConvertBucketsLayout(t *testing.T) { // 3+1, 0+1, 0+1 = 4, 1, 1 wantDeltas: []int64{4, -3, 0}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 2, }, }, @@ -310,7 +310,7 @@ func TestConvertBucketsLayout(t *testing.T) { return b }, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: []prompb.BucketSpan{ { Offset: -1, @@ -319,10 +319,10 @@ func TestConvertBucketsLayout(t *testing.T) { }, wantDeltas: []int64{3, -3, 0, 1, -1, 0, 1}, }, - 2: { + 1: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 4, }, }, @@ -330,10 +330,10 @@ func TestConvertBucketsLayout(t *testing.T) { // 3+0, 0+1, 0+0, 0+1 = 3, 1, 0, 1 wantDeltas: []int64{3, -2, -1, 1}, }, - 4: { + 2: { wantSpans: []prompb.BucketSpan{ { - Offset: -1, + Offset: 0, Length: 3, }, }, @@ -347,15 +347,15 @@ func TestConvertBucketsLayout(t *testing.T) { name: "zero buckets", buckets: pmetric.NewExponentialHistogramDataPointBuckets, wantLayout: map[int32]expectedBucketLayout{ - 1: { + 0: { wantSpans: nil, wantDeltas: nil, }, - 2: { + 1: { wantSpans: nil, wantDeltas: nil, }, - 4: { + 2: { wantSpans: nil, wantDeltas: nil, },