From f76d5a560797fbf5bc148bcb8c032c329a5d1ebb Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 08:31:31 +0000 Subject: [PATCH] replaced faulty mapset.Set with lrucache --- eth/peer.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/eth/peer.go b/eth/peer.go index ad76f8f6a98f..6e935bafc968 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -23,9 +23,9 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/batch" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" metaapi "github.com/ethereum/go-ethereum/metadium/api" "github.com/ethereum/go-ethereum/p2p" @@ -87,8 +87,8 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownTxs *lru.LruCache // Set of transaction hashes known to be known by this peer + knownBlocks *lru.LruCache // Set of block hashes known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer @@ -101,8 +101,8 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { rw: rw, version: version, id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), - knownTxs: mapset.NewSet(), - knownBlocks: mapset.NewSet(), + knownTxs: lru.NewLruCache(maxKnownTxs, true), + knownBlocks: lru.NewLruCache(maxKnownBlocks, true), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), @@ -194,27 +194,21 @@ func (p *peer) SetHead(hash common.Hash, td *big.Int) { // never be propagated to this particular peer. func (p *peer) MarkBlock(hash common.Hash) { // If we reached the memory allowance, drop a previously known block hash - for p.knownBlocks.Cardinality() >= maxKnownBlocks { - p.knownBlocks.Pop() - } - p.knownBlocks.Add(hash) + p.knownBlocks.Put(hash, true) } // MarkTransaction marks a transaction as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *peer) MarkTransaction(hash common.Hash) { // If we reached the memory allowance, drop a previously known transaction hash - for p.knownTxs.Cardinality() >= maxKnownTxs { - p.knownTxs.Pop() - } - p.knownTxs.Add(hash) + p.knownTxs.Put(hash, true) } // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { for _, tx := range txs { - p.knownTxs.Add(tx.Hash()) + p.knownTxs.Put(tx.Hash(), true) } return p2p.Send(p.rw, TxMsg, txs) } @@ -225,7 +219,7 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { select { case p.queuedTxs <- txs: for _, tx := range txs { - p.knownTxs.Add(tx.Hash()) + p.knownTxs.Put(tx.Hash(), true) } default: p.Log().Debug("Dropping transaction propagation", "count", len(txs)) @@ -243,7 +237,7 @@ func (p *peer) resendPendingTxs(txs map[common.Address]types.Transactions) { // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { for _, hash := range hashes { - p.knownBlocks.Add(hash) + p.knownBlocks.Put(hash, true) } request := make(newBlockHashesData, len(hashes)) for i := 0; i < len(hashes); i++ { @@ -259,7 +253,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error func (p *peer) AsyncSendNewBlockHash(block *types.Block) { select { case p.queuedAnns <- block: - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) default: p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) } @@ -267,7 +261,7 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) { // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } @@ -276,7 +270,7 @@ func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { select { case p.queuedProps <- &propEvent{block: block, td: td}: - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) default: p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) } @@ -522,7 +516,7 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownBlocks.Contains(hash) { + if !p.knownBlocks.Exists(hash) { list = append(list, p) } } @@ -537,7 +531,7 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownTxs.Contains(hash) { + if !p.knownTxs.Exists(hash) { list = append(list, p) } }