Skip to content

Commit

Permalink
Implement scaledown
Browse files Browse the repository at this point in the history
Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama authored and jpkrohling committed Jul 12, 2023
1 parent a7e0f92 commit 79b9a1a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 49 deletions.
69 changes: 52 additions & 17 deletions pkg/translator/prometheusremotewrite/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ func addSingleExponentialHistogramDataPoint(
// to Prometheus Native Histogram.
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
scale := p.Scale()
if scale < -4 || scale > 8 {
if scale < -4 {
return prompb.Histogram{},
fmt.Errorf("cannot convert exponential to native histogram."+
" Scale must be <= 8 and >= -4, was %d", scale)
// TODO: downscale to 8 if scale > 8
" Scale must be >= -4, was %d", scale)
}

pSpans, pDeltas := convertBucketsLayout(p.Positive(), 1)
nSpans, nDeltas := convertBucketsLayout(p.Negative(), 1)
var scaleDown int32
if scale > 8 {
scaleDown = scale - 8
scale = 8
}

pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown)
nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown)

h := prompb.Histogram{
Schema: scale,
Expand Down Expand Up @@ -105,8 +110,8 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
// index 0 corresponds to the range (1, base] while Prometheus bucket index 0
// to the range (base 1].
//
// scaleMerge is the number of buckets to merge into a single bucket - must be power of 2
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, _ int32) ([]prompb.BucketSpan, []int64) {
// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one.
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]prompb.BucketSpan, []int64) {
bucketCounts := buckets.BucketCounts()
if bucketCounts.Len() == 0 {
return nil, nil
Expand All @@ -115,6 +120,7 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
var (
spans []prompb.BucketSpan
deltas []int64
count int64
prevCount int64
nextBucketIdx int32
)
Expand All @@ -125,34 +131,63 @@ func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets,
prevCount = count
}

// The offset is scaled and adjusted by 1 as described above.
bucketIdx := buckets.Offset()>>scaleDown + 1
spans = append(spans, prompb.BucketSpan{
Offset: bucketIdx,
Length: 0,
})

for i := 0; i < bucketCounts.Len(); i++ {
count := int64(bucketCounts.At(i))
// The offset is scaled and adjusted by 1 as described above.
nextBucketIdx = (int32(i)+buckets.Offset())>>scaleDown + 1
if bucketIdx == nextBucketIdx { // we have not collected enough buckets to merge yet
count += int64(bucketCounts.At(i))
continue
}
if count == 0 {
count = int64(bucketCounts.At(i))
continue
}

// The offset is adjusted by 1 as described above.
bucketIdx := int32(i) + buckets.Offset() + 1
delta := bucketIdx - nextBucketIdx
if i == 0 || delta > 2 {
// We have to create a new span, either because we are
// at the very beginning, or because we have found a gap
gap := nextBucketIdx - bucketIdx - 1
if gap > 2 {
// We have to create a new span, either because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, prompb.BucketSpan{
Offset: delta,
Offset: gap,
Length: 0,
})
} else {
// We have found a small gap (or no gap at all).
// Insert empty buckets as needed.
for j := int32(0); j < delta; j++ {
for j := int32(0); j < gap; j++ {
appendDelta(0)
}
}
appendDelta(count)
nextBucketIdx = bucketIdx + 1
count = int64(bucketCounts.At(i))
bucketIdx = nextBucketIdx
}
// nextBucketIdx is the last index, not the next one, hence no need to deduct 1
gap := nextBucketIdx - bucketIdx
if gap > 2 {
// We have to create a new span, because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, prompb.BucketSpan{
Offset: gap,
Length: 0,
})
} else {
// We have found a small gap (or no gap at all).
// Insert empty buckets as needed.
for j := int32(0); j < gap; j++ {
appendDelta(0)
}
}
appendDelta(count)

return spans, deltas
}
64 changes: 32 additions & 32 deletions pkg/translator/prometheusremotewrite/histograms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: 1,
Expand All @@ -47,17 +47,17 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{4, -1, -1, -1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: 1,
Length: 2,
},
},
// 4+3, 2+1 = 7, 3 =delta= 7, -4
wantDeltas: []int64{4, -4},
wantDeltas: []int64{7, -4},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: 1,
Expand All @@ -78,7 +78,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: 2,
Expand All @@ -87,7 +87,7 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{4, -1, -1, -1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: 1,
Expand All @@ -96,7 +96,7 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{4, 1, -4}, // 0+4, 3+2, 1+0 = 4, 5, 1
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: 1,
Expand All @@ -116,7 +116,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: 5,
Expand All @@ -129,7 +129,7 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{4, -2, -2, 2, -1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: 3,
Expand All @@ -144,7 +144,7 @@ func TestConvertBucketsLayout(t *testing.T) {
// 4+2, 0+2, 0+0, 0+0, 0+0, 0+0, 0+0, 0+0, 1+0 = 6, 2, 0, 0, 0, 0, 0, 0, 1
wantDeltas: []int64{6, -4, -1},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: 2,
Expand All @@ -171,7 +171,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: 5,
Expand All @@ -184,10 +184,10 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{4, -2, -2, 2, -1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: 2,
Offset: 3,
Length: 2,
},
{
Expand All @@ -199,7 +199,7 @@ func TestConvertBucketsLayout(t *testing.T) {
// 4+2, 0+2, 0+0, 0+0, 0+0, 0+0, 1+0 = 6, 2, 0, 0, 0, 0, 1
wantDeltas: []int64{6, -4, -1},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: 2,
Expand All @@ -222,7 +222,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Expand All @@ -235,21 +235,21 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{3, -2, 0},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 3,
},
},
// Downscale:
// 3+1, 0+0, 0+1 = 4, 0, 1
wantDeltas: []int64{4, -4, 1},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 2,
},
},
Expand All @@ -268,7 +268,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Expand All @@ -277,21 +277,21 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{3, -2, -1, 1, -1, 1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 3,
},
},
// Downscale:
// 3+1, 0+1, 0+1 = 4, 1, 1
wantDeltas: []int64{4, -3, 0},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 2,
},
},
Expand All @@ -310,7 +310,7 @@ func TestConvertBucketsLayout(t *testing.T) {
return b
},
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Expand All @@ -319,21 +319,21 @@ func TestConvertBucketsLayout(t *testing.T) {
},
wantDeltas: []int64{3, -3, 0, 1, -1, 0, 1},
},
2: {
1: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 4,
},
},
// Downscale:
// 3+0, 0+1, 0+0, 0+1 = 3, 1, 0, 1
wantDeltas: []int64{3, -2, -1, 1},
},
4: {
2: {
wantSpans: []prompb.BucketSpan{
{
Offset: -1,
Offset: 0,
Length: 3,
},
},
Expand All @@ -347,15 +347,15 @@ func TestConvertBucketsLayout(t *testing.T) {
name: "zero buckets",
buckets: pmetric.NewExponentialHistogramDataPointBuckets,
wantLayout: map[int32]expectedBucketLayout{
1: {
0: {
wantSpans: nil,
wantDeltas: nil,
},
2: {
1: {
wantSpans: nil,
wantDeltas: nil,
},
4: {
2: {
wantSpans: nil,
wantDeltas: nil,
},
Expand Down

0 comments on commit 79b9a1a

Please sign in to comment.