-
Notifications
You must be signed in to change notification settings - Fork 20.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
eth: enforce announcement metadatas and drop peers violating the protocol #28261
Changes from all commits
3c6a45e
4a0d6ca
c44bcfb
661b7b2
5766018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"bytes" | ||
"errors" | ||
"fmt" | ||
"math" | ||
mrand "math/rand" | ||
"sort" | ||
"time" | ||
|
@@ -105,6 +106,14 @@ var ( | |
type txAnnounce struct { | ||
origin string // Identifier of the peer originating the notification | ||
hashes []common.Hash // Batch of transaction hashes being announced | ||
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68) | ||
} | ||
|
||
// txMetadata is a set of extra data transmitted along the announcement for better | ||
// fetch scheduling. | ||
type txMetadata struct { | ||
kind byte // Transaction consensus type | ||
size uint32 // Transaction size in bytes | ||
} | ||
|
||
// txRequest represents an in-flight transaction retrieval request destined to | ||
|
@@ -120,6 +129,7 @@ type txRequest struct { | |
type txDelivery struct { | ||
origin string // Identifier of the peer originating the notification | ||
hashes []common.Hash // Batch of transaction hashes having been delivered | ||
metas []txMetadata // Batch of metadatas associated with the delivered hashes | ||
direct bool // Whether this is a direct reply or a broadcast | ||
} | ||
|
||
|
@@ -155,14 +165,14 @@ type TxFetcher struct { | |
|
||
// Stage 1: Waiting lists for newly discovered transactions that might be | ||
// broadcast without needing explicit request/reply round trips. | ||
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast | ||
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist | ||
waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection) | ||
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast | ||
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist | ||
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection) | ||
|
||
// Stage 2: Queue of transactions that waiting to be allocated to some peer | ||
// to be retrieved directly. | ||
announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer | ||
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash | ||
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer | ||
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash | ||
|
||
// Stage 3: Set of transactions currently being retrieved, some which may be | ||
// fulfilled and some rescheduled. Note, this step shares 'announces' from the | ||
|
@@ -175,6 +185,7 @@ type TxFetcher struct { | |
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool | ||
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool | ||
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer | ||
dropPeer func(string) // Drops a peer in case of announcement violation | ||
|
||
step chan struct{} // Notification channel when the fetcher loop iterates | ||
clock mclock.Clock // Time wrapper to simulate in tests | ||
|
@@ -183,14 +194,14 @@ type TxFetcher struct { | |
|
||
// NewTxFetcher creates a transaction fetcher to retrieve transaction | ||
// based on hash announcements. | ||
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { | ||
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil) | ||
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { | ||
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil) | ||
} | ||
|
||
// NewTxFetcherForTests is a testing method to mock out the realtime clock with | ||
// a simulated version and the internal randomness with a deterministic one. | ||
func NewTxFetcherForTests( | ||
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, | ||
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), | ||
clock mclock.Clock, rand *mrand.Rand) *TxFetcher { | ||
return &TxFetcher{ | ||
notify: make(chan *txAnnounce), | ||
|
@@ -199,8 +210,8 @@ func NewTxFetcherForTests( | |
quit: make(chan struct{}), | ||
waitlist: make(map[common.Hash]map[string]struct{}), | ||
waittime: make(map[common.Hash]mclock.AbsTime), | ||
waitslots: make(map[string]map[common.Hash]struct{}), | ||
announces: make(map[string]map[common.Hash]struct{}), | ||
waitslots: make(map[string]map[common.Hash]*txMetadata), | ||
announces: make(map[string]map[common.Hash]*txMetadata), | ||
announced: make(map[common.Hash]map[string]struct{}), | ||
fetching: make(map[common.Hash]string), | ||
requests: make(map[string]*txRequest), | ||
|
@@ -209,14 +220,15 @@ func NewTxFetcherForTests( | |
hasTx: hasTx, | ||
addTxs: addTxs, | ||
fetchTxs: fetchTxs, | ||
dropPeer: dropPeer, | ||
clock: clock, | ||
rand: rand, | ||
} | ||
} | ||
|
||
// Notify announces the fetcher of the potential availability of a new batch of | ||
// transactions in the network. | ||
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { | ||
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error { | ||
// Keep track of all the announced transactions | ||
txAnnounceInMeter.Mark(int64(len(hashes))) | ||
|
||
|
@@ -226,28 +238,35 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { | |
// still valuable to check here because it runs concurrent to the internal | ||
// loop, so anything caught here is time saved internally. | ||
var ( | ||
unknowns = make([]common.Hash, 0, len(hashes)) | ||
unknownHashes = make([]common.Hash, 0, len(hashes)) | ||
unknownMetas = make([]*txMetadata, 0, len(hashes)) | ||
|
||
duplicate int64 | ||
underpriced int64 | ||
) | ||
for _, hash := range hashes { | ||
for i, hash := range hashes { | ||
switch { | ||
case f.hasTx(hash): | ||
duplicate++ | ||
case f.isKnownUnderpriced(hash): | ||
underpriced++ | ||
default: | ||
unknowns = append(unknowns, hash) | ||
unknownHashes = append(unknownHashes, hash) | ||
if types == nil { | ||
unknownMetas = append(unknownMetas, nil) | ||
} else { | ||
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) | ||
} | ||
} | ||
} | ||
txAnnounceKnownMeter.Mark(duplicate) | ||
txAnnounceUnderpricedMeter.Mark(underpriced) | ||
|
||
// If anything's left to announce, push it into the internal loop | ||
if len(unknowns) == 0 { | ||
if len(unknownHashes) == 0 { | ||
return nil | ||
} | ||
announce := &txAnnounce{origin: peer, hashes: unknowns} | ||
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas} | ||
select { | ||
case f.notify <- announce: | ||
return nil | ||
|
@@ -290,6 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) | |
// re-requesting them and dropping the peer in case of malicious transfers. | ||
var ( | ||
added = make([]common.Hash, 0, len(txs)) | ||
metas = make([]txMetadata, 0, len(txs)) | ||
) | ||
// proceed in batches | ||
for i := 0; i < len(txs); i += 128 { | ||
|
@@ -325,6 +345,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) | |
otherreject++ | ||
} | ||
added = append(added, batch[j].Hash()) | ||
metas = append(metas, txMetadata{ | ||
kind: batch[j].Type(), | ||
size: uint32(batch[j].Size()), | ||
}) | ||
} | ||
knownMeter.Mark(duplicate) | ||
underpricedMeter.Mark(underpriced) | ||
|
@@ -337,7 +361,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) | |
} | ||
} | ||
select { | ||
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: | ||
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}: | ||
return nil | ||
case <-f.quit: | ||
return errTerminated | ||
|
@@ -394,13 +418,15 @@ func (f *TxFetcher) loop() { | |
want := used + len(ann.hashes) | ||
if want > maxTxAnnounces { | ||
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces)) | ||
|
||
ann.hashes = ann.hashes[:want-maxTxAnnounces] | ||
ann.metas = ann.metas[:want-maxTxAnnounces] | ||
} | ||
// All is well, schedule the remainder of the transactions | ||
idleWait := len(f.waittime) == 0 | ||
_, oldPeer := f.announces[ann.origin] | ||
|
||
for _, hash := range ann.hashes { | ||
for i, hash := range ann.hashes { | ||
// If the transaction is already downloading, add it to the list | ||
// of possible alternates (in case the current retrieval fails) and | ||
// also account it for the peer. | ||
|
@@ -409,9 +435,9 @@ func (f *TxFetcher) loop() { | |
|
||
// Stage 2 and 3 share the set of origins per tx | ||
if announces := f.announces[ann.origin]; announces != nil { | ||
announces[hash] = struct{}{} | ||
announces[hash] = ann.metas[i] | ||
} else { | ||
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}} | ||
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} | ||
} | ||
continue | ||
} | ||
|
@@ -422,22 +448,28 @@ func (f *TxFetcher) loop() { | |
|
||
// Stage 2 and 3 share the set of origins per tx | ||
if announces := f.announces[ann.origin]; announces != nil { | ||
announces[hash] = struct{}{} | ||
announces[hash] = ann.metas[i] | ||
} else { | ||
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}} | ||
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} | ||
} | ||
continue | ||
} | ||
// If the transaction is already known to the fetcher, but not | ||
// yet downloading, add the peer as an alternate origin in the | ||
// waiting list. | ||
if f.waitlist[hash] != nil { | ||
// Ignore double announcements from the same peer. This is | ||
// especially important if metadata is also passed along to | ||
// prevent malicious peers flip-flopping good/bad values. | ||
if _, ok := f.waitlist[hash][ann.origin]; ok { | ||
continue | ||
} | ||
f.waitlist[hash][ann.origin] = struct{}{} | ||
|
||
if waitslots := f.waitslots[ann.origin]; waitslots != nil { | ||
waitslots[hash] = struct{}{} | ||
waitslots[hash] = ann.metas[i] | ||
} else { | ||
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}} | ||
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} | ||
} | ||
continue | ||
} | ||
|
@@ -446,9 +478,9 @@ func (f *TxFetcher) loop() { | |
f.waittime[hash] = f.clock.Now() | ||
|
||
if waitslots := f.waitslots[ann.origin]; waitslots != nil { | ||
waitslots[hash] = struct{}{} | ||
waitslots[hash] = ann.metas[i] | ||
} else { | ||
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}} | ||
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} | ||
} | ||
} | ||
// If a new item was added to the waitlist, schedule it into the fetcher | ||
|
@@ -474,9 +506,9 @@ func (f *TxFetcher) loop() { | |
f.announced[hash] = f.waitlist[hash] | ||
for peer := range f.waitlist[hash] { | ||
if announces := f.announces[peer]; announces != nil { | ||
announces[hash] = struct{}{} | ||
announces[hash] = f.waitslots[peer][hash] | ||
} else { | ||
f.announces[peer] = map[common.Hash]struct{}{hash: {}} | ||
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]} | ||
} | ||
delete(f.waitslots[peer], hash) | ||
if len(f.waitslots[peer]) == 0 { | ||
|
@@ -545,10 +577,27 @@ func (f *TxFetcher) loop() { | |
|
||
case delivery := <-f.cleanup: | ||
// Independent if the delivery was direct or broadcast, remove all | ||
// traces of the hash from internal trackers | ||
for _, hash := range delivery.hashes { | ||
// traces of the hash from internal trackers. That said, compare any | ||
// advertised metadata with the real ones and drop bad peers. | ||
for i, hash := range delivery.hashes { | ||
if _, ok := f.waitlist[hash]; ok { | ||
for peer, txset := range f.waitslots { | ||
if meta := txset[hash]; meta != nil { | ||
if delivery.metas[i].kind != meta.kind { | ||
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind) | ||
f.dropPeer(peer) | ||
} else if delivery.metas[i].size != meta.size { | ||
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size) | ||
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 { | ||
// Normally we should drop a peer considering this is a protocol violation. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Impressive level of indentation reached here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I won't :D There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, it's very hard to follow the code here. Like, you check It might make perfect sense to you who wrote the code, but for a reviewer (and maybe even for yourself two years from now), it's pretty hard to ensure that it does what it should, and that all conditions are correct. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, couldn't it look something like this? (I mean part of it, there would need to be some more code too) case delivery := <-f.cleanup:
// Independent if the delivery was direct or broadcast, remove all
// traces of the hash from internal trackers. That said, compare any
// advertised metadata with the real ones and drop bad peers.
f.onDeliveries(delivery, func(announced, actual *txMetadata) {
if announced.kind != actual.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", announced.kind, "ann", actual.kind)
f.dropPeer(peer)
} else if announced.size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", announced.size, "ann", actual.size)
if math.Abs(float64(announced.size)-float64(actual.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this logic is used twice here wouldn't it make sense to combine it into an anonymous function?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah damn, Martin was quicker :D There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could probably clean the code up, but at this point I need to get this 5 liner change out of the way and focus on the rest of 4844 and don't want to diverge a month into splitting up the fetcher nice and elegantly. Though TBH, apart from introducing 10 more functions, I don't think it's gonna be much more elegant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, but I'll take that "You could probably clean the code up" literally, and after we merge this PR, I want to clean it up. For example like this: Even if it means "introducing 10 more functions", I think 10 separate functions is preferrable over a gigantic switch with hundred-liner cases; it increases both readability and testability. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The catch with a lot of small methods is that people have the tendency to reuse them. Ok, this sounds weird. But the current logic has a very very strict flow. When there are many methods, people tend to maybe turn one or the other into more generic ones, tweak them, reuse them and then the flow can get completely whacked out because someone forgot some ordering constaint. That's usually the reason I do large methods. It's hard to mess it up in the future. |
||
// However, due to the RLP vs consensus format messyness, allow a few bytes | ||
// wiggle-room where we only warn, but don't drop. | ||
// | ||
// TODO(karalabe): Get rid of this relaxation when clients are proven stable. | ||
f.dropPeer(peer) | ||
} | ||
} | ||
} | ||
delete(txset, hash) | ||
if len(txset) == 0 { | ||
delete(f.waitslots, peer) | ||
|
@@ -558,6 +607,22 @@ func (f *TxFetcher) loop() { | |
delete(f.waittime, hash) | ||
} else { | ||
for peer, txset := range f.announces { | ||
if meta := txset[hash]; meta != nil { | ||
if delivery.metas[i].kind != meta.kind { | ||
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind) | ||
f.dropPeer(peer) | ||
} else if delivery.metas[i].size != meta.size { | ||
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size) | ||
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 { | ||
// Normally we should drop a peer considering this is a protocol violation. | ||
// However, due to the RLP vs consensus format messyness, allow a few bytes | ||
// wiggle-room where we only warn, but don't drop. | ||
// | ||
// TODO(karalabe): Get rid of this relaxation when clients are proven stable. | ||
f.dropPeer(peer) | ||
} | ||
} | ||
} | ||
delete(txset, hash) | ||
if len(txset) == 0 { | ||
delete(f.announces, peer) | ||
|
@@ -859,7 +924,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) | |
|
||
// forEachHash does a range loop over a map of hashes in production, but during | ||
// testing it does a deterministic sorted random to allow reproducing issues. | ||
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) { | ||
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) { | ||
// If we're running production, use whatever Go's map gives us | ||
if f.rand == nil { | ||
for hash := range hashes { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to double-check the logic here. So, say `tx A` has size `N`. Malicious `peer A` announces it to us as size `M`, and `peer B` announces it as size `N` (correctly). Now, `peer A` does not deliver, so we instead ask `peer B` for it, and `peer B` delivers a tx with size `N`. Do we then consider `tx A` to have size `M` if coming from `A`, and `N` if coming from `B`?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And also, would it make sense to somehow flag (warn) if we notice this behaviour: peers announcing different meta for a transaction.
It could point to shenanigans of this sort, and also to implementation flaws.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Answering my own question here:
Yes. So the attack is not possible here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... and furthermore, once the honest peer delivers `tx A`, the lie from `peer A` will be discovered and it will be ejected, since we iterate over all announcements in `f.announces`. Good.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the end, it will turn into a Warn when the bad peer gets dropped. IMO there's no need to find announce clashes at the moment of announcements, when delivery happens we'll know for sure who's right and if anyone's wrong.
I guess what we could do is to add a second warning log where we log that someone was a "bit" off, but not off enough to drop.