diff --git a/src/aggregator/aggregation/quantile/cm/heap.go b/src/aggregator/aggregation/quantile/cm/heap.go index bcd8fc968b..027ac59a03 100644 --- a/src/aggregator/aggregation/quantile/cm/heap.go +++ b/src/aggregator/aggregation/quantile/cm/heap.go @@ -43,9 +43,9 @@ func (h *minHeap) Push(value float64) { heap := *h n := len(heap) i := n - 1 - for { + for i < n && i >= 0 { parent := (i - 1) / 2 - if parent == i || heap[parent] <= heap[i] { + if parent == i || parent >= n || parent < 0 || heap[parent] <= heap[i] { break } heap[parent], heap[i] = heap[i], heap[parent] @@ -53,17 +53,6 @@ func (h *minHeap) Push(value float64) { } } -func (h *minHeap) ensureSize() { - heap := *h - targetCap := cap(heap) * 2 - newHeap := sharedHeapPool.Get(targetCap) - (*newHeap) = append(*newHeap, heap...) - if cap(heap) >= _initialHeapBucketSize { - sharedHeapPool.Put(heap) - } - (*h) = *newHeap -} - func (h *minHeap) Reset() { if heap := *h; cap(heap) >= _initialHeapBucketSize { sharedHeapPool.Put(heap) @@ -99,3 +88,47 @@ func (h *minHeap) Pop() float64 { *h = old[0:n] return val } + +func (h minHeap) SortDesc() { + heap := h + // this is equivalent to Pop() in a loop (heapsort) + // all the redundant-looking conditions are there to eliminate bounds-checks + for n := len(heap) - 1; n > 0 && n < len(heap); n = len(heap) - 1 { + var ( + i int + smallest int + ) + heap[0], heap[n] = heap[n], heap[0] + for smallest >= 0 && smallest <= n { + var ( + left = smallest*2 + 1 + right = left + 1 + ) + if left < n && left >= 0 && heap[left] < heap[smallest] { + smallest = left + } + if right < n && right >= 0 && heap[right] < heap[smallest] { + smallest = right + } + if smallest == i { + break + } + heap[i], heap[smallest] = heap[smallest], heap[i] + i = smallest + } + heap = heap[0:n] + } +} + +func (h *minHeap) ensureSize() { + var ( + heap = *h + targetCap = cap(heap) * 2 + newHeap = sharedHeapPool.Get(targetCap) + ) + (*newHeap) = append(*newHeap, heap...) 
+ if cap(heap) >= _initialHeapBucketSize { + sharedHeapPool.Put(heap) + } + (*h) = *newHeap +} diff --git a/src/aggregator/aggregation/quantile/cm/heap_test.go b/src/aggregator/aggregation/quantile/cm/heap_test.go index 8c02f8240a..befbaa1ebc 100644 --- a/src/aggregator/aggregation/quantile/cm/heap_test.go +++ b/src/aggregator/aggregation/quantile/cm/heap_test.go @@ -21,6 +21,8 @@ package cm import ( + "math/rand" + "sort" "testing" "github.com/stretchr/testify/require" @@ -33,6 +35,7 @@ func TestMinHeapPushInDecreasingOrder(t *testing.T) { h.Push(float64(i)) require.Equal(t, iter-i, h.Len()) } + validateSort(t, *h) for i := 0; i < iter; i++ { require.Equal(t, float64(i), h.Min()) require.Equal(t, float64(i), h.Pop()) @@ -47,6 +50,7 @@ func TestMinHeapPushInIncreasingOrder(t *testing.T) { h.Push(float64(i)) require.Equal(t, i+1, h.Len()) } + validateSort(t, *h) for i := 0; i < iter; i++ { require.Equal(t, float64(i), h.Min()) require.Equal(t, float64(i), h.Pop()) @@ -54,6 +58,29 @@ func TestMinHeapPushInIncreasingOrder(t *testing.T) { } } +func TestMinHeapPushInRandomOrderAndSort(t *testing.T) { + h := &minHeap{} + iter := 42 + for i := 0; i < iter; i++ { + h.Push(rand.ExpFloat64()) + } + validateSort(t, *h) +} + +func validateSort(t *testing.T, h minHeap) { + t.Helper() + // copy heap before applying reference sort and minheap-sort + a := make([]float64, h.Len()) + b := make([]float64, h.Len()) + for i := 0; i < len(h); i++ { + a[i], b[i] = h[i], h[i] + } + sort.Sort(sort.Reverse(sort.Float64Slice(a))) + heap := (*minHeap)(&b) + heap.SortDesc() + require.Equal(t, a, b) +} + func validateInvariant(t *testing.T, h minHeap, i int) { var ( n = h.Len() diff --git a/src/aggregator/aggregation/quantile/cm/stream.go b/src/aggregator/aggregation/quantile/cm/stream.go index cc9e20e4b5..179274dd39 100644 --- a/src/aggregator/aggregation/quantile/cm/stream.go +++ b/src/aggregator/aggregation/quantile/cm/stream.go @@ -253,10 +253,7 @@ func (s *Stream) calcQuantiles() { ) } - 
for curr != nil { - if idx == len(s.quantiles) { - break - } + for curr != nil && idx < len(s.computedQuantiles) { maxRank = minRank + curr.numRanks + curr.delta rank, threshold := s.thresholdBuf[idx].rank, s.thresholdBuf[idx].threshold @@ -271,7 +268,7 @@ func (s *Stream) calcQuantiles() { } // check if the last sample value should satisfy unprocessed quantiles - for i := idx; i < len(s.quantiles); i++ { + for i := idx; i < len(s.thresholdBuf); i++ { rank, threshold := s.thresholdBuf[i].rank, s.thresholdBuf[i].threshold if maxRank >= rank+threshold || minRank > rank { s.computedQuantiles[i] = prev.value @@ -284,6 +281,7 @@ func (s *Stream) insert() { var ( compCur = s.compressCursor compValue = math.NaN() + samples = &s.samples insertPointValue float64 sample *Sample ) @@ -292,15 +290,22 @@ func (s *Stream) insert() { compValue = compCur.value } - samples := &s.samples + // break heap invariant and just sort all the items, as we'll consume all of them in one go + s.bufMore.SortDesc() - for s.insertCursor != nil { + var ( + vals = []float64(s.bufMore) + idx = len(vals) - 1 + ) + + for s.insertCursor != nil && idx < len(vals) { curr := s.insertCursor insertPointValue = curr.value - for s.bufMore.Len() > 0 && s.bufMore.Min() <= insertPointValue { + for idx >= 0 && vals[idx] <= insertPointValue { + val := vals[idx] + idx-- sample = s.samples.Acquire() - val := s.bufMore.Pop() sample.value = val sample.numRanks = 1 sample.delta = curr.numRanks + curr.delta - 1 @@ -316,19 +321,20 @@ func (s *Stream) insert() { s.insertCursor = s.insertCursor.next } - if s.insertCursor != nil { - return - } - - for s.bufMore.Len() > 0 && s.bufMore.Min() >= samples.Back().value { - sample = s.samples.Acquire() - sample.value = s.bufMore.Pop() - sample.numRanks = 1 - sample.delta = 0 - samples.PushBack(sample) - s.numValues++ + if s.insertCursor == nil && idx < len(vals) { + for idx >= 0 && vals[idx] >= samples.Back().value { + val := vals[idx] + idx-- + sample = s.samples.Acquire() + 
sample.value = val + sample.numRanks = 1 + sample.delta = 0 + samples.PushBack(sample) + s.numValues++ + } } + s.bufMore = s.bufMore[:0] s.resetInsertCursor() } @@ -345,27 +351,48 @@ func (s *Stream) compress() { s.compressCursor = s.compressCursor.prev } + var ( + numVals = s.numValues + eps = 2.0 * s.eps + ) + for s.compressCursor != s.samples.Front() { - next := s.compressCursor.next - maxRank := s.compressMinRank + s.compressCursor.numRanks + s.compressCursor.delta - threshold := s.threshold(maxRank) - s.compressMinRank -= s.compressCursor.numRanks - testVal := s.compressCursor.numRanks + next.numRanks + next.delta + var ( + curr = s.compressCursor + next = curr.next + prev = curr.prev + + maxRank = s.compressMinRank + curr.numRanks + curr.delta + + threshold = int64(math.MaxInt64) + quantileMin int64 + ) + + for i := range s.quantiles { + if maxRank >= int64(s.quantiles[i]*float64(numVals)) { + quantileMin = int64(eps * float64(maxRank) / s.quantiles[i]) + } else { + quantileMin = int64(eps * float64(numVals-maxRank) / (1.0 - s.quantiles[i])) + } + if quantileMin < threshold { + threshold = quantileMin + } + } + + s.compressMinRank -= curr.numRanks + testVal := curr.numRanks + next.numRanks + next.delta if testVal <= threshold { - if s.insertCursor == s.compressCursor { + if s.insertCursor == curr { s.insertCursor = next } - next.numRanks += s.compressCursor.numRanks + next.numRanks += curr.numRanks - prev := s.compressCursor.prev // no need to release sample here - s.samples.Remove(s.compressCursor) - s.compressCursor = prev - } else { - s.compressCursor = s.compressCursor.prev + s.samples.Remove(curr) } + s.compressCursor = prev } if s.compressCursor == s.samples.Front() {