From 114075e28e2d0a7b9d7413ad2cc2c953a38f59bd Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Tue, 1 Aug 2023 18:21:06 +0700 Subject: [PATCH 1/2] vote/vote_pool: reject older than justified block number vote The vote which targets the block number that is older or equal to justified block number is not useful anymore. We can reject and prune those votes from pool. --- core/vote/vote_pool.go | 49 +++++++++++++++++++++++-------------- core/vote/vote_pool_test.go | 8 ++---- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go index 32446d93a0..04521de79d 100644 --- a/core/vote/vote_pool.go +++ b/core/vote/vote_pool.go @@ -68,6 +68,7 @@ type VotePool struct { numFutureVotePerPeer map[string]uint64 // number of queued votes per peer originatedFrom map[common.Hash]string // mapping from vote hash to the sender + justifiedBlockNumber uint64 } type votesPriorityQueue []*types.VoteData @@ -106,8 +107,13 @@ func (pool *VotePool) loop() { case ev := <-pool.chainHeadCh: if ev.Block != nil { latestBlockNumber := ev.Block.NumberU64() + justifiedBlockNumber, _ := pool.engine.GetJustifiedBlock(pool.chain, ev.Block.NumberU64(), ev.Block.Hash()) + + pool.mu.Lock() + pool.justifiedBlockNumber = justifiedBlockNumber pool.prune(latestBlockNumber) pool.transferVotesFromFutureToCur(ev.Block.Header()) + pool.mu.Unlock() } case <-pool.chainHeadSub.Err(): return @@ -141,6 +147,11 @@ func (pool *VotePool) putIntoVotePool(voteWithPeerInfo *voteWithPeer) bool { pool.mu.Lock() defer pool.mu.Unlock() + if targetNumber <= pool.justifiedBlockNumber { + log.Debug("BlockNumber of vote is older than justified block number") + return false + } + voteHash := vote.Hash() if _, ok := pool.originatedFrom[voteHash]; ok { log.Debug("Vote pool already contained the same vote", "voteHash", voteHash) @@ -232,10 +243,8 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority log.Debug("VoteHash put into votepool is:", "voteHash", voteHash) } +// The caller must hold the pool mutex func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) { - pool.mu.Lock() - defer pool.mu.Unlock() - futurePq := pool.futureVotesPq latestBlockNumber := latestBlockHeader.Number.Uint64() @@ -310,26 +319,30 @@ func (pool *VotePool) transfer(blockHash common.Hash) { } // Prune old data of duplicationSet, curVotePq and curVotesMap. +// The caller must hold the pool mutex func (pool *VotePool) prune(latestBlockNumber uint64) { - pool.mu.Lock() - defer pool.mu.Unlock() curVotes := pool.curVotes curVotesPq := pool.curVotesPq - // delete votes in the range [,latestBlockNumber-lowerLimitOfVoteBlockNumber] - for curVotesPq.Len() > 0 && curVotesPq.Peek().TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber { - // Prune curPriorityQueue. - blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash - localCurVotesPqGauge.Update(int64(curVotesPq.Len())) - if voteBox, ok := curVotes[blockHash]; ok { - voteMessages := voteBox.voteMessages - // Prune duplicationSet. - for _, voteMessage := range voteMessages { - voteHash := voteMessage.Hash() - delete(pool.originatedFrom, voteHash) + // delete votes older than or equal to latestBlockNumber-lowerLimitOfVoteBlockNumber or justified block number + for curVotesPq.Len() > 0 { + vote := curVotesPq.Peek() + if vote.TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber || vote.TargetNumber <= pool.justifiedBlockNumber { + // Prune curPriorityQueue. + blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash + localCurVotesPqGauge.Update(int64(curVotesPq.Len())) + if voteBox, ok := curVotes[blockHash]; ok { + voteMessages := voteBox.voteMessages + // Prune duplicationSet. + for _, voteMessage := range voteMessages { + voteHash := voteMessage.Hash() + delete(pool.originatedFrom, voteHash) + } + // Prune curVotes Map. + delete(curVotes, blockHash) } - // Prune curVotes Map. - delete(curVotes, blockHash) + } else { + break } } } diff --git a/core/vote/vote_pool_test.go b/core/vote/vote_pool_test.go index ada0f3d2e4..8151695b65 100644 --- a/core/vote/vote_pool_test.go +++ b/core/vote/vote_pool_test.go @@ -59,12 +59,8 @@ func newTestBackend() *testBackend { func (b *testBackend) IsMining() bool { return true } func (b *testBackend) EventMux() *event.TypeMux { return b.eventMux } -func (p *mockPOSA) GetJustifiedNumberAndHash(chain consensus.ChainHeaderReader, header *types.Header) (uint64, common.Hash, error) { - parentHeader := chain.GetHeaderByHash(header.ParentHash) - if parentHeader == nil { - return 0, common.Hash{}, fmt.Errorf("unexpected error") - } - return parentHeader.Number.Uint64(), parentHeader.Hash(), nil +func (p *mockPOSA) GetJustifiedBlock(chain consensus.ChainHeaderReader, blockNumber uint64, blockHash common.Hash) (uint64, common.Hash) { + return 0, common.Hash{} } func (m *mockPOSA) VerifyVote(chain consensus.ChainHeaderReader, vote *types.VoteEnvelope) error { From 926ac60710a536fc4f7c35f272e09a73a504f0d6 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Wed, 2 Aug 2023 11:47:15 +0700 Subject: [PATCH 2/2] vote/vote_pool: prune the future vote map and queue --- core/vote/vote_pool.go | 43 +++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go index 04521de79d..0f6d495e35 100644 --- a/core/vote/vote_pool.go +++ b/core/vote/vote_pool.go @@ -318,28 +318,34 @@ func (pool *VotePool) transfer(blockHash common.Hash) { delete(futureVotes, blockHash) } -// Prune old data of duplicationSet, curVotePq and curVotesMap. -// The caller must hold the pool mutex -func (pool *VotePool) prune(latestBlockNumber uint64) { - curVotes := pool.curVotes - curVotesPq := pool.curVotesPq - +func (pool *VotePool) pruneVote( + latestBlockNumber uint64, + voteMap map[common.Hash]*VoteBox, + voteQueue *votesPriorityQueue, + isFuture bool, +) { // delete votes older than or equal to latestBlockNumber-lowerLimitOfVoteBlockNumber or justified block number - for curVotesPq.Len() > 0 { - vote := curVotesPq.Peek() + for voteQueue.Len() > 0 { + vote := voteQueue.Peek() if vote.TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber || vote.TargetNumber <= pool.justifiedBlockNumber { - // Prune curPriorityQueue. - blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash - localCurVotesPqGauge.Update(int64(curVotesPq.Len())) - if voteBox, ok := curVotes[blockHash]; ok { + blockHash := heap.Pop(voteQueue).(*types.VoteData).TargetHash + + if isFuture { + localFutureVotesPqGauge.Update(int64(voteQueue.Len())) + } else { + localCurVotesPqGauge.Update(int64(voteQueue.Len())) + } + + if voteBox, ok := voteMap[blockHash]; ok { voteMessages := voteBox.voteMessages - // Prune duplicationSet. for _, voteMessage := range voteMessages { voteHash := voteMessage.Hash() + if peer := pool.originatedFrom[voteHash]; peer != "" && isFuture { + pool.numFutureVotePerPeer[peer]-- + } delete(pool.originatedFrom, voteHash) } - // Prune curVotes Map. - delete(curVotes, blockHash) + delete(voteMap, blockHash) } } else { break @@ -347,6 +353,13 @@ func (pool *VotePool) prune(latestBlockNumber uint64) { } } +// Prune old data of curVotes and futureVotes +// The caller must hold the pool mutex +func (pool *VotePool) prune(latestBlockNumber uint64) { + pool.pruneVote(latestBlockNumber, pool.curVotes, pool.curVotesPq, false) + pool.pruneVote(latestBlockNumber, pool.futureVotes, pool.futureVotesPq, true) +} + // GetVotes as batch. func (pool *VotePool) GetVotes() []*types.VoteEnvelope { pool.mu.RLock()