Skip to content

Commit

Permalink
[aggregator] Sort heap in one go, instead of iterating one-by-one (#3331
Browse files Browse the repository at this point in the history
)
  • Loading branch information
vdarulis authored Mar 8, 2021
1 parent a03e55f commit a6607a6
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 45 deletions.
59 changes: 46 additions & 13 deletions src/aggregator/aggregation/quantile/cm/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,16 @@ func (h *minHeap) Push(value float64) {
heap := *h
n := len(heap)
i := n - 1
for {
for i < n && i >= 0 {
parent := (i - 1) / 2
if parent == i || heap[parent] <= heap[i] {
if parent == i || parent >= n || parent < 0 || heap[parent] <= heap[i] {
break
}
heap[parent], heap[i] = heap[i], heap[parent]
i = parent
}
}

func (h *minHeap) ensureSize() {
heap := *h
targetCap := cap(heap) * 2
newHeap := sharedHeapPool.Get(targetCap)
(*newHeap) = append(*newHeap, heap...)
if cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
}
(*h) = *newHeap
}

func (h *minHeap) Reset() {
if heap := *h; cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
Expand Down Expand Up @@ -99,3 +88,47 @@ func (h *minHeap) Pop() float64 {
*h = old[0:n]
return val
}

func (h minHeap) SortDesc() {
heap := h
// this is equivalent to Pop() in a loop (heapsort)
// all the redundant-looking conditions are there to eliminate bounds-checks
for n := len(heap) - 1; n > 0 && n < len(heap); n = len(heap) - 1 {
var (
i int
smallest int
)
heap[0], heap[n] = heap[n], heap[0]
for smallest >= 0 && smallest <= n {
var (
left = smallest*2 + 1
right = left + 1
)
if left < n && left >= 0 && heap[left] < heap[smallest] {
smallest = left
}
if right < n && right >= 0 && heap[right] < heap[smallest] {
smallest = right
}
if smallest == i {
break
}
heap[i], heap[smallest] = heap[smallest], heap[i]
i = smallest
}
heap = heap[0:n]
}
}

func (h *minHeap) ensureSize() {
var (
heap = *h
targetCap = cap(heap) * 2
newHeap = sharedHeapPool.Get(targetCap)
)
(*newHeap) = append(*newHeap, heap...)
if cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
}
(*h) = *newHeap
}
27 changes: 27 additions & 0 deletions src/aggregator/aggregation/quantile/cm/heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package cm

import (
"math/rand"
"sort"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -33,6 +35,7 @@ func TestMinHeapPushInDecreasingOrder(t *testing.T) {
h.Push(float64(i))
require.Equal(t, iter-i, h.Len())
}
validateSort(t, *h)
for i := 0; i < iter; i++ {
require.Equal(t, float64(i), h.Min())
require.Equal(t, float64(i), h.Pop())
Expand All @@ -47,13 +50,37 @@ func TestMinHeapPushInIncreasingOrder(t *testing.T) {
h.Push(float64(i))
require.Equal(t, i+1, h.Len())
}
validateSort(t, *h)
for i := 0; i < iter; i++ {
require.Equal(t, float64(i), h.Min())
require.Equal(t, float64(i), h.Pop())
validateInvariant(t, *h, 0)
}
}

func TestMinHeapPushInRandomOrderAndSort(t *testing.T) {
h := &minHeap{}
iter := 42
for i := 0; i < iter; i++ {
h.Push(rand.ExpFloat64())
}
validateSort(t, *h)
}

func validateSort(t *testing.T, h minHeap) {
t.Helper()
// copy heap before applying reference sort and minheap-sort
a := make([]float64, h.Len())
b := make([]float64, h.Len())
for i := 0; i < len(h); i++ {
a[i], b[i] = h[i], h[i]
}
sort.Sort(sort.Reverse(sort.Float64Slice(a)))
heap := (*minHeap)(&b)
heap.SortDesc()
require.Equal(t, a, b)
}

func validateInvariant(t *testing.T, h minHeap, i int) {
var (
n = h.Len()
Expand Down
91 changes: 59 additions & 32 deletions src/aggregator/aggregation/quantile/cm/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ func (s *Stream) calcQuantiles() {
)
}

for curr != nil {
if idx == len(s.quantiles) {
break
}
for curr != nil && idx < len(s.computedQuantiles) {
maxRank = minRank + curr.numRanks + curr.delta
rank, threshold := s.thresholdBuf[idx].rank, s.thresholdBuf[idx].threshold

Expand All @@ -271,7 +268,7 @@ func (s *Stream) calcQuantiles() {
}

// check if the last sample value should satisfy unprocessed quantiles
for i := idx; i < len(s.quantiles); i++ {
for i := idx; i < len(s.thresholdBuf); i++ {
rank, threshold := s.thresholdBuf[i].rank, s.thresholdBuf[i].threshold
if maxRank >= rank+threshold || minRank > rank {
s.computedQuantiles[i] = prev.value
Expand All @@ -284,6 +281,7 @@ func (s *Stream) insert() {
var (
compCur = s.compressCursor
compValue = math.NaN()
samples = &s.samples
insertPointValue float64
sample *Sample
)
Expand All @@ -292,15 +290,22 @@ func (s *Stream) insert() {
compValue = compCur.value
}

samples := &s.samples
// break heap invariant and just sort all the times, as we'll consume all of them in one go
s.bufMore.SortDesc()

for s.insertCursor != nil {
var (
vals = []float64(s.bufMore)
idx = len(vals) - 1
)

for s.insertCursor != nil && idx < len(vals) {
curr := s.insertCursor
insertPointValue = curr.value

for s.bufMore.Len() > 0 && s.bufMore.Min() <= insertPointValue {
for idx >= 0 && vals[idx] <= insertPointValue {
val := vals[idx]
idx--
sample = s.samples.Acquire()
val := s.bufMore.Pop()
sample.value = val
sample.numRanks = 1
sample.delta = curr.numRanks + curr.delta - 1
Expand All @@ -316,19 +321,20 @@ func (s *Stream) insert() {
s.insertCursor = s.insertCursor.next
}

if s.insertCursor != nil {
return
}

for s.bufMore.Len() > 0 && s.bufMore.Min() >= samples.Back().value {
sample = s.samples.Acquire()
sample.value = s.bufMore.Pop()
sample.numRanks = 1
sample.delta = 0
samples.PushBack(sample)
s.numValues++
if s.insertCursor == nil && idx < len(vals) {
for idx >= 0 && vals[idx] >= samples.Back().value {
val := vals[idx]
idx--
sample = s.samples.Acquire()
sample.value = val
sample.numRanks = 1
sample.delta = 0
samples.PushBack(sample)
s.numValues++
}
}

s.bufMore = s.bufMore[:0]
s.resetInsertCursor()
}

Expand All @@ -345,27 +351,48 @@ func (s *Stream) compress() {
s.compressCursor = s.compressCursor.prev
}

var (
numVals = s.numValues
eps = 2.0 * s.eps
)

for s.compressCursor != s.samples.Front() {
next := s.compressCursor.next
maxRank := s.compressMinRank + s.compressCursor.numRanks + s.compressCursor.delta
threshold := s.threshold(maxRank)
s.compressMinRank -= s.compressCursor.numRanks
testVal := s.compressCursor.numRanks + next.numRanks + next.delta
var (
curr = s.compressCursor
next = curr.next
prev = curr.prev

maxRank = s.compressMinRank + curr.numRanks + curr.delta

threshold = int64(math.MaxInt64)
quantileMin int64
)

for i := range s.quantiles {
if maxRank >= int64(s.quantiles[i]*float64(numVals)) {
quantileMin = int64(eps * float64(maxRank) / s.quantiles[i])
} else {
quantileMin = int64(eps * float64(numVals-maxRank) / (1.0 - s.quantiles[i]))
}
if quantileMin < threshold {
threshold = quantileMin
}
}

s.compressMinRank -= curr.numRanks
testVal := curr.numRanks + next.numRanks + next.delta

if testVal <= threshold {
if s.insertCursor == s.compressCursor {
if s.insertCursor == curr {
s.insertCursor = next
}

next.numRanks += s.compressCursor.numRanks
next.numRanks += curr.numRanks

prev := s.compressCursor.prev
// no need to release sample here
s.samples.Remove(s.compressCursor)
s.compressCursor = prev
} else {
s.compressCursor = s.compressCursor.prev
s.samples.Remove(curr)
}
s.compressCursor = prev
}

if s.compressCursor == s.samples.Front() {
Expand Down

0 comments on commit a6607a6

Please sign in to comment.