Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/protocols/eth: simplify peer known block/txs caches #23649

Merged
merged 4 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 43 additions & 52 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ type Peer struct {
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty

knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownBlocks *leakySet // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer

txpool TxPool // Transaction pool used by the broadcasters for liveness checks
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownTxs *leakySet // Set of transaction hashes known to be known by this peer
txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests

Expand All @@ -96,8 +96,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
Peer: p,
rw: rw,
version: version,
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownTxs: newLeakySet(maxKnownTxs),
knownBlocks: newLeakySet(maxKnownBlocks),
queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
txBroadcast: make(chan []common.Hash),
Expand Down Expand Up @@ -162,19 +162,13 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool {
// never be propagated to this particular peer.
func (p *Peer) markBlock(hash common.Hash) {
// If we reached the memory allowance, drop a previously known block hash
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(hash)
}

// markTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *Peer) markTransaction(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownTxs.Cardinality() >= maxKnownTxs {
p.knownTxs.Pop()
}
p.knownTxs.Add(hash)
}

Expand All @@ -189,9 +183,6 @@ func (p *Peer) markTransaction(hash common.Hash) {
// tests that directly send messages without having to do the asyn queueing.
func (p *Peer) SendTransactions(txs types.Transactions) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
p.knownTxs.Pop()
}
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
Expand All @@ -205,12 +196,7 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
select {
case p.txBroadcast <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
p.knownTxs.Add(hashes...)
case <-p.term:
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
}
Expand All @@ -224,12 +210,7 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
p.knownTxs.Add(hashes...)
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
}

Expand All @@ -240,12 +221,7 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
select {
case p.txAnnounce <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
p.knownTxs.Add(hashes...)
case <-p.term:
p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
}
Expand All @@ -254,12 +230,8 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
p.knownTxs.Add(hashes...)

// Not packed into PooledTransactionsPacket to avoid RLP decoding
return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
RequestId: id,
Expand All @@ -271,12 +243,8 @@ func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs [
// a hash notification.
func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
// Mark all the block hashes as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
p.knownBlocks.Pop()
}
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
p.knownBlocks.Add(hashes...)

request := make(NewBlockHashesPacket, len(hashes))
for i := 0; i < len(hashes); i++ {
request[i].Hash = hashes[i]
Expand All @@ -292,9 +260,6 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
select {
case p.queuedBlockAnns <- block:
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
Expand All @@ -304,9 +269,6 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
// SendNewBlock propagates an entire block to a remote peer.
func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
Block: block,
Expand All @@ -320,9 +282,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
Expand Down Expand Up @@ -465,3 +424,35 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
GetPooledTransactionsPacket: hashes,
})
}

ferranbt marked this conversation as resolved.
Show resolved Hide resolved
type leakySet struct {
impl mapset.Set
ferranbt marked this conversation as resolved.
Show resolved Hide resolved
max int
}

func newLeakySet(max int) *leakySet {
return &leakySet{
max: max,
impl: mapset.NewSet(),
}
}

// Add adds a list of elements to the set.
func (s *leakySet) Add(hashes ...common.Hash) {
for s.impl.Cardinality() > max(0, s.max-len(hashes)) {
s.impl.Pop()
}
for _, hash := range hashes {
s.impl.Add(hash)
}
}

// Contains returns whether the given item is in the set.
func (s *leakySet) Contains(hash common.Hash) bool {
return s.impl.Contains(hash)
}

// Cardinality returns the number of elements in the set.
func (s *leakySet) Cardinality() int {
return s.impl.Cardinality()
}
27 changes: 27 additions & 0 deletions eth/protocols/eth/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package eth

import (
"crypto/rand"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)
Expand Down Expand Up @@ -59,3 +61,28 @@ func (p *testPeer) close() {
p.Peer.Close()
p.app.Close()
}

func TestPeerSet(t *testing.T) {
size := 5
s := newLeakySet(size)

// add 10 items
for i := 0; i < size*2; i++ {
s.Add(common.Hash{byte(i)})
}

if s.Cardinality() != size {
t.Fatalf("wrong size, expected %d but found %d", size, s.Cardinality())
}

vals := []common.Hash{}
for i := 10; i < 20; i++ {
vals = append(vals, common.Hash{byte(i)})
}

// add item in batch
s.Add(vals...)
if s.Cardinality() < size {
t.Fatalf("bad size")
}
}