eth: enforce announcement metadatas and drop peers violating the protocol (ethereum#28261)

* eth: enforce announcement metadatas and drop peers violating the protocol

* eth/fetcher: relax eth/68 validation a bit for flakey clients

* tests/fuzzers/txfetcher: pull in suggestion from Marius

* eth/fetcher: add tests for peer dropping

* eth/fetcher: linter linter linter linter linter
 Conflicts:
	eth/fetcher/tx_fetcher.go
	eth/handler.go
	eth/handler_eth.go
karalabe authored and omerfirmak committed Aug 6, 2024
1 parent c14609a commit 90e5d79
Showing 5 changed files with 542 additions and 78 deletions.
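Before the diff itself, a minimal, self-contained Go sketch of the rule this commit introduces: metadata announced alongside transaction hashes (type and size, available since eth/68) is checked against the transactions actually delivered; a type mismatch is always treated as a protocol violation, while a size mismatch is tolerated within a small wiggle-room to accommodate flakey clients. Only the txMetadata fields and the 8-byte tolerance mirror the diff below; shouldDropPeer and everything in main are illustrative names, not part of the go-ethereum API.

package main

import (
	"fmt"
	"math"
)

// txMetadata mirrors the struct added in this commit: the announced
// transaction consensus type and its size in bytes.
type txMetadata struct {
	kind byte
	size uint32
}

// shouldDropPeer sketches the relaxed eth/68 validation from the diff: a type
// mismatch is always fatal, while a size mismatch only drops the peer when it
// exceeds an 8-byte wiggle-room that papers over RLP vs consensus encoding
// differences in buggy clients.
func shouldDropPeer(announced, delivered txMetadata) (drop bool, reason string) {
	if announced.kind != delivered.kind {
		return true, "announced transaction type mismatch"
	}
	if announced.size != delivered.size {
		if math.Abs(float64(delivered.size)-float64(announced.size)) > 8 {
			return true, "announced transaction size mismatch"
		}
		return false, "size mismatch within wiggle-room, warn only"
	}
	return false, ""
}

func main() {
	ann := txMetadata{kind: 2, size: 120}

	// Exact match: nothing to do.
	fmt.Println(shouldDropPeer(ann, txMetadata{kind: 2, size: 120}))

	// Off by a few bytes: warn, but keep the peer (the eth/68 relaxation).
	fmt.Println(shouldDropPeer(ann, txMetadata{kind: 2, size: 124}))

	// Wrong type or wildly wrong size: drop the peer.
	fmt.Println(shouldDropPeer(ann, txMetadata{kind: 3, size: 120}))
	fmt.Println(shouldDropPeer(ann, txMetadata{kind: 2, size: 260}))
}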
147 changes: 105 additions & 42 deletions eth/fetcher/tx_fetcher.go
@@ -1,4 +1,4 @@
// Copyright 2020 The go-ethereum Authors
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
"time"
@@ -103,6 +104,14 @@ var (
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txRequest represents an in-flight transaction retrieval request destined to
@@ -118,10 +127,11 @@ type txRequest struct {
type txDelivery struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes having been delivered
metas []txMetadata // Batch of metadatas associated with the delivered hashes
direct bool // Whether this is a direct reply or a broadcast
}

// txDrop is the notiication that a peer has disconnected.
// txDrop is the notification that a peer has disconnected.
type txDrop struct {
peer string
}
@@ -153,14 +163,14 @@ type TxFetcher struct {

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]struct{} // Waiting announcement sgroupped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcement sgroupped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
@@ -173,6 +183,7 @@ type TxFetcher struct {
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation

step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
@@ -181,14 +192,14 @@

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@@ -197,8 +208,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
@@ -207,27 +218,31 @@ func NewTxFetcherForTests(
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
rand: rand,
}
}

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
// Keep track of all the announced transactions
txAnnounceInMeter.Mark(int64(len(hashes)))

// Skip any transaction announcements that we already know of, or that we've
// previously marked as cheap and discarded. This check is of course racey,
// previously marked as cheap and discarded. This check is of course racy,
// because multiple concurrent notifies will still manage to pass it, but it's
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
duplicate, underpriced int64
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
)
for _, hash := range hashes {
for i, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++
@@ -236,20 +251,22 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
underpriced++

default:
unknowns = append(unknowns, hash)
unknownHashes = append(unknownHashes, hash)
if types == nil {
unknownMetas = append(unknownMetas, nil)
} else {
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
}
}
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced)

// If anything's left to announce, push it into the internal loop
if len(unknowns) == 0 {
if len(unknownHashes) == 0 {
return nil
}
announce := &txAnnounce{
origin: peer,
hashes: unknowns,
}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
select {
case f.notify <- announce:
return nil
@@ -261,7 +278,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-shedule missing transactions as soon as possible.
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
// Keep track of all the propagated transactions
if direct {
@@ -273,6 +290,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
metas = make([]txMetadata, 0, len(txs))
duplicate int64
underpriced int64
otherreject int64
@@ -302,6 +320,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
otherreject++
}
added = append(added, txs[i].Hash())
metas = append(metas, txMetadata{
kind: txs[i].Type(),
size: uint32(txs[i].Size()),
})
}
if direct {
txReplyKnownMeter.Mark(duplicate)
@@ -313,7 +335,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
txBroadcastOtherRejectMeter.Mark(otherreject)
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
return nil
case <-f.quit:
return errTerminated
@@ -370,13 +392,15 @@ func (f *TxFetcher) loop() {
want := used + len(ann.hashes)
if want > maxTxAnnounces {
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))

ann.hashes = ann.hashes[:want-maxTxAnnounces]
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

for _, hash := range ann.hashes {
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
@@ -385,9 +409,9 @@

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
@@ -398,22 +422,28 @@

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
// If the transaction is already known to the fetcher, but not
// yet downloading, add the peer as an alternate origin in the
// waiting list.
if f.waitlist[hash] != nil {
// Ignore double announcements from the same peer. This is
// especially important if metadata is also passed along to
// prevent malicious peers flip-flopping good/bad values.
if _, ok := f.waitlist[hash][ann.origin]; ok {
continue
}
f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
@@ -422,9 +452,9 @@
f.waittime[hash] = f.clock.Now()

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand All @@ -450,9 +480,9 @@ func (f *TxFetcher) loop() {
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]struct{}{hash: {}}
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
@@ -521,10 +551,27 @@

case delivery := <-f.cleanup:
// Independent if the delivery was direct or broadcast, remove all
// traces of the hash from internal trackers
for _, hash := range delivery.hashes {
// traces of the hash from internal trackers. That said, compare any
// advertised metadata with the real ones and drop bad peers.
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.waitslots, peer)
@@ -534,6 +581,22 @@
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.announces, peer)
@@ -559,7 +622,7 @@
// In case of a direct delivery, also reschedule anything missing
// from the original query
if delivery.direct {
// Mark the reqesting successful (independent of individual status)
// Mark the requesting successful (independent of individual status)
txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

// Make sure something was pending, nuke it
@@ -608,7 +671,7 @@
delete(f.alternates, hash)
delete(f.fetching, hash)
}
// Something was delivered, try to rechedule requests
// Something was delivered, try to reschedule requests
f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
}

@@ -720,7 +783,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't trac
// pending requests and timed out requests separatey. Without double tracking, if
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(request) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
Expand Down Expand Up @@ -835,7 +898,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {

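As a usage note on the new Notify signature shown in the diff above (peer, types, sizes, hashes): the fetcher pairs each announced hash with optional metadata, and before eth/68 no types or sizes are transmitted, so the metadata entries stay nil and no announcement validation (or peer dropping) can happen on delivery. The sketch below illustrates that pairing only; the hash type and the pairMetadata helper are stand-ins, not go-ethereum identifiers.

package main

import "fmt"

// hash is a stand-in for go-ethereum's common.Hash (a 32-byte identifier).
type hash [32]byte

// txMetadata mirrors the struct added by this commit.
type txMetadata struct {
	kind byte   // transaction consensus type
	size uint32 // transaction size in bytes
}

// pairMetadata sketches what Notify does with its new arguments: one metadata
// entry per announced hash, or nil entries when the peer speaks a pre-eth/68
// protocol and transmitted no types or sizes at all.
func pairMetadata(hashes []hash, types []byte, sizes []uint32) []*txMetadata {
	metas := make([]*txMetadata, 0, len(hashes))
	for i := range hashes {
		if types == nil {
			metas = append(metas, nil) // nothing to validate against on delivery
			continue
		}
		metas = append(metas, &txMetadata{kind: types[i], size: sizes[i]})
	}
	return metas
}

func main() {
	hashes := []hash{{0x01}, {0x02}}

	// eth/68 announcement: one type and one size per announced hash.
	for i, m := range pairMetadata(hashes, []byte{0x02, 0x00}, []uint32{120, 310}) {
		fmt.Printf("eth/68 hash %d: meta=%+v\n", i, m)
	}
	// Pre-eth/68 announcement: no metadata travels with the hashes.
	for i, m := range pairMetadata(hashes, nil, nil) {
		fmt.Printf("legacy hash %d: meta=%v\n", i, m)
	}
}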