From 6720da8cc2968ea49e87650fcb4338cc7568af3a Mon Sep 17 00:00:00 2001 From: minh-bq <97180373+minh-bq@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:09:07 +0700 Subject: [PATCH] core/vote/vote_pool: protect pool against malicious peer (#309) This commit limits the number of votes in future queue per peer as the vote in future queue is not fully verified. The check happens before the costly basicVerify which has to verify the BLS signature. --- core/vote/vote_manager.go | 3 +- core/vote/vote_pool.go | 105 +++++++++++++------------- core/vote/vote_pool_test.go | 142 ++++++++++++++++++++++++++++++++---- eth/backend.go | 2 +- 4 files changed, 184 insertions(+), 68 deletions(-) diff --git a/core/vote/vote_manager.go b/core/vote/vote_manager.go index 832f3e66f4..2760898d71 100644 --- a/core/vote/vote_manager.go +++ b/core/vote/vote_manager.go @@ -160,7 +160,8 @@ func (voteManager *VoteManager) loop() { rawdb.WriteHighestFinalityVote(voteManager.db, curHead.Number.Uint64()) log.Debug("vote manager produced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash()) - voteManager.pool.PutVote(voteMessage) + // This is a local vote so just pass the dummy peer information + voteManager.pool.PutVote("", voteMessage) votesManagerCounter.Inc(1) } case <-voteManager.chainHeadSub.Err(): diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go index d43b209b6c..dfce1cffad 100644 --- a/core/vote/vote_pool.go +++ b/core/vote/vote_pool.go @@ -4,8 +4,6 @@ import ( "container/heap" "sync" - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" @@ -13,11 +11,11 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/params" ) const ( maxFutureVoteAmountPerBlock = 50 + maxFutureVotePerPeer = 25 voteBufferForPut = 256 // votes in the range (currentBlockNum-256,currentBlockNum+11] will be stored @@ -28,11 +26,6 @@ const ( ) var ( - localCurVotesCounter = metrics.NewRegisteredCounter("curVotes/local", nil) - localFutureVotesCounter = metrics.NewRegisteredCounter("futureVotes/local", nil) - - localReceivedVotesGauge = metrics.NewRegisteredGauge("receivedVotes/local", nil) - localCurVotesPqGauge = metrics.NewRegisteredGauge("curVotesPq/local", nil) localFutureVotesPqGauge = metrics.NewRegisteredGauge("futureVotesPq/local", nil) ) @@ -42,16 +35,19 @@ type VoteBox struct { voteMessages []*types.VoteEnvelope } +// voteWithPeer is a wrapper around VoteEnvelop to include peer information +type voteWithPeer struct { + vote *types.VoteEnvelope + peer string +} + type VotePool struct { - chain *core.BlockChain - chainconfig *params.ChainConfig - mu sync.RWMutex + chain *core.BlockChain + mu sync.RWMutex votesFeed event.Feed scope event.SubscriptionScope - receivedVotes mapset.Set - curVotes map[common.Hash]*VoteBox futureVotes map[common.Hash]*VoteBox @@ -61,32 +57,34 @@ type VotePool struct { chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription - votesCh chan *types.VoteEnvelope + votesCh chan *voteWithPeer engine consensus.FastFinalityPoSA maxCurVoteAmountPerBlock int + + numFutureVotePerPeer map[string]uint64 // number of queued votes per peer + originatedFrom map[common.Hash]string // mapping from vote hash to the sender } type votesPriorityQueue []*types.VoteData func NewVotePool( - chainconfig *params.ChainConfig, chain *core.BlockChain, engine consensus.FastFinalityPoSA, maxCurVoteAmountPerBlock int, ) *VotePool { votePool := &VotePool{ chain: chain, - chainconfig: chainconfig, - receivedVotes: mapset.NewSet(), curVotes: make(map[common.Hash]*VoteBox), futureVotes: make(map[common.Hash]*VoteBox), curVotesPq: &votesPriorityQueue{}, futureVotesPq: &votesPriorityQueue{}, chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - votesCh: make(chan *types.VoteEnvelope, voteBufferForPut), + votesCh: make(chan *voteWithPeer, voteBufferForPut), engine: engine, maxCurVoteAmountPerBlock: maxCurVoteAmountPerBlock, + numFutureVotePerPeer: make(map[string]uint64), + originatedFrom: make(map[common.Hash]string), } // Subscribe events from blockchain and start the main event loop. @@ -117,11 +115,14 @@ func (pool *VotePool) loop() { } } -func (pool *VotePool) PutVote(vote *types.VoteEnvelope) { - pool.votesCh <- vote +func (pool *VotePool) PutVote(peer string, vote *types.VoteEnvelope) { + pool.votesCh <- &voteWithPeer{vote: vote, peer: peer} } -func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { +func (pool *VotePool) putIntoVotePool(voteWithPeerInfo *voteWithPeer) bool { + vote := voteWithPeerInfo.vote + peer := voteWithPeerInfo.peer + targetNumber := vote.Data.TargetNumber targetHash := vote.Data.TargetHash header := pool.chain.CurrentBlock().Header() @@ -133,6 +134,16 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { return false } + pool.mu.Lock() + defer pool.mu.Unlock() + + voteHash := vote.Hash() + if _, ok := pool.originatedFrom[voteHash]; ok { + log.Debug("Vote pool already contained the same vote", "voteHash", voteHash) + return false + } + pool.originatedFrom[voteHash] = peer + voteData := &types.VoteData{ TargetNumber: targetNumber, TargetHash: targetHash, @@ -152,8 +163,19 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { votesPq = pool.curVotesPq } - voteHash := vote.Hash() + if isFutureVote { + // As we cannot fully verify the future vote, we need to set a limit of + // future votes per peer to void be DOSed by peer. + if pool.numFutureVotePerPeer[peer] >= maxFutureVotePerPeer { + return false + } + pool.numFutureVotePerPeer[peer]++ + } + if ok := pool.basicVerify(vote, headNumber, votes, isFutureVote, voteHash); !ok { + if isFutureVote { + pool.numFutureVotePerPeer[peer]-- + } return false } @@ -177,14 +199,13 @@ func (pool *VotePool) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.S return pool.scope.Track(pool.votesFeed.Subscribe(ch)) } +// The vote pool's mutex must already be acquired when calling this function func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriorityQueue, vote *types.VoteEnvelope, voteData *types.VoteData, voteHash common.Hash, isFutureVote bool) { targetHash := vote.Data.TargetHash targetNumber := vote.Data.TargetNumber log.Debug("The vote info to put is:", "voteBlockNumber", targetNumber, "voteBlockHash", targetHash) - pool.mu.Lock() - defer pool.mu.Unlock() if _, ok := m[targetHash]; !ok { // Push into votes priorityQueue if not exist in corresponding votes Map. // To be noted: will not put into priorityQueue if exists in map to avoid duplicate element with the same voteData. @@ -204,16 +225,7 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority // Put into corresponding votes map. m[targetHash].voteMessages = append(m[targetHash].voteMessages, vote) - // Add into received vote to avoid future duplicated vote comes. - pool.receivedVotes.Add(voteHash) log.Debug("VoteHash put into votepool is:", "voteHash", voteHash) - - if isFutureVote { - localFutureVotesCounter.Inc(1) - } else { - localCurVotesCounter.Inc(1) - } - localReceivedVotesGauge.Update(int64(pool.receivedVotes.Cardinality())) } func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Header) { @@ -247,6 +259,7 @@ func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Head } } +// The vote pool's mutex must already be acquired when calling this function func (pool *VotePool) transfer(blockHash common.Hash) { curPq, futurePq := pool.curVotesPq, pool.futureVotesPq curVotes, futureVotes := pool.curVotes, pool.futureVotes @@ -263,7 +276,6 @@ func (pool *VotePool) transfer(blockHash common.Hash) { for _, vote := range voteBox.voteMessages { // Verify if the vote comes from valid validators based on voteAddress (BLSPublicKey). if pool.engine.VerifyVote(pool.chain, vote) != nil { - pool.receivedVotes.Remove(vote.Hash()) continue } @@ -282,10 +294,15 @@ func (pool *VotePool) transfer(blockHash common.Hash) { curVotes[blockHash].voteMessages = append(curVotes[blockHash].voteMessages, validVotes...) } + for _, vote := range futureVotes[blockHash].voteMessages { + peer, ok := pool.originatedFrom[vote.Hash()] + if !ok { + log.Debug("Cannot find the sender of vote", "voteHash", vote.Hash()) + continue + } + pool.numFutureVotePerPeer[peer]-- + } delete(futureVotes, blockHash) - - localCurVotesCounter.Inc(int64(len(validVotes))) - localFutureVotesCounter.Dec(int64(len(voteBox.voteMessages))) } // Prune old data of duplicationSet, curVotePq and curVotesMap. @@ -305,13 +322,10 @@ func (pool *VotePool) prune(latestBlockNumber uint64) { // Prune duplicationSet. for _, voteMessage := range voteMessages { voteHash := voteMessage.Hash() - pool.receivedVotes.Remove(voteHash) + delete(pool.originatedFrom, voteHash) } // Prune curVotes Map. delete(curVotes, blockHash) - - localCurVotesCounter.Dec(int64(len(voteMessages))) - localReceivedVotesGauge.Update(int64(pool.receivedVotes.Cardinality())) } } } @@ -340,16 +354,7 @@ func (pool *VotePool) FetchVoteByBlockHash(blockHash common.Hash) []*types.VoteE func (pool *VotePool) basicVerify(vote *types.VoteEnvelope, headNumber uint64, m map[common.Hash]*VoteBox, isFutureVote bool, voteHash common.Hash) bool { targetHash := vote.Data.TargetHash - pool.mu.RLock() - defer pool.mu.RUnlock() - - // Check duplicate voteMessage firstly. - if pool.receivedVotes.Contains(voteHash) { - log.Debug("Vote pool already contained the same vote", "voteHash", voteHash) - return false - } - // TODO: Find a better solution to prevent DOS // To prevent DOS attacks, make sure no more than 21 votes per blockHash if not futureVotes // and no more than 50 votes per blockHash if futureVotes. maxVoteAmountPerBlock := pool.maxCurVoteAmountPerBlock diff --git a/core/vote/vote_pool_test.go b/core/vote/vote_pool_test.go index 76df0cf26f..63fb91d4d8 100644 --- a/core/vote/vote_pool_test.go +++ b/core/vote/vote_pool_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + blsCommon "github.com/ethereum/go-ethereum/crypto/bls/common" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" @@ -74,10 +75,10 @@ func (m *mockPOSA) IsActiveValidatorAt(chain consensus.ChainHeaderReader, header return true } -func (pool *VotePool) verifyStructureSizeOfVotePool(receivedVotes, curVotes, futureVotes, curVotesPq, futureVotesPq int) bool { +func (pool *VotePool) verifyStructureSizeOfVotePool(curVotes, futureVotes, curVotesPq, futureVotesPq int) bool { for i := 0; i < timeThreshold; i++ { time.Sleep(1 * time.Second) - if pool.receivedVotes.Cardinality() == receivedVotes && len(pool.curVotes) == curVotes && len(pool.futureVotes) == futureVotes && pool.curVotesPq.Len() == curVotesPq && pool.futureVotesPq.Len() == futureVotesPq { + if len(pool.curVotes) == curVotes && len(pool.futureVotes) == futureVotes && pool.curVotesPq.Len() == curVotesPq && pool.futureVotesPq.Len() == futureVotesPq { return true } } @@ -108,7 +109,7 @@ func testVotePool(t *testing.T, isValidRules bool) { mockEngine := &mockPOSA{} // Create vote pool - votePool := NewVotePool(params.TestChainConfig, chain, mockEngine, 22) + votePool := NewVotePool(chain, mockEngine, 22) // Create vote manager // Create a temporary file for the votes journal @@ -154,13 +155,13 @@ func testVotePool(t *testing.T, isValidRules bool) { } if !isValidRules { - if votePool.verifyStructureSizeOfVotePool(11, 11, 0, 11, 0) { + if votePool.verifyStructureSizeOfVotePool(11, 0, 11, 0) { t.Fatalf("put vote failed") } return } - if !votePool.verifyStructureSizeOfVotePool(11, 11, 0, 11, 0) { + if !votePool.verifyStructureSizeOfVotePool(11, 0, 11, 0) { t.Fatalf("put vote failed") } @@ -185,7 +186,7 @@ func testVotePool(t *testing.T, isValidRules bool) { panic(err) } - if !votePool.verifyStructureSizeOfVotePool(12, 12, 0, 12, 0) { + if !votePool.verifyStructureSizeOfVotePool(12, 0, 12, 0) { t.Fatalf("put vote failed") } @@ -197,7 +198,7 @@ func testVotePool(t *testing.T, isValidRules bool) { } // currently chain size is 268, and votePool should be pruned, so vote pool size should be 256! - if !votePool.verifyStructureSizeOfVotePool(256, 256, 0, 256, 0) { + if !votePool.verifyStructureSizeOfVotePool(256, 0, 256, 0) { t.Fatalf("put vote failed") } @@ -209,9 +210,9 @@ func testVotePool(t *testing.T, isValidRules bool) { }, }, } - voteManager.pool.PutVote(invalidVote) + voteManager.pool.PutVote("", invalidVote) - if !votePool.verifyStructureSizeOfVotePool(256, 256, 0, 256, 0) { + if !votePool.verifyStructureSizeOfVotePool(256, 0, 256, 0) { t.Fatalf("put vote failed") } @@ -231,9 +232,9 @@ func testVotePool(t *testing.T, isValidRules bool) { if err := voteManager.signer.SignVote(futureVote); err != nil { t.Fatalf("sign vote failed") } - voteManager.pool.PutVote(futureVote) + voteManager.pool.PutVote("", futureVote) - if !votePool.verifyStructureSizeOfVotePool(257, 256, 1, 256, 1) { + if !votePool.verifyStructureSizeOfVotePool(256, 1, 256, 1) { t.Fatalf("put vote failed") } @@ -248,9 +249,9 @@ func testVotePool(t *testing.T, isValidRules bool) { if err := voteManager.signer.SignVote(duplicateVote); err != nil { t.Fatalf("sign vote failed") } - voteManager.pool.PutVote(duplicateVote) + voteManager.pool.PutVote("", duplicateVote) - if !votePool.verifyStructureSizeOfVotePool(257, 256, 1, 256, 1) { + if !votePool.verifyStructureSizeOfVotePool(256, 1, 256, 1) { t.Fatalf("put vote failed") } @@ -263,8 +264,8 @@ func testVotePool(t *testing.T, isValidRules bool) { }, }, } - voteManager.pool.PutVote(futureVote) - if !votePool.verifyStructureSizeOfVotePool(257, 256, 1, 256, 1) { + voteManager.pool.PutVote("", futureVote) + if !votePool.verifyStructureSizeOfVotePool(256, 1, 256, 1) { t.Fatalf("put vote failed") } @@ -301,7 +302,7 @@ func testVotePool(t *testing.T, isValidRules bool) { } // Pruner will keep the size of votePool as latestBlockHeader-255~latestBlockHeader, then final result should be 256! - if !votePool.verifyStructureSizeOfVotePool(257, 256, 0, 256, 0) { + if !votePool.verifyStructureSizeOfVotePool(256, 0, 256, 0) { t.Fatalf("put vote failed") } @@ -357,3 +358,112 @@ func setUpKeyManager(t *testing.T) (string, string) { km.ImportKeystores(context.Background(), []*wallet.Keystore{keystore}, []string{password}) return walletPasswordDir, walletDir } + +func generateVote( + blockNumber int, + blockHash common.Hash, + secretKey blsCommon.SecretKey, +) *types.VoteEnvelope { + voteData := types.VoteData{ + TargetNumber: 1, + TargetHash: blockHash, + } + digest := voteData.Hash() + signature := secretKey.Sign(digest[:]) + + vote := &types.VoteEnvelope{ + RawVoteEnvelope: types.RawVoteEnvelope{ + PublicKey: types.BLSPublicKey(secretKey.PublicKey().Marshal()), + Signature: types.BLSSignature(signature.Marshal()), + Data: &voteData, + }, + } + + return vote +} + +func TestVotePoolDosProtection(t *testing.T) { + secretKey, err := bls.RandKey() + if err != nil { + t.Fatalf("Failed to create secret key, err %s", err) + } + + // Create a database pre-initialize with a genesis block + db := rawdb.NewMemoryDatabase() + genesis := (&core.Genesis{ + Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{testAddr: {Balance: big.NewInt(1000000)}}, + BaseFee: big.NewInt(params.InitialBaseFee), + }).MustCommit(db) + chain, _ := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil, nil) + + bs, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 25, nil, true) + if _, err := chain.InsertChain(bs[:1]); err != nil { + panic(err) + } + mockEngine := &mockPOSA{} + + // Create vote pool + votePool := NewVotePool(chain, mockEngine, 22) + + for i := 0; i < maxFutureVotePerPeer; i++ { + vote := generateVote(1, common.BigToHash(big.NewInt(int64(i+1))), secretKey) + votePool.PutVote("AAAA", vote) + time.Sleep(100 * time.Millisecond) + } + + if len(*votePool.futureVotesPq) != maxFutureVotePerPeer { + t.Fatalf("Future vote pool length, expect %d have %d", maxFutureVotePerPeer, len(*votePool.futureVotesPq)) + } + if votePool.numFutureVotePerPeer["AAAA"] != maxFutureVotePerPeer { + t.Fatalf("Number of future vote per peer, expect %d have %d", maxFutureVotePerPeer, votePool.numFutureVotePerPeer["AAAA"]) + } + + // This vote is dropped due to DOS protection + vote := generateVote(1, common.BigToHash(big.NewInt(int64(maxFutureVoteAmountPerBlock+1))), secretKey) + votePool.PutVote("AAAA", vote) + time.Sleep(100 * time.Millisecond) + if len(*votePool.futureVotesPq) != maxFutureVotePerPeer { + t.Fatalf("Future vote pool length, expect %d have %d", maxFutureVotePerPeer, len(*votePool.futureVotesPq)) + } + if votePool.numFutureVotePerPeer["AAAA"] != maxFutureVotePerPeer { + t.Fatalf("Number of future vote per peer, expect %d have %d", maxFutureVotePerPeer, votePool.numFutureVotePerPeer["AAAA"]) + } + + // Vote from different peer must be accepted + vote = generateVote(1, common.BigToHash(big.NewInt(int64(maxFutureVoteAmountPerBlock+2))), secretKey) + votePool.PutVote("BBBB", vote) + time.Sleep(100 * time.Millisecond) + if len(*votePool.futureVotesPq) != maxFutureVotePerPeer+1 { + t.Fatalf("Future vote pool length, expect %d have %d", maxFutureVotePerPeer, len(*votePool.futureVotesPq)) + } + if votePool.numFutureVotePerPeer["AAAA"] != maxFutureVotePerPeer { + t.Fatalf("Number of future vote per peer, expect %d have %d", maxFutureVotePerPeer, votePool.numFutureVotePerPeer["AAAA"]) + } + if votePool.numFutureVotePerPeer["BBBB"] != 1 { + t.Fatalf("Number of future vote per peer, expect %d have %d", 1, votePool.numFutureVotePerPeer["BBBB"]) + } + + // One vote is not queued twice + votePool.PutVote("CCCC", vote) + time.Sleep(100 * time.Millisecond) + if len(*votePool.futureVotesPq) != maxFutureVotePerPeer+1 { + t.Fatalf("Future vote pool length, expect %d have %d", maxFutureVotePerPeer, len(*votePool.futureVotesPq)) + } + if votePool.numFutureVotePerPeer["CCCC"] != 0 { + t.Fatalf("Number of future vote per peer, expect %d have %d", 0, votePool.numFutureVotePerPeer["CCCC"]) + } + + if _, err := chain.InsertChain(bs[1:]); err != nil { + panic(err) + } + time.Sleep(100 * time.Millisecond) + // Future vote must be transferred to current and failed the verification, + // numFutureVotePerPeer decreases + if len(*votePool.futureVotesPq) != 0 { + t.Fatalf("Future vote pool length, expect %d have %d", 0, len(*votePool.futureVotesPq)) + } + if votePool.numFutureVotePerPeer["AAAA"] != 0 { + t.Fatalf("Number of future vote per peer, expect %d have %d", 0, votePool.numFutureVotePerPeer["AAAA"]) + } +} diff --git a/eth/backend.go b/eth/backend.go index 583563b3af..3ee2224c28 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -297,7 +297,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if !ok { return nil, errors.New("consensus engine does not support fast finality") } - votePool := vote.NewVotePool(chainConfig, eth.blockchain, finalityEngine, nodeConfig.MaxCurVoteAmountPerBlock) + votePool := vote.NewVotePool(eth.blockchain, finalityEngine, nodeConfig.MaxCurVoteAmountPerBlock) if _, err := vote.NewVoteManager( eth, chainDb,