From 7ed450b25fb6efc56bb747b37e70538f9463f936 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:55:40 -0400 Subject: [PATCH] Implement Heap Map (#2137) Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Co-authored-by: Alberto Benegiamo Co-authored-by: Stephen Buttolph --- .../avalanche/bootstrap/bootstrapper.go | 46 ++++-- snow/engine/avalanche/vertex/heap.go | 135 ----------------- snow/engine/avalanche/vertex/heap_test.go | 138 ------------------ snow/networking/benchlist/benchlist.go | 103 ++++--------- snow/networking/benchlist/benchlist_test.go | 44 ++---- utils/heap/map.go | 132 +++++++++++++++++ utils/heap/map_test.go | 96 ++++++++++++ utils/math/averager_heap.go | 108 ++------------ utils/math/averager_heap_test.go | 108 +++++++------- utils/timer/adaptive_timeout_manager.go | 69 ++------- vms/platformvm/txs/txheap/by_age.go | 21 ++- vms/platformvm/txs/txheap/by_end_time.go | 20 +-- vms/platformvm/txs/txheap/by_end_time_test.go | 2 +- vms/platformvm/txs/txheap/by_start_time.go | 20 +-- vms/platformvm/txs/txheap/heap.go | 107 ++++---------- x/sync/peer_tracker.go | 15 +- 16 files changed, 451 insertions(+), 713 deletions(-) delete mode 100644 snow/engine/avalanche/vertex/heap.go delete mode 100644 snow/engine/avalanche/vertex/heap_test.go create mode 100644 utils/heap/map.go create mode 100644 utils/heap/map_test.go diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index ed626f56ce88..40f75968ae08 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -15,8 +15,8 @@ import ( "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/avalanche" - "github.com/ava-labs/avalanchego/snow/engine/avalanche/vertex" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -417,14 +417,15 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { // Process the vertices in [vtxs]. func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error { - // Vertices that we need to process. Store them in a heap for deduplication - // and so we always process vertices further down in the DAG first. This helps - // to reduce the number of repeated DAG traversals. - toProcess := vertex.NewHeap() + // Vertices that we need to process prioritized by vertices that are unknown + // or the furthest down the DAG. Unknown vertices are prioritized to ensure + // that once we have made it below a certain height in DAG traversal we do + // not need to reset and repeat DAG traversals. 
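(Aside, not part of the patch: a minimal sketch of the pop order this prioritization yields once the heap is built with the vertexLess comparator added at the bottom of this file. It assumes the avalanche.TestVertex test helper used by this repository's tests; the heights and the exampleVertexOrder name are illustrative.)

// Sketch only: demonstrates vertexLess ordering with heap.Map; not applied by the patch.
package bootstrap

import (
	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/snow/choices"
	"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
	"github.com/ava-labs/avalanchego/utils/heap"
)

func exampleVertexOrder() []ids.ID {
	unknown := &avalanche.TestVertex{TestDecidable: choices.TestDecidable{
		IDV:     ids.GenerateTestID(),
		StatusV: choices.Unknown,
	}}
	low := &avalanche.TestVertex{TestDecidable: choices.TestDecidable{
		IDV:     ids.GenerateTestID(),
		StatusV: choices.Processing,
	}, HeightV: 1}
	high := &avalanche.TestVertex{TestDecidable: choices.TestDecidable{
		IDV:     ids.GenerateTestID(),
		StatusV: choices.Processing,
	}, HeightV: 5}

	toProcess := heap.NewMap[ids.ID, avalanche.Vertex](vertexLess)
	for _, vtx := range []avalanche.Vertex{low, high, unknown} {
		toProcess.Push(vtx.ID(), vtx)
	}

	// Expected pop order: the unknown vertex first, then high (height 5), then low (height 1).
	order := make([]ids.ID, 0, toProcess.Len())
	for {
		vtxID, _, ok := toProcess.Pop()
		if !ok {
			return order
		}
		order = append(order, vtxID)
	}
}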
+ toProcess := heap.NewMap[ids.ID, avalanche.Vertex](vertexLess) for _, vtx := range vtxs { vtxID := vtx.ID() if _, ok := b.processedCache.Get(vtxID); !ok { // only process a vertex if we haven't already - toProcess.Push(vtx) + _, _ = toProcess.Push(vtxID, vtx) } else { b.VtxBlocked.RemoveMissingID(vtxID) } @@ -433,13 +434,15 @@ func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) er vtxHeightSet := set.Set[ids.ID]{} prevHeight := uint64(0) - for toProcess.Len() > 0 { // While there are unprocessed vertices + for { if b.Halted() { return nil } - vtx := toProcess.Pop() // Get an unknown vertex or one furthest down the DAG - vtxID := vtx.ID() + vtxID, vtx, ok := toProcess.Pop() + if !ok { + break + } switch vtx.Status() { case choices.Unknown: @@ -511,7 +514,7 @@ func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) er parentID := parent.ID() if _, ok := b.processedCache.Get(parentID); !ok { // But only if we haven't processed the parent if !vtxHeightSet.Contains(parentID) { - toProcess.Push(parent) + toProcess.Push(parentID, parent) } } } @@ -633,3 +636,26 @@ func (b *bootstrapper) checkFinish(ctx context.Context) error { b.processedCache.Flush() return b.OnFinished(ctx, b.Config.SharedCfg.RequestID) } + +// A vertex is less than another vertex if it is unknown. Ties are broken by +// prioritizing vertices that have a greater height. +func vertexLess(i, j avalanche.Vertex) bool { + if !i.Status().Fetched() { + return true + } + if !j.Status().Fetched() { + return false + } + + // Treat errors on retrieving the height as if the vertex is not fetched + heightI, errI := i.Height() + if errI != nil { + return true + } + heightJ, errJ := j.Height() + if errJ != nil { + return false + } + + return heightI > heightJ +} diff --git a/snow/engine/avalanche/vertex/heap.go b/snow/engine/avalanche/vertex/heap.go deleted file mode 100644 index fa9a0a83d920..000000000000 --- a/snow/engine/avalanche/vertex/heap.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package vertex - -import ( - "container/heap" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/consensus/avalanche" - "github.com/ava-labs/avalanchego/utils/set" -) - -var ( - _ Heap = (*maxHeightVertexHeap)(nil) - _ heap.Interface = (*priorityQueue)(nil) -) - -type priorityQueue []avalanche.Vertex - -func (pq priorityQueue) Len() int { - return len(pq) -} - -// Returns true if the vertex at index i has greater height than the vertex at -// index j. -func (pq priorityQueue) Less(i, j int) bool { - statusI := pq[i].Status() - statusJ := pq[j].Status() - - // Put unknown vertices at the front of the heap to ensure once we have made - // it below a certain height in DAG traversal we do not need to reset - if !statusI.Fetched() { - return true - } - if !statusJ.Fetched() { - return false - } - - // Treat errors on retrieving the height as if the vertex is not fetched - heightI, errI := pq[i].Height() - if errI != nil { - return true - } - heightJ, errJ := pq[j].Height() - if errJ != nil { - return false - } - return heightI > heightJ -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -// Push adds an item to this priority queue. 
x must have type *vertexItem -func (pq *priorityQueue) Push(x interface{}) { - item := x.(avalanche.Vertex) - *pq = append(*pq, item) -} - -// Pop returns the last item in this priorityQueue -func (pq *priorityQueue) Pop() interface{} { - old := *pq - n := len(old) - item := old[n-1] - old[n-1] = nil - *pq = old[0 : n-1] - return item -} - -// Heap defines the functionality of a heap of vertices with unique VertexIDs -// ordered by height -type Heap interface { - // Empty the heap. - Clear() - - // Add the provided vertex to the heap. Vertices are de-duplicated, returns - // true if the vertex was added, false if it was dropped. - Push(avalanche.Vertex) bool - - // Remove the top vertex. Assumes that there is at least one element. - Pop() avalanche.Vertex - - // Returns if a vertex with the provided ID is currently in the heap. - Contains(ids.ID) bool - - // Returns the number of vertices in the heap. - Len() int -} - -// NewHeap returns an empty Heap -func NewHeap() Heap { - return &maxHeightVertexHeap{} -} - -type maxHeightVertexHeap struct { - heap priorityQueue - elementIDs set.Set[ids.ID] -} - -func (vh *maxHeightVertexHeap) Clear() { - vh.heap = priorityQueue{} - vh.elementIDs.Clear() -} - -// Push adds an element to this heap. Returns true if the element was added. -// Returns false if it was already in the heap. -func (vh *maxHeightVertexHeap) Push(vtx avalanche.Vertex) bool { - vtxID := vtx.ID() - if vh.elementIDs.Contains(vtxID) { - return false - } - - vh.elementIDs.Add(vtxID) - heap.Push(&vh.heap, vtx) - return true -} - -// If there are any vertices in this heap with status Unknown, removes one such -// vertex and returns it. Otherwise, removes and returns the vertex in this heap -// with the greatest height. -func (vh *maxHeightVertexHeap) Pop() avalanche.Vertex { - vtx := heap.Pop(&vh.heap).(avalanche.Vertex) - vh.elementIDs.Remove(vtx.ID()) - return vtx -} - -func (vh *maxHeightVertexHeap) Len() int { - return vh.heap.Len() -} - -func (vh *maxHeightVertexHeap) Contains(vtxID ids.ID) bool { - return vh.elementIDs.Contains(vtxID) -} diff --git a/snow/engine/avalanche/vertex/heap_test.go b/snow/engine/avalanche/vertex/heap_test.go deleted file mode 100644 index 1301f3fda96b..000000000000 --- a/snow/engine/avalanche/vertex/heap_test.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package vertex - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/choices" - "github.com/ava-labs/avalanchego/snow/consensus/avalanche" -) - -// This example inserts several ints into an IntHeap, checks the minimum, -// and removes them in order of priority. 
-func TestUniqueVertexHeapReturnsOrdered(t *testing.T) { - require := require.New(t) - - h := NewHeap() - - vtx0 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 0, - } - vtx1 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 1, - } - vtx2 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 1, - } - vtx3 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 3, - } - vtx4 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Unknown, - }, - HeightV: 0, - } - - vts := []avalanche.Vertex{vtx0, vtx1, vtx2, vtx3, vtx4} - - for _, vtx := range vts { - h.Push(vtx) - } - - vtxZ := h.Pop() - require.Equal(vtx4.ID(), vtxZ.ID()) - - vtxA := h.Pop() - height, err := vtxA.Height() - require.NoError(err) - require.Equal(uint64(3), height) - require.Equal(vtx3.ID(), vtxA.ID()) - - vtxB := h.Pop() - height, err = vtxB.Height() - require.NoError(err) - require.Equal(uint64(1), height) - require.Contains([]ids.ID{vtx1.ID(), vtx2.ID()}, vtxB.ID()) - - vtxC := h.Pop() - height, err = vtxC.Height() - require.NoError(err) - require.Equal(uint64(1), height) - require.Contains([]ids.ID{vtx1.ID(), vtx2.ID()}, vtxC.ID()) - - require.NotEqual(vtxB.ID(), vtxC.ID()) - - vtxD := h.Pop() - height, err = vtxD.Height() - require.NoError(err) - require.Zero(height) - require.Equal(vtx0.ID(), vtxD.ID()) - - require.Zero(h.Len()) -} - -func TestUniqueVertexHeapRemainsUnique(t *testing.T) { - require := require.New(t) - - h := NewHeap() - - vtx0 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 0, - } - vtx1 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - HeightV: 1, - } - - sharedID := ids.GenerateTestID() - vtx2 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: sharedID, - StatusV: choices.Processing, - }, - HeightV: 1, - } - vtx3 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: sharedID, - StatusV: choices.Processing, - }, - HeightV: 2, - } - - require.True(h.Push(vtx0)) - require.True(h.Push(vtx1)) - require.True(h.Push(vtx2)) - require.False(h.Push(vtx3)) - require.Equal(3, h.Len()) -} diff --git a/snow/networking/benchlist/benchlist.go b/snow/networking/benchlist/benchlist.go index 4f7cb20ab244..51c1a8d3c2bb 100644 --- a/snow/networking/benchlist/benchlist.go +++ b/snow/networking/benchlist/benchlist.go @@ -4,7 +4,6 @@ package benchlist import ( - "container/heap" "fmt" "math/rand" "sync" @@ -16,6 +15,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/timer" @@ -24,8 +24,6 @@ import ( safemath "github.com/ava-labs/avalanchego/utils/math" ) -var _ heap.Interface = (*benchedQueue)(nil) - // If a peer consistently does not respond to queries, it will // increase latencies on the network whenever that peer is 
polled. // If we cannot terminate the poll early, then the poll will wait @@ -45,46 +43,6 @@ type Benchlist interface { IsBenched(nodeID ids.NodeID) bool } -// Data about a validator who is benched -type benchData struct { - benchedUntil time.Time - nodeID ids.NodeID - index int -} - -// Each element is a benched validator -type benchedQueue []*benchData - -func (bq benchedQueue) Len() int { - return len(bq) -} - -func (bq benchedQueue) Less(i, j int) bool { - return bq[i].benchedUntil.Before(bq[j].benchedUntil) -} - -func (bq benchedQueue) Swap(i, j int) { - bq[i], bq[j] = bq[j], bq[i] - bq[i].index = i - bq[j].index = j -} - -// Push adds an item to this queue. x must have type *benchData -func (bq *benchedQueue) Push(x interface{}) { - item := x.(*benchData) - item.index = len(*bq) - *bq = append(*bq, item) -} - -// Pop returns the validator that should leave the bench next -func (bq *benchedQueue) Pop() interface{} { - n := len(*bq) - item := (*bq)[n-1] - (*bq)[n-1] = nil // make sure the item is freed from memory - *bq = (*bq)[:n-1] - return item -} - type failureStreak struct { // Time of first consecutive timeout firstFailure time.Time @@ -120,9 +78,8 @@ type benchlist struct { // IDs of validators that are currently benched benchlistSet set.Set[ids.NodeID] - // Min heap containing benched validators and their endtimes - // Pop() returns the next validator to leave - benchedQueue benchedQueue + // Min heap of benched validators ordered by when they can be unbenched + benchedHeap heap.Map[ids.NodeID, time.Time] // A validator will be benched if [threshold] messages in a row // to them time out and the first of those messages was more than @@ -159,6 +116,7 @@ func NewBenchlist( failureStreaks: make(map[ids.NodeID]failureStreak), benchlistSet: set.Set[ids.NodeID]{}, benchable: benchable, + benchedHeap: heap.NewMap[ids.NodeID, time.Time](time.Time.Before), vdrs: validators, threshold: threshold, minimumFailingDuration: minimumFailingDuration, @@ -177,60 +135,52 @@ func (b *benchlist) update() { now := b.clock.Time() for { - // [next] is nil when no more validators should - // leave the bench at this time - next := b.nextToLeave(now) - if next == nil { + if !b.canUnbench(now) { break } - b.remove(next) + b.remove() } // Set next time update will be called b.setNextLeaveTime() } -// Remove [validator] from the benchlist +// Removes the next node from the benchlist // Assumes [b.lock] is held -func (b *benchlist) remove(node *benchData) { - // Update state - id := node.nodeID +func (b *benchlist) remove() { + nodeID, _, _ := b.benchedHeap.Pop() b.log.Debug("removing node from benchlist", - zap.Stringer("nodeID", id), + zap.Stringer("nodeID", nodeID), ) - heap.Remove(&b.benchedQueue, node.index) - b.benchlistSet.Remove(id) - b.benchable.Unbenched(b.chainID, id) + b.benchlistSet.Remove(nodeID) + b.benchable.Unbenched(b.chainID, nodeID) // Update metrics - b.metrics.numBenched.Set(float64(b.benchedQueue.Len())) + b.metrics.numBenched.Set(float64(b.benchedHeap.Len())) benchedStake := b.vdrs.SubsetWeight(b.benchlistSet) b.metrics.weightBenched.Set(float64(benchedStake)) } -// Returns the next validator that should leave -// the bench at time [now]. nil if no validator should. +// Returns if a validator should leave the bench at time [now]. +// False if no validator should. 
// Assumes [b.lock] is held -func (b *benchlist) nextToLeave(now time.Time) *benchData { - if b.benchedQueue.Len() == 0 { - return nil +func (b *benchlist) canUnbench(now time.Time) bool { + _, next, ok := b.benchedHeap.Peek() + if !ok { + return false } - next := b.benchedQueue[0] - if now.Before(next.benchedUntil) { - return nil - } - return next + return now.After(next) } // Set [b.timer] to fire when the next validator should leave the bench // Assumes [b.lock] is held func (b *benchlist) setNextLeaveTime() { - if b.benchedQueue.Len() == 0 { + _, next, ok := b.benchedHeap.Peek() + if !ok { b.timer.Cancel() return } now := b.clock.Time() - next := b.benchedQueue[0] - nextLeave := next.benchedUntil.Sub(now) + nextLeave := next.Sub(now) b.timer.SetTimeoutIn(nextLeave) } @@ -336,10 +286,7 @@ func (b *benchlist) bench(nodeID ids.NodeID) { delete(b.failureStreaks, nodeID) b.streaklock.Unlock() - heap.Push( - &b.benchedQueue, - &benchData{nodeID: nodeID, benchedUntil: benchedUntil}, - ) + b.benchedHeap.Push(nodeID, benchedUntil) b.log.Debug("benching validator after consecutive failed queries", zap.Stringer("nodeID", nodeID), zap.Duration("benchDuration", benchedUntil.Sub(now)), @@ -350,6 +297,6 @@ func (b *benchlist) bench(nodeID ids.NodeID) { b.setNextLeaveTime() // Update metrics - b.metrics.numBenched.Set(float64(b.benchedQueue.Len())) + b.metrics.numBenched.Set(float64(b.benchedHeap.Len())) b.metrics.weightBenched.Set(float64(newBenchedStake)) } diff --git a/snow/networking/benchlist/benchlist_test.go b/snow/networking/benchlist/benchlist_test.go index a33abd943c59..e9945dd0dfa5 100644 --- a/snow/networking/benchlist/benchlist_test.go +++ b/snow/networking/benchlist/benchlist_test.go @@ -66,7 +66,7 @@ func TestBenchlistAdd(t *testing.T) { require.False(b.isBenched(vdrID3)) require.False(b.isBenched(vdrID4)) require.Empty(b.failureStreaks) - require.Empty(b.benchedQueue) + require.Zero(b.benchedHeap.Len()) require.Empty(b.benchlistSet) b.lock.Unlock() @@ -77,7 +77,7 @@ func TestBenchlistAdd(t *testing.T) { // Still shouldn't be benched due to not enough consecutive failure require.False(b.isBenched(vdrID0)) - require.Empty(b.benchedQueue) + require.Zero(b.benchedHeap.Len()) require.Empty(b.benchlistSet) require.Len(b.failureStreaks, 1) fs := b.failureStreaks[vdrID0] @@ -91,7 +91,7 @@ func TestBenchlistAdd(t *testing.T) { // has passed since the first failure b.lock.Lock() require.False(b.isBenched(vdrID0)) - require.Empty(b.benchedQueue) + require.Zero(b.benchedHeap.Len()) require.Empty(b.benchlistSet) b.lock.Unlock() @@ -112,13 +112,14 @@ func TestBenchlistAdd(t *testing.T) { // Now this validator should be benched b.lock.Lock() require.True(b.isBenched(vdrID0)) - require.Equal(b.benchedQueue.Len(), 1) + require.Equal(b.benchedHeap.Len(), 1) require.Equal(b.benchlistSet.Len(), 1) - next := b.benchedQueue[0] - require.Equal(vdrID0, next.nodeID) - require.False(next.benchedUntil.After(now.Add(duration))) - require.False(next.benchedUntil.Before(now.Add(duration / 2))) + nodeID, benchedUntil, ok := b.benchedHeap.Peek() + require.True(ok) + require.Equal(vdrID0, nodeID) + require.False(benchedUntil.After(now.Add(duration))) + require.False(benchedUntil.Before(now.Add(duration / 2))) require.Empty(b.failureStreaks) require.True(benched) benchable.BenchedF = nil @@ -137,7 +138,7 @@ func TestBenchlistAdd(t *testing.T) { b.lock.Lock() require.True(b.isBenched(vdrID0)) require.False(b.isBenched(vdrID1)) - require.Equal(b.benchedQueue.Len(), 1) + require.Equal(b.benchedHeap.Len(), 1) 
require.Equal(b.benchlistSet.Len(), 1) require.Empty(b.failureStreaks) b.lock.Unlock() @@ -215,7 +216,7 @@ func TestBenchlistMaxStake(t *testing.T) { require.True(b.isBenched(vdrID0)) require.True(b.isBenched(vdrID1)) require.False(b.isBenched(vdrID2)) - require.Equal(b.benchedQueue.Len(), 2) + require.Equal(b.benchedHeap.Len(), 2) require.Equal(b.benchlistSet.Len(), 2) require.Len(b.failureStreaks, 1) fs := b.failureStreaks[vdrID2] @@ -242,7 +243,7 @@ func TestBenchlistMaxStake(t *testing.T) { require.True(b.isBenched(vdrID0)) require.True(b.isBenched(vdrID1)) require.True(b.isBenched(vdrID4)) - require.Equal(3, b.benchedQueue.Len()) + require.Equal(3, b.benchedHeap.Len()) require.Equal(3, b.benchlistSet.Len()) require.Contains(b.benchlistSet, vdrID0) require.Contains(b.benchlistSet, vdrID1) @@ -261,19 +262,10 @@ func TestBenchlistMaxStake(t *testing.T) { require.True(b.isBenched(vdrID1)) require.True(b.isBenched(vdrID4)) require.False(b.isBenched(vdrID2)) - require.Equal(3, b.benchedQueue.Len()) + require.Equal(3, b.benchedHeap.Len()) require.Equal(3, b.benchlistSet.Len()) require.Len(b.failureStreaks, 1) require.Contains(b.failureStreaks, vdrID2) - - // Ensure the benched queue root has the min end time - minEndTime := b.benchedQueue[0].benchedUntil - benchedIDs := []ids.NodeID{vdrID0, vdrID1, vdrID4} - for _, benchedVdr := range b.benchedQueue { - require.Contains(benchedIDs, benchedVdr.nodeID) - require.False(benchedVdr.benchedUntil.Before(minEndTime)) - } - b.lock.Unlock() } @@ -348,18 +340,10 @@ func TestBenchlistRemove(t *testing.T) { require.True(b.isBenched(vdrID0)) require.True(b.isBenched(vdrID1)) require.True(b.isBenched(vdrID2)) - require.Equal(3, b.benchedQueue.Len()) + require.Equal(3, b.benchedHeap.Len()) require.Equal(3, b.benchlistSet.Len()) require.Empty(b.failureStreaks) - // Ensure the benched queue root has the min end time - minEndTime := b.benchedQueue[0].benchedUntil - benchedIDs := []ids.NodeID{vdrID0, vdrID1, vdrID2} - for _, benchedVdr := range b.benchedQueue { - require.Contains(benchedIDs, benchedVdr.nodeID) - require.False(benchedVdr.benchedUntil.Before(minEndTime)) - } - // Set the benchlist's clock past when all validators should be unbenched // so that when its timer fires, it can remove them b.clock.Set(b.clock.Time().Add(duration)) diff --git a/utils/heap/map.go b/utils/heap/map.go new file mode 100644 index 000000000000..dbe06c06446e --- /dev/null +++ b/utils/heap/map.go @@ -0,0 +1,132 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package heap + +import ( + "container/heap" + + "github.com/ava-labs/avalanchego/utils" +) + +var _ heap.Interface = (*indexedQueue[int, int])(nil) + +func MapValues[K comparable, V any](m Map[K, V]) []V { + result := make([]V, 0, m.Len()) + for _, e := range m.queue.entries { + result = append(result, e.v) + } + return result +} + +// NewMap returns a heap without duplicates ordered by its values +func NewMap[K comparable, V any](less func(a, b V) bool) Map[K, V] { + return Map[K, V]{ + queue: &indexedQueue[K, V]{ + queue: queue[entry[K, V]]{ + less: func(a, b entry[K, V]) bool { + return less(a.v, b.v) + }, + }, + index: make(map[K]int), + }, + } +} + +type Map[K comparable, V any] struct { + queue *indexedQueue[K, V] +} + +// Push returns the evicted previous value if present +func (m *Map[K, V]) Push(k K, v V) (V, bool) { + if i, ok := m.queue.index[k]; ok { + prev := m.queue.entries[i] + m.queue.entries[i].v = v + heap.Fix(m.queue, i) + return prev.v, true + } + + heap.Push(m.queue, entry[K, V]{k: k, v: v}) + return utils.Zero[V](), false +} + +func (m *Map[K, V]) Pop() (K, V, bool) { + if m.Len() == 0 { + return utils.Zero[K](), utils.Zero[V](), false + } + + popped := heap.Pop(m.queue).(entry[K, V]) + return popped.k, popped.v, true +} + +func (m *Map[K, V]) Peek() (K, V, bool) { + if m.Len() == 0 { + return utils.Zero[K](), utils.Zero[V](), false + } + + entry := m.queue.entries[0] + return entry.k, entry.v, true +} + +func (m *Map[K, V]) Len() int { + return m.queue.Len() +} + +func (m *Map[K, V]) Remove(k K) (V, bool) { + if i, ok := m.queue.index[k]; ok { + removed := heap.Remove(m.queue, i).(entry[K, V]) + return removed.v, true + } + return utils.Zero[V](), false +} + +func (m *Map[K, V]) Contains(k K) bool { + _, ok := m.queue.index[k] + return ok +} + +func (m *Map[K, V]) Get(k K) (V, bool) { + if i, ok := m.queue.index[k]; ok { + got := m.queue.entries[i] + return got.v, true + } + return utils.Zero[V](), false +} + +func (m *Map[K, V]) Fix(k K) { + if i, ok := m.queue.index[k]; ok { + heap.Fix(m.queue, i) + } +} + +type indexedQueue[K comparable, V any] struct { + queue[entry[K, V]] + index map[K]int +} + +func (h *indexedQueue[K, V]) Swap(i, j int) { + h.entries[i], h.entries[j] = h.entries[j], h.entries[i] + h.index[h.entries[i].k], h.index[h.entries[j].k] = i, j +} + +func (h *indexedQueue[K, V]) Push(x any) { + entry := x.(entry[K, V]) + h.entries = append(h.entries, entry) + h.index[entry.k] = len(h.index) +} + +func (h *indexedQueue[K, V]) Pop() any { + end := len(h.entries) - 1 + + popped := h.entries[end] + h.entries[end] = entry[K, V]{} + h.entries = h.entries[:end] + + delete(h.index, popped.k) + return popped +} + +type entry[K any, V any] struct { + k K + v V +} diff --git a/utils/heap/map_test.go b/utils/heap/map_test.go new file mode 100644 index 000000000000..cc774a5a50df --- /dev/null +++ b/utils/heap/map_test.go @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
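(Aside, not part of the patch: a minimal usage sketch of the Map type defined in map.go above, written from a hypothetical consumer package. Push reports whether it evicted a previous value for the key, and Peek/Pop return ok=false once the heap is empty, per the implementation above.)

// Sketch only: basic use of utils/heap.Map from a consumer.
package main

import (
	"fmt"

	"github.com/ava-labs/avalanchego/utils/heap"
)

func main() {
	// A min-heap of ints keyed by strings; the heap is ordered by value and
	// keys are deduplicated.
	m := heap.NewMap[string, int](func(a, b int) bool { return a < b })

	m.Push("b", 2)
	m.Push("a", 5)

	// Pushing an existing key replaces its value and returns the old one.
	prev, existed := m.Push("a", 1)
	fmt.Println(prev, existed) // 5 true

	k, v, ok := m.Peek()
	fmt.Println(k, v, ok) // a 1 true

	for m.Len() > 0 {
		k, v, _ := m.Pop()
		fmt.Println(k, v) // a 1, then b 2
	}

	// The same shape covers the other call sites in this change, e.g. a
	// min-heap of unbench times: heap.NewMap[ids.NodeID, time.Time](time.Time.Before).
}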
+ +package heap + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMap(t *testing.T) { + tests := []struct { + name string + setup func(h Map[string, int]) + expected []entry[string, int] + }{ + { + name: "only push", + setup: func(h Map[string, int]) { + h.Push("a", 1) + h.Push("b", 2) + h.Push("c", 3) + }, + expected: []entry[string, int]{ + {k: "a", v: 1}, + {k: "b", v: 2}, + {k: "c", v: 3}, + }, + }, + { + name: "out of order pushes", + setup: func(h Map[string, int]) { + h.Push("a", 1) + h.Push("e", 5) + h.Push("b", 2) + h.Push("d", 4) + h.Push("c", 3) + }, + expected: []entry[string, int]{ + {"a", 1}, + {"b", 2}, + {"c", 3}, + {"d", 4}, + {"e", 5}, + }, + }, + { + name: "push and pop", + setup: func(m Map[string, int]) { + m.Push("a", 1) + m.Push("e", 5) + m.Push("b", 2) + m.Push("d", 4) + m.Push("c", 3) + m.Pop() + m.Pop() + m.Pop() + }, + expected: []entry[string, int]{ + {"d", 4}, + {"e", 5}, + }, + }, + { + name: "duplicate key is overridden", + setup: func(h Map[string, int]) { + h.Push("a", 1) + h.Push("a", 2) + }, + expected: []entry[string, int]{ + {k: "a", v: 2}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + h := NewMap[string, int](func(a, b int) bool { + return a < b + }) + + tt.setup(h) + + require.Equal(len(tt.expected), h.Len()) + for _, expected := range tt.expected { + k, v, ok := h.Pop() + require.True(ok) + require.Equal(expected.k, k) + require.Equal(expected.v, v) + } + }) + } +} diff --git a/utils/math/averager_heap.go b/utils/math/averager_heap.go index b09393b48803..070593f0eeb8 100644 --- a/utils/math/averager_heap.go +++ b/utils/math/averager_heap.go @@ -4,16 +4,13 @@ package math import ( - "container/heap" - "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" ) -var ( - _ AveragerHeap = averagerHeap{} - _ heap.Interface = (*averagerHeapBackend)(nil) -) +var _ AveragerHeap = (*averagerHeap)(nil) +// TODO replace this interface with utils/heap // AveragerHeap maintains a heap of the averagers. type AveragerHeap interface { // Add the average to the heap. If [nodeID] is already in the heap, the @@ -33,113 +30,36 @@ type AveragerHeap interface { Len() int } -type averagerHeapEntry struct { - nodeID ids.NodeID - averager Averager - index int -} - -type averagerHeapBackend struct { - isMaxHeap bool - nodeIDToEntry map[ids.NodeID]*averagerHeapEntry - entries []*averagerHeapEntry -} - type averagerHeap struct { - b *averagerHeapBackend -} - -// NewMinAveragerHeap returns a new empty min heap. The returned heap is not -// thread safe. -func NewMinAveragerHeap() AveragerHeap { - return averagerHeap{b: &averagerHeapBackend{ - nodeIDToEntry: make(map[ids.NodeID]*averagerHeapEntry), - }} + heap heap.Map[ids.NodeID, Averager] } // NewMaxAveragerHeap returns a new empty max heap. The returned heap is not // thread safe. 
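(Aside, not part of the patch: a small sketch of how a caller exercises this wrapper after the refactor; the node IDs and averager values are illustrative.)

// Sketch only: max-averager heap behavior preserved by this refactor.
package main

import (
	"fmt"
	"time"

	"github.com/ava-labs/avalanchego/ids"
	safemath "github.com/ava-labs/avalanchego/utils/math"
)

func main() {
	h := safemath.NewMaxAveragerHeap()

	slow := safemath.NewAverager(1, time.Second, time.Now())
	fast := safemath.NewAverager(10, time.Second, time.Now())

	slowNode := ids.GenerateTestNodeID()
	fastNode := ids.GenerateTestNodeID()

	h.Add(slowNode, slow)
	h.Add(fastNode, fast)

	// Peek returns the node whose averager currently reads the highest value.
	nodeID, _, ok := h.Peek()
	fmt.Println(ok, nodeID == fastNode) // true true

	// Re-adding an existing node replaces and returns its previous averager.
	prev, existed := h.Add(slowNode, fast)
	fmt.Println(existed, prev == slow) // true true
}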
func NewMaxAveragerHeap() AveragerHeap { - return averagerHeap{b: &averagerHeapBackend{ - isMaxHeap: true, - nodeIDToEntry: make(map[ids.NodeID]*averagerHeapEntry), - }} + return averagerHeap{ + heap: heap.NewMap[ids.NodeID, Averager](func(a, b Averager) bool { + return a.Read() > b.Read() + }), + } } func (h averagerHeap) Add(nodeID ids.NodeID, averager Averager) (Averager, bool) { - if e, exists := h.b.nodeIDToEntry[nodeID]; exists { - oldAverager := e.averager - e.averager = averager - heap.Fix(h.b, e.index) - return oldAverager, true - } - - heap.Push(h.b, &averagerHeapEntry{ - nodeID: nodeID, - averager: averager, - }) - return nil, false + return h.heap.Push(nodeID, averager) } func (h averagerHeap) Remove(nodeID ids.NodeID) (Averager, bool) { - e, exists := h.b.nodeIDToEntry[nodeID] - if !exists { - return nil, false - } - heap.Remove(h.b, e.index) - return e.averager, true + return h.heap.Remove(nodeID) } func (h averagerHeap) Pop() (ids.NodeID, Averager, bool) { - if len(h.b.entries) == 0 { - return ids.EmptyNodeID, nil, false - } - e := h.b.entries[0] - heap.Pop(h.b) - return e.nodeID, e.averager, true + return h.heap.Pop() } func (h averagerHeap) Peek() (ids.NodeID, Averager, bool) { - if len(h.b.entries) == 0 { - return ids.EmptyNodeID, nil, false - } - e := h.b.entries[0] - return e.nodeID, e.averager, true + return h.heap.Peek() } func (h averagerHeap) Len() int { - return len(h.b.entries) -} - -func (h *averagerHeapBackend) Len() int { - return len(h.entries) -} - -func (h *averagerHeapBackend) Less(i, j int) bool { - if h.isMaxHeap { - return h.entries[i].averager.Read() > h.entries[j].averager.Read() - } - return h.entries[i].averager.Read() < h.entries[j].averager.Read() -} - -func (h *averagerHeapBackend) Swap(i, j int) { - h.entries[i], h.entries[j] = h.entries[j], h.entries[i] - h.entries[i].index = i - h.entries[j].index = j -} - -func (h *averagerHeapBackend) Push(x interface{}) { - e := x.(*averagerHeapEntry) - e.index = len(h.entries) - h.nodeIDToEntry[e.nodeID] = e - h.entries = append(h.entries, e) -} - -func (h *averagerHeapBackend) Pop() interface{} { - newLen := len(h.entries) - 1 - e := h.entries[newLen] - h.entries[newLen] = nil - delete(h.nodeIDToEntry, e.nodeID) - h.entries = h.entries[:newLen] - return e + return h.heap.Len() } diff --git a/utils/math/averager_heap_test.go b/utils/math/averager_heap_test.go index a979612952c4..0586eb77947e 100644 --- a/utils/math/averager_heap_test.go +++ b/utils/math/averager_heap_test.go @@ -20,19 +20,13 @@ func TestAveragerHeap(t *testing.T) { n2 := ids.GenerateTestNodeID() tests := []struct { - h AveragerHeap - a []Averager + name string + h AveragerHeap + a []Averager }{ { - h: NewMinAveragerHeap(), - a: []Averager{ - NewAverager(0, time.Second, time.Now()), - NewAverager(1, time.Second, time.Now()), - NewAverager(2, time.Second, time.Now()), - }, - }, - { - h: NewMaxAveragerHeap(), + name: "max heap", + h: NewMaxAveragerHeap(), a: []Averager{ NewAverager(0, time.Second, time.Now()), NewAverager(-1, time.Second, time.Now()), @@ -42,67 +36,69 @@ func TestAveragerHeap(t *testing.T) { } for _, test := range tests { - _, _, ok := test.h.Pop() - require.False(ok) + t.Run(test.name, func(t *testing.T) { + _, _, ok := test.h.Pop() + require.False(ok) - _, _, ok = test.h.Peek() - require.False(ok) + _, _, ok = test.h.Peek() + require.False(ok) - l := test.h.Len() - require.Zero(l) + l := test.h.Len() + require.Zero(l) - _, ok = test.h.Add(n1, test.a[1]) - require.False(ok) + _, ok = test.h.Add(n1, test.a[1]) + 
require.False(ok) - n, a, ok := test.h.Peek() - require.True(ok) - require.Equal(n1, n) - require.Equal(test.a[1], a) + n, a, ok := test.h.Peek() + require.True(ok) + require.Equal(n1, n) + require.Equal(test.a[1], a) - l = test.h.Len() - require.Equal(1, l) + l = test.h.Len() + require.Equal(1, l) - a, ok = test.h.Add(n1, test.a[1]) - require.True(ok) - require.Equal(test.a[1], a) + a, ok = test.h.Add(n1, test.a[1]) + require.True(ok) + require.Equal(test.a[1], a) - l = test.h.Len() - require.Equal(1, l) + l = test.h.Len() + require.Equal(1, l) - _, ok = test.h.Add(n0, test.a[0]) - require.False(ok) + _, ok = test.h.Add(n0, test.a[0]) + require.False(ok) - _, ok = test.h.Add(n2, test.a[2]) - require.False(ok) + _, ok = test.h.Add(n2, test.a[2]) + require.False(ok) - n, a, ok = test.h.Pop() - require.True(ok) - require.Equal(n0, n) - require.Equal(test.a[0], a) + n, a, ok = test.h.Pop() + require.True(ok) + require.Equal(n0, n) + require.Equal(test.a[0], a) - l = test.h.Len() - require.Equal(2, l) + l = test.h.Len() + require.Equal(2, l) - a, ok = test.h.Remove(n1) - require.True(ok) - require.Equal(test.a[1], a) + a, ok = test.h.Remove(n1) + require.True(ok) + require.Equal(test.a[1], a) - l = test.h.Len() - require.Equal(1, l) + l = test.h.Len() + require.Equal(1, l) - _, ok = test.h.Remove(n1) - require.False(ok) + _, ok = test.h.Remove(n1) + require.False(ok) - l = test.h.Len() - require.Equal(1, l) + l = test.h.Len() + require.Equal(1, l) - a, ok = test.h.Add(n2, test.a[0]) - require.True(ok) - require.Equal(test.a[2], a) + a, ok = test.h.Add(n2, test.a[0]) + require.True(ok) + require.Equal(test.a[2], a) - n, a, ok = test.h.Pop() - require.True(ok) - require.Equal(n2, n) - require.Equal(test.a[0], a) + n, a, ok = test.h.Pop() + require.True(ok) + require.Equal(n2, n) + require.Equal(test.a[0], a) + }) } } diff --git a/utils/timer/adaptive_timeout_manager.go b/utils/timer/adaptive_timeout_manager.go index 0a0a299cd1da..95b284a48c5f 100644 --- a/utils/timer/adaptive_timeout_manager.go +++ b/utils/timer/adaptive_timeout_manager.go @@ -4,7 +4,6 @@ package timer import ( - "container/heap" "errors" "fmt" "sync" @@ -13,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/timer/mockable" "github.com/ava-labs/avalanchego/utils/wrappers" @@ -24,12 +24,10 @@ var ( errInitialTimeoutBelowMinimum = errors.New("initial timeout cannot be less than minimum timeout") errTooSmallTimeoutCoefficient = errors.New("timeout coefficient must be >= 1") - _ heap.Interface = (*timeoutQueue)(nil) _ AdaptiveTimeoutManager = (*adaptiveTimeoutManager)(nil) ) type adaptiveTimeout struct { - index int // Index in the wait queue id ids.RequestID // Unique ID of this timeout handler func() // Function to execute if timed out duration time.Duration // How long this timeout was set for @@ -37,38 +35,6 @@ type adaptiveTimeout struct { measureLatency bool // Whether this request should impact latency } -type timeoutQueue []*adaptiveTimeout - -func (tq timeoutQueue) Len() int { - return len(tq) -} - -func (tq timeoutQueue) Less(i, j int) bool { - return tq[i].deadline.Before(tq[j].deadline) -} - -func (tq timeoutQueue) Swap(i, j int) { - tq[i], tq[j] = tq[j], tq[i] - tq[i].index = i - tq[j].index = j -} - -// Push adds an item to this priority queue. 
x must have type *adaptiveTimeout -func (tq *timeoutQueue) Push(x interface{}) { - item := x.(*adaptiveTimeout) - item.index = len(*tq) - *tq = append(*tq, item) -} - -// Pop returns the next item in this queue -func (tq *timeoutQueue) Pop() interface{} { - n := len(*tq) - item := (*tq)[n-1] - (*tq)[n-1] = nil // make sure the item is freed from memory - *tq = (*tq)[:n-1] - return item -} - // AdaptiveTimeoutConfig contains the parameters provided to the // adaptive timeout manager. type AdaptiveTimeoutConfig struct { @@ -120,8 +86,7 @@ type adaptiveTimeoutManager struct { minimumTimeout time.Duration maximumTimeout time.Duration currentTimeout time.Duration // Amount of time before a timeout - timeoutMap map[ids.RequestID]*adaptiveTimeout - timeoutQueue timeoutQueue + timeoutHeap heap.Map[ids.RequestID, *adaptiveTimeout] timer *Timer // Timer that will fire to clear the timeouts } @@ -166,7 +131,9 @@ func NewAdaptiveTimeoutManager( maximumTimeout: config.MaximumTimeout, currentTimeout: config.InitialTimeout, timeoutCoefficient: config.TimeoutCoefficient, - timeoutMap: make(map[ids.RequestID]*adaptiveTimeout), + timeoutHeap: heap.NewMap[ids.RequestID, *adaptiveTimeout](func(a, b *adaptiveTimeout) bool { + return a.deadline.Before(b.deadline) + }), } tm.timer = NewTimer(tm.timeout) tm.averager = math.NewAverager(float64(config.InitialTimeout), config.TimeoutHalflife, tm.clock.Time()) @@ -215,9 +182,8 @@ func (tm *adaptiveTimeoutManager) put(id ids.RequestID, measureLatency bool, han deadline: now.Add(tm.currentTimeout), measureLatency: measureLatency, } - tm.timeoutMap[id] = timeout - tm.numPendingTimeouts.Set(float64(len(tm.timeoutMap))) - heap.Push(&tm.timeoutQueue, timeout) + tm.timeoutHeap.Push(id, timeout) + tm.numPendingTimeouts.Set(float64(tm.timeoutHeap.Len())) tm.setNextTimeoutTime() } @@ -231,24 +197,18 @@ func (tm *adaptiveTimeoutManager) Remove(id ids.RequestID) { // Assumes [tm.lock] is held func (tm *adaptiveTimeoutManager) remove(id ids.RequestID, now time.Time) { - timeout, exists := tm.timeoutMap[id] + // Observe the response time to update average network response time. + timeout, exists := tm.timeoutHeap.Remove(id) if !exists { return } - // Observe the response time to update average network response time. if timeout.measureLatency { timeoutRegisteredAt := timeout.deadline.Add(-1 * timeout.duration) latency := now.Sub(timeoutRegisteredAt) tm.observeLatencyAndUpdateTimeout(latency, now) } - - // Remove the timeout from the map - delete(tm.timeoutMap, id) - tm.numPendingTimeouts.Set(float64(len(tm.timeoutMap))) - - // Remove the timeout from the queue - heap.Remove(&tm.timeoutQueue, timeout.index) + tm.numPendingTimeouts.Set(float64(tm.timeoutHeap.Len())) } // Assumes [tm.lock] is not held. @@ -300,11 +260,10 @@ func (tm *adaptiveTimeoutManager) observeLatencyAndUpdateTimeout(latency time.Du // returns nil. // Assumes [tm.lock] is held func (tm *adaptiveTimeoutManager) getNextTimeoutHandler(now time.Time) func() { - if tm.timeoutQueue.Len() == 0 { + _, nextTimeout, ok := tm.timeoutHeap.Peek() + if !ok { return nil } - - nextTimeout := tm.timeoutQueue[0] if nextTimeout.deadline.After(now) { return nil } @@ -315,14 +274,14 @@ func (tm *adaptiveTimeoutManager) getNextTimeoutHandler(now time.Time) func() { // Calculate the time of the next timeout and set // the timer to fire at that time. 
func (tm *adaptiveTimeoutManager) setNextTimeoutTime() { - if tm.timeoutQueue.Len() == 0 { + _, nextTimeout, ok := tm.timeoutHeap.Peek() + if !ok { // There are no pending timeouts tm.timer.Cancel() return } now := tm.clock.Time() - nextTimeout := tm.timeoutQueue[0] timeToNextTimeout := nextTimeout.deadline.Sub(now) tm.timer.SetTimeoutIn(timeToNextTimeout) } diff --git a/vms/platformvm/txs/txheap/by_age.go b/vms/platformvm/txs/txheap/by_age.go index a445822dd6e6..be888c437a0f 100644 --- a/vms/platformvm/txs/txheap/by_age.go +++ b/vms/platformvm/txs/txheap/by_age.go @@ -3,18 +3,15 @@ package txheap -var _ Heap = (*byAge)(nil) - -type byAge struct { - txHeap -} +import ( + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" +) func NewByAge() Heap { - h := &byAge{} - h.initialize(h) - return h -} - -func (h *byAge) Less(i, j int) bool { - return h.txs[i].age < h.txs[j].age + return &txHeap{ + heap: heap.NewMap[ids.ID, heapTx](func(a, b heapTx) bool { + return a.age < b.age + }), + } } diff --git a/vms/platformvm/txs/txheap/by_end_time.go b/vms/platformvm/txs/txheap/by_end_time.go index 2b0cbd8d3817..ba144448919d 100644 --- a/vms/platformvm/txs/txheap/by_end_time.go +++ b/vms/platformvm/txs/txheap/by_end_time.go @@ -6,6 +6,8 @@ package txheap import ( "time" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/vms/platformvm/txs" ) @@ -16,15 +18,15 @@ type byEndTime struct { } func NewByEndTime() TimedHeap { - h := &byEndTime{} - h.initialize(h) - return h -} - -func (h *byEndTime) Less(i, j int) bool { - iTime := h.txs[i].tx.Unsigned.(txs.Staker).EndTime() - jTime := h.txs[j].tx.Unsigned.(txs.Staker).EndTime() - return iTime.Before(jTime) + return &byEndTime{ + txHeap: txHeap{ + heap: heap.NewMap[ids.ID, heapTx](func(a, b heapTx) bool { + aTime := a.tx.Unsigned.(txs.Staker).EndTime() + bTime := b.tx.Unsigned.(txs.Staker).EndTime() + return aTime.Before(bTime) + }), + }, + } } func (h *byEndTime) Timestamp() time.Time { diff --git a/vms/platformvm/txs/txheap/by_end_time_test.go b/vms/platformvm/txs/txheap/by_end_time_test.go index 33ddc3cc3d1a..8ea152d27e02 100644 --- a/vms/platformvm/txs/txheap/by_end_time_test.go +++ b/vms/platformvm/txs/txheap/by_end_time_test.go @@ -14,7 +14,7 @@ import ( "github.com/ava-labs/avalanchego/vms/secp256k1fx" ) -func TestByStopTime(t *testing.T) { +func TestByEndTime(t *testing.T) { require := require.New(t) txHeap := NewByEndTime() diff --git a/vms/platformvm/txs/txheap/by_start_time.go b/vms/platformvm/txs/txheap/by_start_time.go index 31834cf0603d..f19c28d76436 100644 --- a/vms/platformvm/txs/txheap/by_start_time.go +++ b/vms/platformvm/txs/txheap/by_start_time.go @@ -6,6 +6,8 @@ package txheap import ( "time" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/vms/platformvm/txs" ) @@ -22,15 +24,15 @@ type byStartTime struct { } func NewByStartTime() TimedHeap { - h := &byStartTime{} - h.initialize(h) - return h -} - -func (h *byStartTime) Less(i, j int) bool { - iTime := h.txs[i].tx.Unsigned.(txs.Staker).StartTime() - jTime := h.txs[j].tx.Unsigned.(txs.Staker).StartTime() - return iTime.Before(jTime) + return &byStartTime{ + txHeap: txHeap{ + heap: heap.NewMap[ids.ID, heapTx](func(a, b heapTx) bool { + aTime := a.tx.Unsigned.(txs.Staker).StartTime() + bTime := b.tx.Unsigned.(txs.Staker).StartTime() + return 
aTime.Before(bTime) + }), + }, + } } func (h *byStartTime) Timestamp() time.Time { diff --git a/vms/platformvm/txs/txheap/heap.go b/vms/platformvm/txs/txheap/heap.go index 4b6ba68614cb..3727bb891d92 100644 --- a/vms/platformvm/txs/txheap/heap.go +++ b/vms/platformvm/txs/txheap/heap.go @@ -4,14 +4,11 @@ package txheap import ( - "container/heap" - "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/vms/platformvm/txs" ) -var _ Heap = (*txHeap)(nil) - type Heap interface { Add(tx *txs.Tx) Get(txID ids.ID) *txs.Tx @@ -23,107 +20,57 @@ type Heap interface { } type heapTx struct { - tx *txs.Tx - index int - age int + tx *txs.Tx + age int } type txHeap struct { - self heap.Interface - - txIDToIndex map[ids.ID]int - txs []*heapTx - currentAge int -} - -func (h *txHeap) initialize(self heap.Interface) { - h.self = self - h.txIDToIndex = make(map[ids.ID]int) + heap heap.Map[ids.ID, heapTx] + currentAge int } func (h *txHeap) Add(tx *txs.Tx) { - heap.Push(h.self, tx) + txID := tx.ID() + if h.heap.Contains(txID) { + return + } + htx := heapTx{ + tx: tx, + age: h.currentAge, + } + h.currentAge++ + h.heap.Push(txID, htx) } func (h *txHeap) Get(txID ids.ID) *txs.Tx { - index, exists := h.txIDToIndex[txID] - if !exists { - return nil - } - return h.txs[index].tx + got, _ := h.heap.Get(txID) + return got.tx } func (h *txHeap) List() []*txs.Tx { - res := make([]*txs.Tx, 0, len(h.txs)) - for _, tx := range h.txs { + heapTxs := heap.MapValues(h.heap) + res := make([]*txs.Tx, 0, len(heapTxs)) + for _, tx := range heapTxs { res = append(res, tx.tx) } return res } func (h *txHeap) Remove(txID ids.ID) *txs.Tx { - index, exists := h.txIDToIndex[txID] - if !exists { - return nil - } - return heap.Remove(h.self, index).(*txs.Tx) + removed, _ := h.heap.Remove(txID) + return removed.tx } func (h *txHeap) Peek() *txs.Tx { - return h.txs[0].tx + _, peeked, _ := h.heap.Peek() + return peeked.tx } func (h *txHeap) RemoveTop() *txs.Tx { - return heap.Pop(h.self).(*txs.Tx) + _, popped, _ := h.heap.Pop() + return popped.tx } func (h *txHeap) Len() int { - return len(h.txs) -} - -func (h *txHeap) Swap(i, j int) { - // The follow "i"s and "j"s are intentionally swapped to perform the actual - // swap - iTx := h.txs[j] - jTx := h.txs[i] - - iTx.index = i - jTx.index = j - h.txs[i] = iTx - h.txs[j] = jTx - - iTxID := iTx.tx.ID() - jTxID := jTx.tx.ID() - h.txIDToIndex[iTxID] = i - h.txIDToIndex[jTxID] = j -} - -func (h *txHeap) Push(x interface{}) { - tx := x.(*txs.Tx) - - txID := tx.ID() - _, exists := h.txIDToIndex[txID] - if exists { - return - } - htx := &heapTx{ - tx: tx, - index: len(h.txs), - age: h.currentAge, - } - h.currentAge++ - h.txIDToIndex[txID] = htx.index - h.txs = append(h.txs, htx) -} - -func (h *txHeap) Pop() interface{} { - newLen := len(h.txs) - 1 - htx := h.txs[newLen] - h.txs[newLen] = nil - h.txs = h.txs[:newLen] - - tx := htx.tx - txID := tx.ID() - delete(h.txIDToIndex, txID) - return tx + return h.heap.Len() } diff --git a/x/sync/peer_tracker.go b/x/sync/peer_tracker.go index e1d471cc40ab..7c105f3363af 100644 --- a/x/sync/peer_tracker.go +++ b/x/sync/peer_tracker.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/wrappers" @@ -55,7 +56,7 @@ type peerTracker struct { 
// Peers that we're connected to that responded to the last request they were sent. responsivePeers set.Set[ids.NodeID] // Max heap that contains the average bandwidth of peers. - bandwidthHeap safemath.AveragerHeap + bandwidthHeap heap.Map[ids.NodeID, safemath.Averager] averageBandwidth safemath.Averager log logging.Logger numTrackedPeers prometheus.Gauge @@ -69,10 +70,12 @@ func newPeerTracker( registerer prometheus.Registerer, ) (*peerTracker, error) { t := &peerTracker{ - peers: make(map[ids.NodeID]*peerInfo), - trackedPeers: make(set.Set[ids.NodeID]), - responsivePeers: make(set.Set[ids.NodeID]), - bandwidthHeap: safemath.NewMaxAveragerHeap(), + peers: make(map[ids.NodeID]*peerInfo), + trackedPeers: make(set.Set[ids.NodeID]), + responsivePeers: make(set.Set[ids.NodeID]), + bandwidthHeap: heap.NewMap[ids.NodeID, safemath.Averager](func(a, b safemath.Averager) bool { + return a.Read() > b.Read() + }), averageBandwidth: safemath.NewAverager(0, bandwidthHalflife, time.Now()), log: log, numTrackedPeers: prometheus.NewGauge( @@ -212,7 +215,7 @@ func (p *peerTracker) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) { } else { peer.bandwidth.Observe(bandwidth, now) } - p.bandwidthHeap.Add(nodeID, peer.bandwidth) + p.bandwidthHeap.Push(nodeID, peer.bandwidth) if bandwidth == 0 { p.responsivePeers.Remove(nodeID)