Skip to content

Commit

Permalink
core/vote/vote_pool: protect pool against malicious peer (#309)
Browse files Browse the repository at this point in the history
This commit limits the number of votes in future queue per peer as the vote in
future queue is not fully verified. The check happens before the costly
basicVerify which has to verify the BLS signature.
  • Loading branch information
minh-bq committed Sep 7, 2023
1 parent 08f5245 commit 6720da8
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 68 deletions.
3 changes: 2 additions & 1 deletion core/vote/vote_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func (voteManager *VoteManager) loop() {
rawdb.WriteHighestFinalityVote(voteManager.db, curHead.Number.Uint64())

log.Debug("vote manager produced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash())
voteManager.pool.PutVote(voteMessage)
// This is a local vote so just pass the dummy peer information
voteManager.pool.PutVote("", voteMessage)
votesManagerCounter.Inc(1)
}
case <-voteManager.chainHeadSub.Err():
Expand Down
105 changes: 55 additions & 50 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import (
"container/heap"
"sync"

mapset "github.com/deckarep/golang-set"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
)

const (
maxFutureVoteAmountPerBlock = 50
maxFutureVotePerPeer = 25

voteBufferForPut = 256
// votes in the range (currentBlockNum-256,currentBlockNum+11] will be stored
Expand All @@ -28,11 +26,6 @@ const (
)

var (
localCurVotesCounter = metrics.NewRegisteredCounter("curVotes/local", nil)
localFutureVotesCounter = metrics.NewRegisteredCounter("futureVotes/local", nil)

localReceivedVotesGauge = metrics.NewRegisteredGauge("receivedVotes/local", nil)

localCurVotesPqGauge = metrics.NewRegisteredGauge("curVotesPq/local", nil)
localFutureVotesPqGauge = metrics.NewRegisteredGauge("futureVotesPq/local", nil)
)
Expand All @@ -42,16 +35,19 @@ type VoteBox struct {
voteMessages []*types.VoteEnvelope
}

// voteWithPeer is a wrapper around VoteEnvelop to include peer information
type voteWithPeer struct {
vote *types.VoteEnvelope
peer string
}

type VotePool struct {
chain *core.BlockChain
chainconfig *params.ChainConfig
mu sync.RWMutex
chain *core.BlockChain
mu sync.RWMutex

votesFeed event.Feed
scope event.SubscriptionScope

receivedVotes mapset.Set

curVotes map[common.Hash]*VoteBox
futureVotes map[common.Hash]*VoteBox

Expand All @@ -61,32 +57,34 @@ type VotePool struct {
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription

votesCh chan *types.VoteEnvelope
votesCh chan *voteWithPeer

engine consensus.FastFinalityPoSA
maxCurVoteAmountPerBlock int

numFutureVotePerPeer map[string]uint64 // number of queued votes per peer
originatedFrom map[common.Hash]string // mapping from vote hash to the sender
}

type votesPriorityQueue []*types.VoteData

func NewVotePool(
chainconfig *params.ChainConfig,
chain *core.BlockChain,
engine consensus.FastFinalityPoSA,
maxCurVoteAmountPerBlock int,
) *VotePool {
votePool := &VotePool{
chain: chain,
chainconfig: chainconfig,
receivedVotes: mapset.NewSet(),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
votesCh: make(chan *voteWithPeer, voteBufferForPut),
engine: engine,
maxCurVoteAmountPerBlock: maxCurVoteAmountPerBlock,
numFutureVotePerPeer: make(map[string]uint64),
originatedFrom: make(map[common.Hash]string),
}

// Subscribe events from blockchain and start the main event loop.
Expand Down Expand Up @@ -117,11 +115,14 @@ func (pool *VotePool) loop() {
}
}

func (pool *VotePool) PutVote(vote *types.VoteEnvelope) {
pool.votesCh <- vote
func (pool *VotePool) PutVote(peer string, vote *types.VoteEnvelope) {
pool.votesCh <- &voteWithPeer{vote: vote, peer: peer}
}

func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
func (pool *VotePool) putIntoVotePool(voteWithPeerInfo *voteWithPeer) bool {
vote := voteWithPeerInfo.vote
peer := voteWithPeerInfo.peer

targetNumber := vote.Data.TargetNumber
targetHash := vote.Data.TargetHash
header := pool.chain.CurrentBlock().Header()
Expand All @@ -133,6 +134,16 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
return false
}

pool.mu.Lock()
defer pool.mu.Unlock()

voteHash := vote.Hash()
if _, ok := pool.originatedFrom[voteHash]; ok {
log.Debug("Vote pool already contained the same vote", "voteHash", voteHash)
return false
}
pool.originatedFrom[voteHash] = peer

voteData := &types.VoteData{
TargetNumber: targetNumber,
TargetHash: targetHash,
Expand All @@ -152,8 +163,19 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
votesPq = pool.curVotesPq
}

voteHash := vote.Hash()
if isFutureVote {
// As we cannot fully verify the future vote, we need to set a limit of
// future votes per peer to void be DOSed by peer.
if pool.numFutureVotePerPeer[peer] >= maxFutureVotePerPeer {
return false
}
pool.numFutureVotePerPeer[peer]++
}

if ok := pool.basicVerify(vote, headNumber, votes, isFutureVote, voteHash); !ok {
if isFutureVote {
pool.numFutureVotePerPeer[peer]--
}
return false
}

Expand All @@ -177,14 +199,13 @@ func (pool *VotePool) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.S
return pool.scope.Track(pool.votesFeed.Subscribe(ch))
}

// The vote pool's mutex must already be acquired when calling this function
func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriorityQueue, vote *types.VoteEnvelope, voteData *types.VoteData, voteHash common.Hash, isFutureVote bool) {
targetHash := vote.Data.TargetHash
targetNumber := vote.Data.TargetNumber

log.Debug("The vote info to put is:", "voteBlockNumber", targetNumber, "voteBlockHash", targetHash)

pool.mu.Lock()
defer pool.mu.Unlock()
if _, ok := m[targetHash]; !ok {
// Push into votes priorityQueue if not exist in corresponding votes Map.
// To be noted: will not put into priorityQueue if exists in map to avoid duplicate element with the same voteData.
Expand All @@ -204,16 +225,7 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority

// Put into corresponding votes map.
m[targetHash].voteMessages = append(m[targetHash].voteMessages, vote)
// Add into received vote to avoid future duplicated vote comes.
pool.receivedVotes.Add(voteHash)
log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)

if isFutureVote {
localFutureVotesCounter.Inc(1)
} else {
localCurVotesCounter.Inc(1)
}
localReceivedVotesGauge.Update(int64(pool.receivedVotes.Cardinality()))
}

func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) {
Expand Down Expand Up @@ -247,6 +259,7 @@ func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Head
}
}

// The vote pool's mutex must already be acquired when calling this function
func (pool *VotePool) transfer(blockHash common.Hash) {
curPq, futurePq := pool.curVotesPq, pool.futureVotesPq
curVotes, futureVotes := pool.curVotes, pool.futureVotes
Expand All @@ -263,7 +276,6 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
for _, vote := range voteBox.voteMessages {
// Verify if the vote comes from valid validators based on voteAddress (BLSPublicKey).
if pool.engine.VerifyVote(pool.chain, vote) != nil {
pool.receivedVotes.Remove(vote.Hash())
continue
}

Expand All @@ -282,10 +294,15 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
curVotes[blockHash].voteMessages = append(curVotes[blockHash].voteMessages, validVotes...)
}

for _, vote := range futureVotes[blockHash].voteMessages {
peer, ok := pool.originatedFrom[vote.Hash()]
if !ok {
log.Debug("Cannot find the sender of vote", "voteHash", vote.Hash())
continue
}
pool.numFutureVotePerPeer[peer]--
}
delete(futureVotes, blockHash)

localCurVotesCounter.Inc(int64(len(validVotes)))
localFutureVotesCounter.Dec(int64(len(voteBox.voteMessages)))
}

// Prune old data of duplicationSet, curVotePq and curVotesMap.
Expand All @@ -305,13 +322,10 @@ func (pool *VotePool) prune(latestBlockNumber uint64) {
// Prune duplicationSet.
for _, voteMessage := range voteMessages {
voteHash := voteMessage.Hash()
pool.receivedVotes.Remove(voteHash)
delete(pool.originatedFrom, voteHash)
}
// Prune curVotes Map.
delete(curVotes, blockHash)

localCurVotesCounter.Dec(int64(len(voteMessages)))
localReceivedVotesGauge.Update(int64(pool.receivedVotes.Cardinality()))
}
}
}
Expand Down Expand Up @@ -340,16 +354,7 @@ func (pool *VotePool) FetchVoteByBlockHash(blockHash common.Hash) []*types.VoteE

func (pool *VotePool) basicVerify(vote *types.VoteEnvelope, headNumber uint64, m map[common.Hash]*VoteBox, isFutureVote bool, voteHash common.Hash) bool {
targetHash := vote.Data.TargetHash
pool.mu.RLock()
defer pool.mu.RUnlock()

// Check duplicate voteMessage firstly.
if pool.receivedVotes.Contains(voteHash) {
log.Debug("Vote pool already contained the same vote", "voteHash", voteHash)
return false
}

// TODO: Find a better solution to prevent DOS
// To prevent DOS attacks, make sure no more than 21 votes per blockHash if not futureVotes
// and no more than 50 votes per blockHash if futureVotes.
maxVoteAmountPerBlock := pool.maxCurVoteAmountPerBlock
Expand Down
Loading

0 comments on commit 6720da8

Please sign in to comment.