Skip to content

Commit

Permalink
core/txpool: implement DoS defenses from geth (#778)
Browse files Browse the repository at this point in the history
  • Loading branch information
manav2401 authored Mar 14, 2023
1 parent d0a9e0d commit 0d29c36
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 25 deletions.
51 changes: 42 additions & 9 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,19 @@ type txList struct {
strict bool // Whether nonces are strictly continuous or not
txs *txSortedMap // Heap indexed sorted hash map of the transactions

costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
totalcost *big.Int // Total cost of all transactions in the list
}

// newTxList create a new transaction list for maintaining nonce-indexable fast,
// gapped, sortable transaction lists.
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
totalcost: new(big.Int),
}
}

Expand Down Expand Up @@ -301,7 +303,13 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
if tx.GasFeeCapIntCmp(thresholdFeeCap) < 0 || tx.GasTipCapIntCmp(thresholdTip) < 0 {
return false, nil
}
// Old is being replaced, subtract old cost
l.subTotalCost([]*types.Transaction{old})
}

// Add new tx cost to totalcost
l.totalcost.Add(l.totalcost, tx.Cost())

// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
Expand All @@ -317,7 +325,10 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
// provided threshold. Every removed transaction is returned for any post-removal
// maintenance.
func (l *txList) Forward(threshold uint64) types.Transactions {
return l.txs.Forward(threshold)
txs := l.txs.Forward(threshold)
l.subTotalCost(txs)

return txs
}

// Filter removes all transactions from the list with a cost or gas limit higher
Expand Down Expand Up @@ -356,14 +367,20 @@ func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions
}
invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
// Reset total cost
l.subTotalCost(removed)
l.subTotalCost(invalids)
l.txs.reheap()
return removed, invalids
}

// Cap places a hard limit on the number of items, returning all transactions
// exceeding that limit.
func (l *txList) Cap(threshold int) types.Transactions {
return l.txs.Cap(threshold)
txs := l.txs.Cap(threshold)
l.subTotalCost(txs)

return txs
}

// Remove deletes a transaction from the maintained list, returning whether the
Expand All @@ -375,9 +392,14 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
if removed := l.txs.Remove(nonce); !removed {
return false, nil
}

l.subTotalCost([]*types.Transaction{tx})
// In strict mode, filter out non-executable transactions
if l.strict {
return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
txs := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
l.subTotalCost(txs)

return true, txs
}
return true, nil
}
Expand All @@ -390,7 +412,10 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
// prevent getting into and invalid state. This is not something that should ever
// happen but better to be self correcting than failing!
func (l *txList) Ready(start uint64) types.Transactions {
return l.txs.Ready(start)
txs := l.txs.Ready(start)
l.subTotalCost(txs)

return txs
}

// Len returns the length of the transaction list.
Expand All @@ -416,6 +441,14 @@ func (l *txList) LastElement() *types.Transaction {
return l.txs.LastElement()
}

// subTotalCost subtracts the cost of the given transactions from the
// total cost of all transactions.
func (l *txList) subTotalCost(txs []*types.Transaction) {
for _, tx := range txs {
l.totalcost.Sub(l.totalcost, tx.Cost())
}
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up. If baseFee is set
// then the heap is sorted based on the effective tip based on the given base fee.
Expand Down
85 changes: 77 additions & 8 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"container/heap"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -86,6 +87,14 @@ var (
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")

// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")

// ErrOverdraft is returned if a transaction would cause the senders balance to go negative
// thus invalidating a potential large number of transactions.
ErrOverdraft = errors.New("transaction would cause overdraft")
)

var (
Expand Down Expand Up @@ -645,9 +654,24 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
balance := pool.currentState.GetBalance(from)
if balance.Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
// Verify that replacing transactions will not result in overdraft
list := pool.pending[from]
if list != nil { // Sender already has pending txs
sum := new(big.Int).Add(tx.Cost(), list.totalcost)
if repl := list.txs.Get(tx.Nonce()); repl != nil {
// Deduct the cost of a transaction replaced by this
sum.Sub(sum, repl.Cost())
}

if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
return ErrOverdraft
}
}
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
if err != nil {
Expand Down Expand Up @@ -684,6 +708,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
invalidTxMeter.Mark(1)
return false, err
}

// already validated by this point
from, _ := types.Sender(pool.signer, tx)

// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
Expand All @@ -692,6 +720,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}

// We're about to replace a transaction. The reorg does a more thorough
// analysis of what to remove and how, but it runs async. We don't want to
// do too many replacements between reorg-runs, so we cap the number of
Expand All @@ -712,17 +741,39 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Bump the counter of rejections-since-reorg
pool.changesSinceReorg += len(drop)
// If the new transaction is a future transaction it should never churn pending transactions
if pool.isFuture(from, tx) {
var replacesPending bool

for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx)
if list := pool.pending[dropSender]; list != nil && list.Overlaps(dropTx) {
replacesPending = true
break
}
}
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
heap.Push(&pool.priced.urgent, dropTx)
}

log.Trace("Discarding future transaction replacing pending tx", "hash", hash)

return false, ErrFutureReplacePending
}
}
// Kick out the underpriced remote transactions.
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)

dropped := pool.removeTx(tx.Hash(), false)
pool.changesSinceReorg += dropped
}
}

// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
Expand Down Expand Up @@ -766,6 +817,20 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
return replaced, nil
}

// isFuture reports whether the given transaction is immediately executable.
func (pool *TxPool) isFuture(from common.Address, tx *types.Transaction) bool {
list := pool.pending[from]
if list == nil {
return pool.pendingNonces.get(from) != tx.Nonce()
}
// Sender has pending transactions.
if old := list.txs.Get(tx.Nonce()); old != nil {
return false // It replaces a pending transaction.
}
// Not replacing, check if parent nonce exists in pending.
return list.txs.Get(tx.Nonce()-1) == nil
}

// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
Expand Down Expand Up @@ -1013,11 +1078,12 @@ func (pool *TxPool) Has(hash common.Hash) bool {

// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Returns the number of transactions removed from the pending queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return
return 0
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

Expand Down Expand Up @@ -1045,7 +1111,8 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pendingGauge.Dec(int64(1 + len(invalids)))
return

return 1 + len(invalids)
}
}
// Transaction is in the future queue
Expand All @@ -1059,6 +1126,8 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
delete(pool.beats, addr)
}
}

return 0
}

// requestReset requests a pool reset to the new head block.
Expand Down
31 changes: 23 additions & 8 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func validateTxPoolInternals(pool *TxPool) error {
if nonce := pool.pendingNonces.get(addr); nonce != last+1 {
return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1)
}

if txs.totalcost.Cmp(common.Big0) < 0 {
return fmt.Errorf("totalcost went negative: %v", txs.totalcost)
}
}
return nil
}
Expand Down Expand Up @@ -1128,7 +1132,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
defer pool.Stop()

account := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, account, big.NewInt(1000000))
testAddBalance(pool, account, big.NewInt(1000000000000))

// Keep track of transaction events to ensure all executables get announced
events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
Expand Down Expand Up @@ -1607,7 +1611,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000))
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000*1000000))
}
// Create transaction (both pending and queued) with a linearly growing gasprice
for i := uint64(0); i < 500; i++ {
Expand Down Expand Up @@ -1686,7 +1690,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer sub.Unsubscribe()

// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 4)
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
Expand Down Expand Up @@ -1722,6 +1726,10 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), keys[1])); err != ErrUnderpriced {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
}
// Replace a future transaction with a future transaction
if err := pool.AddRemote(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that adding high priced transactions drops cheap ones, but not own
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
t.Fatalf("failed to add well priced transaction: %v", err)
Expand All @@ -1732,14 +1740,19 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that replacing a pending transaction with a future transaction fails
if err := pool.AddRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != ErrFutureReplacePending {
t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, ErrFutureReplacePending)
}
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 1); err != nil {

if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
Expand Down Expand Up @@ -1901,11 +1914,12 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
t.Fatalf("failed to add well priced transaction: %v", err)
}

tx = pricedTransaction(2, 100000, big.NewInt(3), keys[1])
tx = pricedTransaction(1, 100000, big.NewInt(3), keys[1])
if err := pool.AddRemote(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2
t.Fatalf("failed to add well priced transaction: %v", err)
}
tx = dynamicFeeTx(3, 100000, big.NewInt(4), big.NewInt(1), keys[1])

tx = dynamicFeeTx(2, 100000, big.NewInt(4), big.NewInt(1), keys[1])
if err := pool.AddRemote(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
Expand All @@ -1916,7 +1930,8 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 1); err != nil {

if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
Expand Down Expand Up @@ -2510,7 +2525,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
defer pool.Stop()

account := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, account, big.NewInt(1000000))
testAddBalance(pool, account, big.NewInt(1000000000000000000))

batches := make([]types.Transactions, b.N)
for i := 0; i < b.N; i++ {
Expand Down
Loading

0 comments on commit 0d29c36

Please sign in to comment.