Skip to content

Commit

Permalink
eth: enforce announcement metadatas and drop peers violating the prot…
Browse files Browse the repository at this point in the history
…ocol (#28261)

* eth: enforce announcement metadatas and drop peers violating the protocol

* eth/fetcher: relax eth/68 validation a bit for flakey clients

* tests/fuzzers/txfetcher: pull in suggestion from Marius

* eth/fetcher: add tests for peer dropping

* eth/fetcher: linter linter linter linter linter
  • Loading branch information
karalabe authored Oct 10, 2023
1 parent 6505297 commit 8afbcf4
Show file tree
Hide file tree
Showing 5 changed files with 531 additions and 65 deletions.
127 changes: 96 additions & 31 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
"time"
Expand Down Expand Up @@ -105,6 +106,14 @@ var (
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txRequest represents an in-flight transaction retrieval request destined to
Expand All @@ -120,6 +129,7 @@ type txRequest struct {
type txDelivery struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes having been delivered
metas []txMetadata // Batch of metadatas associated with the delivered hashes
direct bool // Whether this is a direct reply or a broadcast
}

Expand Down Expand Up @@ -155,14 +165,14 @@ type TxFetcher struct {

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand All @@ -175,6 +185,7 @@ type TxFetcher struct {
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation

step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
Expand All @@ -183,14 +194,14 @@ type TxFetcher struct {

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
Expand All @@ -199,8 +210,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand All @@ -209,14 +220,15 @@ func NewTxFetcherForTests(
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
rand: rand,
}
}

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
// Keep track of all the announced transactions
txAnnounceInMeter.Mark(int64(len(hashes)))

Expand All @@ -226,28 +238,35 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
)
for _, hash := range hashes {
for i, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++
case f.isKnownUnderpriced(hash):
underpriced++
default:
unknowns = append(unknowns, hash)
unknownHashes = append(unknownHashes, hash)
if types == nil {
unknownMetas = append(unknownMetas, nil)
} else {
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
}
}
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced)

// If anything's left to announce, push it into the internal loop
if len(unknowns) == 0 {
if len(unknownHashes) == 0 {
return nil
}
announce := &txAnnounce{origin: peer, hashes: unknowns}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
select {
case f.notify <- announce:
return nil
Expand Down Expand Up @@ -290,6 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
metas = make([]txMetadata, 0, len(txs))
)
// proceed in batches
for i := 0; i < len(txs); i += 128 {
Expand Down Expand Up @@ -325,6 +345,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
otherreject++
}
added = append(added, batch[j].Hash())
metas = append(metas, txMetadata{
kind: batch[j].Type(),
size: uint32(batch[j].Size()),
})
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
Expand All @@ -337,7 +361,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
}
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
return nil
case <-f.quit:
return errTerminated
Expand Down Expand Up @@ -394,13 +418,15 @@ func (f *TxFetcher) loop() {
want := used + len(ann.hashes)
if want > maxTxAnnounces {
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))

ann.hashes = ann.hashes[:want-maxTxAnnounces]
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

for _, hash := range ann.hashes {
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
Expand All @@ -409,9 +435,9 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -422,22 +448,28 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
// If the transaction is already known to the fetcher, but not
// yet downloading, add the peer as an alternate origin in the
// waiting list.
if f.waitlist[hash] != nil {
// Ignore double announcements from the same peer. This is
// especially important if metadata is also passed along to
// prevent malicious peers flip-flopping good/bad values.
if _, ok := f.waitlist[hash][ann.origin]; ok {
continue
}
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -446,9 +478,9 @@ func (f *TxFetcher) loop() {
f.waittime[hash] = f.clock.Now()

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand All @@ -474,9 +506,9 @@ func (f *TxFetcher) loop() {
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]struct{}{hash: {}}
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -545,10 +577,27 @@ func (f *TxFetcher) loop() {

case delivery := <-f.cleanup:
// Independent if the delivery was direct or broadcast, remove all
// traces of the hash from internal trackers
for _, hash := range delivery.hashes {
// traces of the hash from internal trackers. That said, compare any
// advertised metadata with the real ones and drop bad peers.
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.waitslots, peer)
Expand All @@ -558,6 +607,22 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.announces, peer)
Expand Down Expand Up @@ -859,7 +924,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {
Expand Down
Loading

0 comments on commit 8afbcf4

Please sign in to comment.