Skip to content

Commit

Permalink
txpool: drop pending in front when new payload
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoieh committed Dec 16, 2024
1 parent e871e53 commit c234e53
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 19 deletions.
2 changes: 2 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,8 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
return item
}

func (p *BlobPool) PreDropPending(txs []*types.Transaction) {}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
Expand Down
9 changes: 6 additions & 3 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"container/heap"
"context"
"errors"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"
"math/big"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -289,6 +290,8 @@ func (p *BundlePool) Get(hash common.Hash) *types.Transaction {
return nil
}

func (p *BundlePool) PreDropPending(txs []*types.Transaction) {}

// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Expand Down
64 changes: 48 additions & 16 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ var (
reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil)
resetCount = metrics.NewRegisteredCounter("txpool/reorg/reset/count", nil)

loopReportTimer = metrics.NewRegisteredTimer("txpool/loop/report", nil)
loopReportTimer = metrics.NewRegisteredTimer("txpool/loop/report", nil)
loopPredropPendingTimer = metrics.NewRegisteredTimer("txpool/loop/predroppending", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -321,6 +322,8 @@ type LegacyPool struct {
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.

preDropPendingCh chan []*types.Transaction
}

type txpoolResetRequest struct {
Expand All @@ -335,21 +338,22 @@ func New(config Config, chain BlockChain) *LegacyPool {

// Create the transaction pool with its initial settings
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list, config.GlobalSlots),
queue: make(map[common.Address]*list, config.GlobalQueue),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list, config.GlobalSlots),
queue: make(map[common.Address]*list, config.GlobalQueue),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
preDropPendingCh: make(chan []*types.Transaction, 10),
}
if !config.EnableCache {
pool.pendingCache = newNoneCacheForMiner(pool)
Expand Down Expand Up @@ -451,6 +455,30 @@ func (pool *LegacyPool) loop() {
close(pool.initDoneCh)
for {
select {
case txs := <-pool.preDropPendingCh:
pool.mu.Lock()
start := time.Now()
dropCount := 0
for _, tx := range txs {
hash := tx.Hash()
existTx := pool.all.Get(hash)
if existTx == nil {
continue
}
addr, _ := types.Sender(pool.signer, tx)
if list := pool.pending[addr]; list != nil {
if removed := list.SimpleRemove(tx); removed {
pool.all.Remove(hash)
pool.priced.Removed(1)
pendingGauge.Dec(int64(1))
pool.pendingCounter -= 1
dropCount++
}
}
}
log.Info("txpool-trace pre drop pends", "dropped", dropCount, "remaining", pool.pendingCounter)
loopPredropPendingTimer.UpdateSince(start)
pool.mu.Unlock()
// Handle pool shutdown
case <-pool.reorgShutdownCh:
return
Expand Down Expand Up @@ -535,6 +563,10 @@ func (pool *LegacyPool) loop() {
}
}

func (p *LegacyPool) PreDropPending(txs []*types.Transaction) {
p.preDropPendingCh <- txs
}

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Terminate the pool reorger and return
Expand Down
10 changes: 10 additions & 0 deletions core/txpool/legacypool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,16 @@ func (l *list) Remove(tx *types.Transaction) (bool, types.Transactions) {
return true, nil
}

func (l *list) SimpleRemove(tx *types.Transaction) bool {
// Remove the transaction from the set
nonce := tx.Nonce()
if removed := l.txs.Remove(nonce); !removed {
return false
}
l.subTotalCost([]*types.Transaction{tx})
return true
}

// Ready retrieves a sequentially increasing list of transactions starting at the
// provided nonce that is ready for processing. The returned transactions will be
// removed from the list.
Expand Down
2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ type SubPool interface {
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus

PreDropPending(txs []*types.Transaction)
}

type BundleSubpool interface {
Expand Down
6 changes: 6 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
errc <- nil
}

func (p *TxPool) PreDropPending(txs []*types.Transaction) {
for _, subpool := range p.subpools {
subpool.PreDropPending(txs)
}
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (p *TxPool) SetGasTip(tip *big.Int) {
Expand Down
3 changes: 3 additions & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,9 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
}
log.Info("perf-trace txpool-trace newPayload ExecutableDataToBlock", "duration", common.PrettyDuration(time.Since(start)), "hash", params.BlockHash, "number", params.Number)

txpool := api.eth.TxPool()
txpool.PreDropPending(block.Transactions())

// Stash away the last update to warn the user if the beacon client goes offline
api.lastNewPayloadLock.Lock()
api.lastNewPayloadUpdate = time.Now()
Expand Down

0 comments on commit c234e53

Please sign in to comment.