Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: txpool pending limit #1107

Merged
merged 9 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolAccountPendingLimitFlag,
utils.TxPoolLifetimeFlag,
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolAccountPendingLimitFlag,
utils.TxPoolLifetimeFlag,
},
},
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ var (
Usage: "Maximum number of non-executable transaction slots for all accounts",
Value: ethconfig.Defaults.TxPool.GlobalQueue,
}
TxPoolAccountPendingLimitFlag = cli.Uint64Flag{
Name: "txpool.accountpendinglimit",
Usage: "Maximum number of executable transactions allowed per account",
Value: ethconfig.Defaults.TxPool.AccountPendingLimit,
}
TxPoolLifetimeFlag = cli.DurationFlag{
Name: "txpool.lifetime",
Usage: "Maximum amount of time non-executable transaction are queued",
Expand Down Expand Up @@ -1519,6 +1524,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.GlobalUint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.GlobalIsSet(TxPoolAccountPendingLimitFlag.Name) {
cfg.AccountPendingLimit = ctx.GlobalUint64(TxPoolAccountPendingLimitFlag.Name)
}
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
Expand Down
54 changes: 54 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
pendingEvictionMeter = metrics.NewRegisteredMeter("txpool/pending/eviction", nil) // Dropped due to lifetime

// Metrics for the queued pool
queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
Expand Down Expand Up @@ -178,6 +179,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

AccountPendingLimit uint64 // Number of executable transactions allowed per account

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

Expand All @@ -195,6 +198,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 64,
GlobalQueue: 1024,

AccountPendingLimit: 1024,

Lifetime: 3 * time.Hour,
}

Expand Down Expand Up @@ -230,6 +235,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
}
if conf.AccountPendingLimit < 1 {
log.Warn("Sanitizing invalid txpool account pending limit", "provided", conf.AccountPendingLimit, "updated", DefaultTxPoolConfig.AccountPendingLimit)
conf.AccountPendingLimit = DefaultTxPoolConfig.AccountPendingLimit
}
if conf.Lifetime < 1 {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
Expand Down Expand Up @@ -422,6 +431,7 @@ func (pool *TxPool) loop() {
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
// Evict queued transactions
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
Expand All @@ -437,6 +447,22 @@ func (pool *TxPool) loop() {
queuedEvictionMeter.Mark(int64(len(list)))
}
}
// Evict pending transactions
for addr := range pool.pending {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
list := pool.pending[addr].Flatten()
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
for _, tx := range list {
log.Trace("Evicting transaction due to timeout", "account", addr.Hex(), "hash", tx.Hash().Hex(), "lifetime sec", time.Since(pool.beats[addr]).Seconds(), "lifetime limit sec", pool.config.Lifetime.Seconds())
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
pool.removeTx(tx.Hash(), true)
}
pendingEvictionMeter.Mark(int64(len(list)))
}
}
pool.mu.Unlock()

// Handle local transaction journal rotation
Expand Down Expand Up @@ -957,6 +983,11 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
}
list := pool.pending[addr]

// Account pending list is full
if uint64(list.Len()) >= pool.config.AccountPendingLimit {
return false
}

inserted, old := list.Add(tx, pool.currentState, pool.config.PriceBump, pool.chainconfig, pool.currentHead)
if !inserted {
// An older transaction was better, discard this
Expand Down Expand Up @@ -1574,6 +1605,29 @@ func (pool *TxPool) executableTxFilter(costLimit *big.Int) func(tx *types.Transa
// pending limit. The algorithm tries to reduce transaction counts by an approximately
// equal number for all for accounts with many pending transactions.
func (pool *TxPool) truncatePending() {
// Truncate pending lists to max length
for addr, list := range pool.pending {
if list.Len() > int(pool.config.AccountPendingLimit) {
caps := list.Cap(int(pool.config.AccountPendingLimit))
for _, tx := range caps {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.calculateTxsLifecycle(types.Transactions{tx}, time.Now())

// Update the account nonce to the dropped transaction
// note: this will set pending nonce to the min nonce from the discarded txs
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed pending transaction to comply with hard limit", "hash", hash.Hex())
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
}
}

pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
Expand Down
11 changes: 9 additions & 2 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,14 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// The whole life time pass after last promotion, kick out stale transactions
time.Sleep(2 * config.Lifetime)
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if nolocals {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else {
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
}
if nolocals {
if queued != 0 {
Expand Down Expand Up @@ -1158,6 +1164,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
// Tests that if the transaction count belonging to multiple accounts go above
// some hard threshold, the higher transactions are dropped to prevent DOS
// attacks.
// TODO
func TestTransactionPendingGlobalLimiting(t *testing.T) {
t.Parallel()

Expand Down