Skip to content

Commit

Permalink
vote/vote_pool: reject older than justified block number vote (#334)
Browse files Browse the repository at this point in the history
* vote/vote_pool: reject older than justified block number vote

The vote which targets the block number that is older or equal to justified
block number is not useful anymore. We can reject and prune those votes from
pool.

* vote/vote_pool: prune the future vote map and queue
  • Loading branch information
minh-bq committed Sep 7, 2023
1 parent 3ad024b commit b3f32e8
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 29 deletions.
72 changes: 49 additions & 23 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type VotePool struct {

numFutureVotePerPeer map[string]uint64 // number of queued votes per peer
originatedFrom map[common.Hash]string // mapping from vote hash to the sender
justifiedBlockNumber uint64
}

type votesPriorityQueue []*types.VoteData
Expand Down Expand Up @@ -106,8 +107,13 @@ func (pool *VotePool) loop() {
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
latestBlockNumber := ev.Block.NumberU64()
justifiedBlockNumber, _ := pool.engine.GetJustifiedBlock(pool.chain, ev.Block.NumberU64(), ev.Block.Hash())

pool.mu.Lock()
pool.justifiedBlockNumber = justifiedBlockNumber
pool.prune(latestBlockNumber)
pool.transferVotesFromFutureToCur(ev.Block.Header())
pool.mu.Unlock()
}
case <-pool.chainHeadSub.Err():
return
Expand Down Expand Up @@ -145,6 +151,11 @@ func (pool *VotePool) putIntoVotePool(voteWithPeerInfo *voteWithPeer) bool {
pool.mu.Lock()
defer pool.mu.Unlock()

if targetNumber <= pool.justifiedBlockNumber {
log.Debug("BlockNumber of vote is older than justified block number")
return false
}

voteHash := vote.Hash()
if _, ok := pool.originatedFrom[voteHash]; ok {
log.Debug("Vote pool already contained the same vote", "voteHash", voteHash)
Expand Down Expand Up @@ -236,10 +247,8 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority
log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)
}

// The caller must hold the pool mutex
func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) {
pool.mu.Lock()
defer pool.mu.Unlock()

futurePq := pool.futureVotesPq
latestBlockNumber := latestBlockHeader.Number.Uint64()

Expand Down Expand Up @@ -313,31 +322,48 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
delete(futureVotes, blockHash)
}

// Prune old data of duplicationSet, curVotePq and curVotesMap.
func (pool *VotePool) prune(latestBlockNumber uint64) {
pool.mu.Lock()
defer pool.mu.Unlock()
curVotes := pool.curVotes
curVotesPq := pool.curVotesPq

// delete votes in the range [,latestBlockNumber-lowerLimitOfVoteBlockNumber]
for curVotesPq.Len() > 0 && curVotesPq.Peek().TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber {
// Prune curPriorityQueue.
blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash
localCurVotesPqGauge.Update(int64(curVotesPq.Len()))
if voteBox, ok := curVotes[blockHash]; ok {
voteMessages := voteBox.voteMessages
// Prune duplicationSet.
for _, voteMessage := range voteMessages {
voteHash := voteMessage.Hash()
delete(pool.originatedFrom, voteHash)
func (pool *VotePool) pruneVote(
latestBlockNumber uint64,
voteMap map[common.Hash]*VoteBox,
voteQueue *votesPriorityQueue,
isFuture bool,
) {
// delete votes older than or equal to latestBlockNumber-lowerLimitOfVoteBlockNumber or justified block number
for voteQueue.Len() > 0 {
vote := voteQueue.Peek()
if vote.TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber || vote.TargetNumber <= pool.justifiedBlockNumber {
blockHash := heap.Pop(voteQueue).(*types.VoteData).TargetHash

if isFuture {
localFutureVotesPqGauge.Update(int64(voteQueue.Len()))
} else {
localCurVotesPqGauge.Update(int64(voteQueue.Len()))
}
// Prune curVotes Map.
delete(curVotes, blockHash)

if voteBox, ok := voteMap[blockHash]; ok {
voteMessages := voteBox.voteMessages
for _, voteMessage := range voteMessages {
voteHash := voteMessage.Hash()
if peer := pool.originatedFrom[voteHash]; peer != "" && isFuture {
pool.numFutureVotePerPeer[peer]--
}
delete(pool.originatedFrom, voteHash)
}
delete(voteMap, blockHash)
}
} else {
break
}
}
}

// Prune old data of curVotes and futureVotes
// The caller must hold the pool mutex
func (pool *VotePool) prune(latestBlockNumber uint64) {
pool.pruneVote(latestBlockNumber, pool.curVotes, pool.curVotesPq, false)
pool.pruneVote(latestBlockNumber, pool.futureVotes, pool.futureVotesPq, true)
}

// GetVotes as batch.
func (pool *VotePool) GetVotes() []*types.VoteEnvelope {
pool.mu.RLock()
Expand Down
8 changes: 2 additions & 6 deletions core/vote/vote_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ func newTestBackend() *testBackend {
func (b *testBackend) IsMining() bool { return true }
func (b *testBackend) EventMux() *event.TypeMux { return b.eventMux }

func (p *mockPOSA) GetJustifiedNumberAndHash(chain consensus.ChainHeaderReader, header *types.Header) (uint64, common.Hash, error) {
parentHeader := chain.GetHeaderByHash(header.ParentHash)
if parentHeader == nil {
return 0, common.Hash{}, fmt.Errorf("unexpected error")
}
return parentHeader.Number.Uint64(), parentHeader.Hash(), nil
func (p *mockPOSA) GetJustifiedBlock(chain consensus.ChainHeaderReader, blockNumber uint64, blockHash common.Hash) (uint64, common.Hash) {
return 0, common.Hash{}
}

func (m *mockPOSA) VerifyVote(chain consensus.ChainHeaderReader, vote *types.VoteEnvelope) error {
Expand Down

0 comments on commit b3f32e8

Please sign in to comment.