Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vote/vote_pool: reject older than justified block number vote #334

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 49 additions & 23 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type VotePool struct {

numFutureVotePerPeer map[string]uint64 // number of queued votes per peer
originatedFrom map[common.Hash]string // mapping from vote hash to the sender
justifiedBlockNumber uint64
}

type votesPriorityQueue []*types.VoteData
Expand Down Expand Up @@ -106,8 +107,13 @@ func (pool *VotePool) loop() {
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
latestBlockNumber := ev.Block.NumberU64()
justifiedBlockNumber, _ := pool.engine.GetJustifiedBlock(pool.chain, ev.Block.NumberU64(), ev.Block.Hash())

pool.mu.Lock()
pool.justifiedBlockNumber = justifiedBlockNumber
pool.prune(latestBlockNumber)
pool.transferVotesFromFutureToCur(ev.Block.Header())
pool.mu.Unlock()
}
case <-pool.chainHeadSub.Err():
return
Expand Down Expand Up @@ -141,6 +147,11 @@ func (pool *VotePool) putIntoVotePool(voteWithPeerInfo *voteWithPeer) bool {
pool.mu.Lock()
defer pool.mu.Unlock()

if targetNumber <= pool.justifiedBlockNumber {
log.Debug("BlockNumber of vote is older than justified block number")
return false
}

voteHash := vote.Hash()
if _, ok := pool.originatedFrom[voteHash]; ok {
log.Debug("Vote pool already contained the same vote", "voteHash", voteHash)
Expand Down Expand Up @@ -232,10 +243,8 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority
log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)
}

// The caller must hold the pool mutex
func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) {
pool.mu.Lock()
defer pool.mu.Unlock()

futurePq := pool.futureVotesPq
latestBlockNumber := latestBlockHeader.Number.Uint64()

Expand Down Expand Up @@ -309,31 +318,48 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
delete(futureVotes, blockHash)
}

// Prune old data of duplicationSet, curVotePq and curVotesMap.
func (pool *VotePool) prune(latestBlockNumber uint64) {
pool.mu.Lock()
defer pool.mu.Unlock()
curVotes := pool.curVotes
curVotesPq := pool.curVotesPq

// delete votes in the range [,latestBlockNumber-lowerLimitOfVoteBlockNumber]
for curVotesPq.Len() > 0 && curVotesPq.Peek().TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber {
// Prune curPriorityQueue.
blockHash := heap.Pop(curVotesPq).(*types.VoteData).TargetHash
localCurVotesPqGauge.Update(int64(curVotesPq.Len()))
if voteBox, ok := curVotes[blockHash]; ok {
voteMessages := voteBox.voteMessages
// Prune duplicationSet.
for _, voteMessage := range voteMessages {
voteHash := voteMessage.Hash()
delete(pool.originatedFrom, voteHash)
func (pool *VotePool) pruneVote(
latestBlockNumber uint64,
voteMap map[common.Hash]*VoteBox,
voteQueue *votesPriorityQueue,
isFuture bool,
) {
// delete votes older than or equal to latestBlockNumber-lowerLimitOfVoteBlockNumber or justified block number
for voteQueue.Len() > 0 {
vote := voteQueue.Peek()
if vote.TargetNumber+lowerLimitOfVoteBlockNumber-1 < latestBlockNumber || vote.TargetNumber <= pool.justifiedBlockNumber {
blockHash := heap.Pop(voteQueue).(*types.VoteData).TargetHash

if isFuture {
localFutureVotesPqGauge.Update(int64(voteQueue.Len()))
} else {
localCurVotesPqGauge.Update(int64(voteQueue.Len()))
}
// Prune curVotes Map.
delete(curVotes, blockHash)

if voteBox, ok := voteMap[blockHash]; ok {
voteMessages := voteBox.voteMessages
for _, voteMessage := range voteMessages {
voteHash := voteMessage.Hash()
if peer := pool.originatedFrom[voteHash]; peer != "" && isFuture {
pool.numFutureVotePerPeer[peer]--
}
delete(pool.originatedFrom, voteHash)
}
delete(voteMap, blockHash)
}
} else {
break
}
}
}

// Prune old data of curVotes and futureVotes
// The caller must hold the pool mutex
func (pool *VotePool) prune(latestBlockNumber uint64) {
pool.pruneVote(latestBlockNumber, pool.curVotes, pool.curVotesPq, false)
pool.pruneVote(latestBlockNumber, pool.futureVotes, pool.futureVotesPq, true)
}

// GetVotes as batch.
func (pool *VotePool) GetVotes() []*types.VoteEnvelope {
pool.mu.RLock()
Expand Down
8 changes: 2 additions & 6 deletions core/vote/vote_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ func newTestBackend() *testBackend {
func (b *testBackend) IsMining() bool { return true }
func (b *testBackend) EventMux() *event.TypeMux { return b.eventMux }

func (p *mockPOSA) GetJustifiedNumberAndHash(chain consensus.ChainHeaderReader, header *types.Header) (uint64, common.Hash, error) {
parentHeader := chain.GetHeaderByHash(header.ParentHash)
if parentHeader == nil {
return 0, common.Hash{}, fmt.Errorf("unexpected error")
}
return parentHeader.Number.Uint64(), parentHeader.Hash(), nil
func (p *mockPOSA) GetJustifiedBlock(chain consensus.ChainHeaderReader, blockNumber uint64, blockHash common.Hash) (uint64, common.Hash) {
return 0, common.Hash{}
}

func (m *mockPOSA) VerifyVote(chain consensus.ChainHeaderReader, vote *types.VoteEnvelope) error {
Expand Down