diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index b613d0acc5..2afc1a4473 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1202,6 +1202,8 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { return item } +func (p *BlobPool) PreDropPending(txs []*types.Transaction) {} + // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go index a1b0e1a838..60d8e2f615 100644 --- a/core/txpool/bundlepool/bundlepool.go +++ b/core/txpool/bundlepool/bundlepool.go @@ -4,13 +4,14 @@ import ( "container/heap" "context" "errors" - mapset "github.com/deckarep/golang-set/v2" - "github.com/ethereum/go-ethereum/miner" - "github.com/ethereum/go-ethereum/rpc" "math/big" "sync" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -289,6 +290,8 @@ func (p *BundlePool) Get(hash common.Hash) *types.Transaction { return nil } +func (p *BundlePool) PreDropPending(txs []*types.Transaction) {} + // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 43d60a0793..47f10d4d6d 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -159,7 +159,8 @@ var ( reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil) resetCount = metrics.NewRegisteredCounter("txpool/reorg/reset/count", nil) - loopReportTimer = metrics.NewRegisteredTimer("txpool/loop/report", nil) + loopReportTimer = metrics.NewRegisteredTimer("txpool/loop/report", nil) + loopPredropPendingTimer = metrics.NewRegisteredTimer("txpool/loop/predroppending", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -321,6 +322,8 @@ type LegacyPool struct { changesSinceReorg int // A counter for how many drops we've performed in-between reorg. l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil. + + preDropPendingCh chan []*types.Transaction } type txpoolResetRequest struct { @@ -335,21 +338,22 @@ func New(config Config, chain BlockChain) *LegacyPool { // Create the transaction pool with its initial settings pool := &LegacyPool{ - config: config, - chain: chain, - chainconfig: chain.Config(), - signer: types.LatestSigner(chain.Config()), - pending: make(map[common.Address]*list, config.GlobalSlots), - queue: make(map[common.Address]*list, config.GlobalQueue), - beats: make(map[common.Address]time.Time), - all: newLookup(), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - initDoneCh: make(chan struct{}), - pendingCache: newCacheForMiner(), + config: config, + chain: chain, + chainconfig: chain.Config(), + signer: types.LatestSigner(chain.Config()), + pending: make(map[common.Address]*list, config.GlobalSlots), + queue: make(map[common.Address]*list, config.GlobalQueue), + beats: make(map[common.Address]time.Time), + all: newLookup(), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + initDoneCh: make(chan struct{}), + pendingCache: newCacheForMiner(), + preDropPendingCh: make(chan []*types.Transaction, 10), } if !config.EnableCache { pool.pendingCache = newNoneCacheForMiner(pool) @@ -451,6 +455,30 @@ func (pool *LegacyPool) loop() { close(pool.initDoneCh) for { select { + case txs := <-pool.preDropPendingCh: + pool.mu.Lock() + start := time.Now() + dropCount := 0 + for _, tx := range txs { + hash := tx.Hash() + existTx := pool.all.Get(hash) + if existTx == nil { + continue + } + addr, _ := types.Sender(pool.signer, tx) + if list := pool.pending[addr]; list != nil { + if removed := list.SimpleRemove(tx); removed { + pool.all.Remove(hash) + pool.priced.Removed(1) + pendingGauge.Dec(int64(1)) + pool.pendingCounter -= 1 + dropCount++ + } + } + } + log.Info("txpool-trace pre drop pends", "dropped", dropCount, "remaining", pool.pendingCounter) + loopPredropPendingTimer.UpdateSince(start) + pool.mu.Unlock() // Handle pool shutdown case <-pool.reorgShutdownCh: return @@ -535,6 +563,10 @@ func (pool *LegacyPool) loop() { } } +func (p *LegacyPool) PreDropPending(txs []*types.Transaction) { + p.preDropPendingCh <- txs +} + // Close terminates the transaction pool. func (pool *LegacyPool) Close() error { // Terminate the pool reorger and return diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 999ac9c0ea..c16901b4f6 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -438,6 +438,16 @@ func (l *list) Remove(tx *types.Transaction) (bool, types.Transactions) { return true, nil } +func (l *list) SimpleRemove(tx *types.Transaction) bool { + // Remove the transaction from the set + nonce := tx.Nonce() + if removed := l.txs.Remove(nonce); !removed { + return false + } + l.subTotalCost([]*types.Transaction{tx}) + return true +} + // Ready retrieves a sequentially increasing list of transactions starting at the // provided nonce that is ready for processing. The returned transactions will be // removed from the list. diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index e4f89d07f6..86cad8f0da 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -166,6 +166,8 @@ type SubPool interface { // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. Status(hash common.Hash) TxStatus + + PreDropPending(txs []*types.Transaction) } type BundleSubpool interface { diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 522a2f74c2..db2aa23aed 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -278,6 +278,12 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) { errc <- nil } +func (p *TxPool) PreDropPending(txs []*types.Transaction) { + for _, subpool := range p.subpools { + subpool.PreDropPending(txs) + } +} + // SetGasTip updates the minimum gas tip required by the transaction pool for a // new transaction, and drops all transactions below this threshold. func (p *TxPool) SetGasTip(tip *big.Int) { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 9d8e6e4937..18ade1cc04 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -625,6 +625,9 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe } log.Info("perf-trace txpool-trace newPayload ExecutableDataToBlock", "duration", common.PrettyDuration(time.Since(start)), "hash", params.BlockHash, "number", params.Number) + txpool := api.eth.TxPool() + txpool.PreDropPending(block.Transactions()) + // Stash away the last update to warn the user if the beacon client goes offline api.lastNewPayloadLock.Lock() api.lastNewPayloadUpdate = time.Now()