Skip to content

Commit

Permalink
eth/fetcher: throttle tx fetches to 128KB responses (#28304)
Browse files Browse the repository at this point in the history
* eth/fetcher: throttle tx fetches to 128KB responses

* eth/fetcher: unindent a clause per review request
  • Loading branch information
karalabe authored Oct 11, 2023
1 parent 7776a32 commit a6deb2d
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 37 deletions.
82 changes: 50 additions & 32 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@ const (
// can announce in a short time.
maxTxAnnounces = 4096

// maxTxRetrievals is the maximum transaction number can be fetched in one
// request. The rationale to pick 256 is:
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
// Etherscan the average transaction size is around 200B, so in theory
// we can include lots of transaction in a single protocol packet.
// - However the maximum size of a single transaction is raised to 128KB,
// so pick a middle value here to ensure we can maximize the efficiency
// of the retrieval and response size overflow won't happen in most cases.
// maxTxRetrievals is the maximum number of transactions that can be fetched
// in one request. The rationale for picking 256 is to have a reasonabe lower
// bound for the transferred data (don't waste RTTs, transfer more meaningful
// batch sizes), but also have an upper bound on the sequentiality to allow
// using our entire peerset for deliveries.
//
// This number also acts as a failsafe against malicious announces which might
// cause us to request more data than we'd expect.
maxTxRetrievals = 256

// maxTxRetrievalSize is the max number of bytes that delivered transactions
// should weigh according to the announcements. The 128KB was chosen to limit
// retrieving a maximum of one blob transaction at a time to minimize hogging
// a connection between two peers.
maxTxRetrievalSize = 128 * 1024

// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
// is used to track recent transactions that have been dropped so we don't
// re-request them.
Expand Down Expand Up @@ -859,25 +865,36 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(f.announces[peer]) == 0 {
return // continue in the for-each
}
hashes := make([]common.Hash, 0, maxTxRetrievals)
f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
if _, ok := f.fetching[hash]; !ok {
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)
var (
hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
// If the transaction is alcear fetching, skip to the next one
if _, ok := f.fetching[hash]; ok {
return true
}
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
}
if meta != nil { // Only set eth/68 and upwards
bytes += uint64(meta.size)
if bytes >= maxTxRetrievalSize {
return false
}
}
return true // continue in the for-each
return true // scheduled, try to add more
})
// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
Expand Down Expand Up @@ -922,27 +939,28 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
}
}

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// forEachAnnounce does a range loop over a map of announcements in production,
// but during testing it does a deterministic sorted random to allow reproducing
// issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {
if !do(hash) {
for hash, meta := range announces {
if !do(hash, meta) {
return
}
}
return
}
// We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(hashes))
for hash := range hashes {
list := make([]common.Hash, 0, len(announces))
for hash := range announces {
list = append(list, hash)
}
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash) {
if !do(hash, announces[hash]) {
return
}
}
Expand Down
72 changes: 67 additions & 5 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)

var (
Expand Down Expand Up @@ -993,15 +994,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
})
}

// Tests that if thousands of transactions are announces, only a small
// Tests that if thousands of transactions are announced, only a small
// number of them will be requested at a time.
func TestTransactionFetcherRateLimiting(t *testing.T) {
// Create a slew of transactions and to announce them
// Create a slew of transactions and announce them
var hashes []common.Hash
for i := 0; i < maxTxAnnounces; i++ {
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
}

testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
Expand Down Expand Up @@ -1029,6 +1029,68 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
})
}

// Tests that if huge transactions are announced, only a small number of them will
// be requested at a time, to keep the responses below a resonable level.
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce mid size transactions from A to verify that multiple
// ones can be piled into a single request.
doTxNotify{peer: "A",
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
},
// Announce exactly on the limit transactions to see that only one
// gets requested
doTxNotify{peer: "B",
hashes: []common.Hash{{0x05}, {0x06}},
types: []byte{types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
},
// Announce oversized blob transactions to see that overflows are ok
doTxNotify{peer: "C",
hashes: []common.Hash{{0x07}, {0x08}},
types: []byte{types.BlobTxType, types.BlobTxType},
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
},
"B": {
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
},
"C": {
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
},
},
fetching: map[string][]common.Hash{
"A": {{0x02}, {0x03}, {0x04}},
"B": {{0x06}},
"C": {{0x08}},
},
},
},
})
}

// Tests that then number of transactions a peer is allowed to announce and/or
// request at the same time is hard capped.
func TestTransactionFetcherDoSProtection(t *testing.T) {
Expand Down Expand Up @@ -1664,7 +1726,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
}
}
}
Expand Down Expand Up @@ -1733,7 +1795,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
}
}
}
Expand Down

0 comments on commit a6deb2d

Please sign in to comment.