Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Sort heap in one go, instead of iterating one-by-one #3331

Merged
merged 2 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions src/aggregator/aggregation/quantile/cm/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,16 @@ func (h *minHeap) Push(value float64) {
heap := *h
n := len(heap)
i := n - 1
for {
for i < n && i >= 0 {
parent := (i - 1) / 2
if parent == i || heap[parent] <= heap[i] {
if parent == i || parent >= n || parent < 0 || heap[parent] <= heap[i] {
break
}
heap[parent], heap[i] = heap[i], heap[parent]
i = parent
}
}

func (h *minHeap) ensureSize() {
heap := *h
targetCap := cap(heap) * 2
newHeap := sharedHeapPool.Get(targetCap)
(*newHeap) = append(*newHeap, heap...)
if cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
}
(*h) = *newHeap
}

func (h *minHeap) Reset() {
if heap := *h; cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
Expand Down Expand Up @@ -99,3 +88,47 @@ func (h *minHeap) Pop() float64 {
*h = old[0:n]
return val
}

func (h minHeap) SortDesc() {
heap := h
// this is equivalent to Pop() in a loop (heapsort)
// all the redundant-looking conditions are there to eliminate bounds-checks
for n := len(heap) - 1; n > 0 && n < len(heap); n = len(heap) - 1 {
var (
i int
smallest int
)
heap[0], heap[n] = heap[n], heap[0]
for smallest >= 0 && smallest <= n {
var (
left = smallest*2 + 1
right = left + 1
)
if left < n && left >= 0 && heap[left] < heap[smallest] {
smallest = left
}
if right < n && right >= 0 && heap[right] < heap[smallest] {
smallest = right
}
if smallest == i {
break
}
heap[i], heap[smallest] = heap[smallest], heap[i]
i = smallest
}
heap = heap[0:n]
}
}

func (h *minHeap) ensureSize() {
var (
heap = *h
targetCap = cap(heap) * 2
newHeap = sharedHeapPool.Get(targetCap)
)
(*newHeap) = append(*newHeap, heap...)
if cap(heap) >= _initialHeapBucketSize {
sharedHeapPool.Put(heap)
}
(*h) = *newHeap
}
27 changes: 27 additions & 0 deletions src/aggregator/aggregation/quantile/cm/heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package cm

import (
"math/rand"
"sort"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -33,6 +35,7 @@ func TestMinHeapPushInDecreasingOrder(t *testing.T) {
h.Push(float64(i))
require.Equal(t, iter-i, h.Len())
}
validateSort(t, *h)
for i := 0; i < iter; i++ {
require.Equal(t, float64(i), h.Min())
require.Equal(t, float64(i), h.Pop())
Expand All @@ -47,13 +50,37 @@ func TestMinHeapPushInIncreasingOrder(t *testing.T) {
h.Push(float64(i))
require.Equal(t, i+1, h.Len())
}
validateSort(t, *h)
for i := 0; i < iter; i++ {
require.Equal(t, float64(i), h.Min())
require.Equal(t, float64(i), h.Pop())
validateInvariant(t, *h, 0)
}
}

func TestMinHeapPushInRandomOrderAndSort(t *testing.T) {
h := &minHeap{}
iter := 42
for i := 0; i < iter; i++ {
h.Push(rand.ExpFloat64())
}
validateSort(t, *h)
}

func validateSort(t *testing.T, h minHeap) {
t.Helper()
// copy heap before applying reference sort and minheap-sort
a := make([]float64, h.Len())
b := make([]float64, h.Len())
for i := 0; i < len(h); i++ {
a[i], b[i] = h[i], h[i]
}
sort.Sort(sort.Reverse(sort.Float64Slice(a)))
heap := (*minHeap)(&b)
heap.SortDesc()
require.Equal(t, a, b)
}

func validateInvariant(t *testing.T, h minHeap, i int) {
var (
n = h.Len()
Expand Down
91 changes: 59 additions & 32 deletions src/aggregator/aggregation/quantile/cm/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ func (s *Stream) calcQuantiles() {
)
}

for curr != nil {
if idx == len(s.quantiles) {
break
}
for curr != nil && idx < len(s.computedQuantiles) {
maxRank = minRank + curr.numRanks + curr.delta
rank, threshold := s.thresholdBuf[idx].rank, s.thresholdBuf[idx].threshold

Expand All @@ -271,7 +268,7 @@ func (s *Stream) calcQuantiles() {
}

// check if the last sample value should satisfy unprocessed quantiles
for i := idx; i < len(s.quantiles); i++ {
for i := idx; i < len(s.thresholdBuf); i++ {
rank, threshold := s.thresholdBuf[i].rank, s.thresholdBuf[i].threshold
if maxRank >= rank+threshold || minRank > rank {
s.computedQuantiles[i] = prev.value
Expand All @@ -284,6 +281,7 @@ func (s *Stream) insert() {
var (
compCur = s.compressCursor
compValue = math.NaN()
samples = &s.samples
insertPointValue float64
sample *Sample
)
Expand All @@ -292,15 +290,22 @@ func (s *Stream) insert() {
compValue = compCur.value
}

samples := &s.samples
// break heap invariant and just sort all the times, as we'll consume all of them in one go
s.bufMore.SortDesc()

for s.insertCursor != nil {
var (
vals = []float64(s.bufMore)
idx = len(vals) - 1
)

for s.insertCursor != nil && idx < len(vals) {
curr := s.insertCursor
insertPointValue = curr.value

for s.bufMore.Len() > 0 && s.bufMore.Min() <= insertPointValue {
for idx >= 0 && vals[idx] <= insertPointValue {
val := vals[idx]
idx--
sample = s.samples.Acquire()
val := s.bufMore.Pop()
sample.value = val
sample.numRanks = 1
sample.delta = curr.numRanks + curr.delta - 1
Expand All @@ -316,19 +321,20 @@ func (s *Stream) insert() {
s.insertCursor = s.insertCursor.next
}

if s.insertCursor != nil {
return
}

for s.bufMore.Len() > 0 && s.bufMore.Min() >= samples.Back().value {
sample = s.samples.Acquire()
sample.value = s.bufMore.Pop()
sample.numRanks = 1
sample.delta = 0
samples.PushBack(sample)
s.numValues++
if s.insertCursor == nil && idx < len(vals) {
for idx >= 0 && vals[idx] >= samples.Back().value {
val := vals[idx]
idx--
sample = s.samples.Acquire()
sample.value = val
sample.numRanks = 1
sample.delta = 0
samples.PushBack(sample)
s.numValues++
}
}

s.bufMore = s.bufMore[:0]
s.resetInsertCursor()
}

Expand All @@ -345,27 +351,48 @@ func (s *Stream) compress() {
s.compressCursor = s.compressCursor.prev
}

var (
numVals = s.numValues
eps = 2.0 * s.eps
)

for s.compressCursor != s.samples.Front() {
next := s.compressCursor.next
maxRank := s.compressMinRank + s.compressCursor.numRanks + s.compressCursor.delta
threshold := s.threshold(maxRank)
s.compressMinRank -= s.compressCursor.numRanks
testVal := s.compressCursor.numRanks + next.numRanks + next.delta
var (
curr = s.compressCursor
next = curr.next
prev = curr.prev

maxRank = s.compressMinRank + curr.numRanks + curr.delta

threshold = int64(math.MaxInt64)
quantileMin int64
)

for i := range s.quantiles {
if maxRank >= int64(s.quantiles[i]*float64(numVals)) {
quantileMin = int64(eps * float64(maxRank) / s.quantiles[i])
} else {
quantileMin = int64(eps * float64(numVals-maxRank) / (1.0 - s.quantiles[i]))
}
if quantileMin < threshold {
threshold = quantileMin
}
}

s.compressMinRank -= curr.numRanks
testVal := curr.numRanks + next.numRanks + next.delta

if testVal <= threshold {
if s.insertCursor == s.compressCursor {
if s.insertCursor == curr {
s.insertCursor = next
}

next.numRanks += s.compressCursor.numRanks
next.numRanks += curr.numRanks

prev := s.compressCursor.prev
// no need to release sample here
s.samples.Remove(s.compressCursor)
s.compressCursor = prev
} else {
s.compressCursor = s.compressCursor.prev
s.samples.Remove(curr)
}
s.compressCursor = prev
}

if s.compressCursor == s.samples.Front() {
Expand Down