diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index da919be9d6..2a7e561671 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -106,6 +106,12 @@ var ( reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions + + // demote metrics + // demoteDuration measures how long time a demotion takes. + demoteDurationTimer = metrics.NewRegisteredTimer("txpool/demote/duration", nil) + demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil) + resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -1334,7 +1340,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, }(time.Now()) defer close(done) - var promoteAddrs []common.Address + var promoteAddrs, demoteAddrs []common.Address if dirtyAccounts != nil && reset == nil { // Only dirty accounts need to be promoted, unless we're resetting. // For resets, all addresses in the tx queue will be promoted and @@ -1344,7 +1350,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.mu.Lock() if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions - pool.reset(reset.oldHead, reset.newHead) + demoteAddrs = pool.reset(reset.oldHead, reset.newHead) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1365,8 +1371,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). + var t0 = time.Now() if reset != nil { - pool.demoteUnexecutables() + pool.demoteUnexecutables(demoteAddrs) + demoteDurationTimer.Update(time.Since(t0)) if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) @@ -1410,16 +1418,41 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { +func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) { // If we're reorging an old state, reinject all dropped transactions var reinject types.Transactions + // collect demote addresses + var collectAddr = func(txs types.Transactions) { + addrs := make(map[common.Address]struct{}) + for _, tx := range txs { + if !pool.Filter(tx) { + continue + } + // it is heavy to get sender from tx, so we try to get it from the pool + if oldtx := pool.all.Get(tx.Hash()); oldtx != nil { + tx = oldtx + } + addr, err := types.Sender(pool.signer, tx) + //it might come from other pool, by other signer + if err != nil { + continue + } + addrs[addr] = struct{}{} + } + demoteAddrs = make([]common.Address, 0, len(addrs)) + for addr := range addrs { + demoteAddrs = append(demoteAddrs, addr) + } + } + + var depth uint64 = 1 if oldHead != nil && oldHead.Hash() != newHead.ParentHash { // If the reorg is too deep, avoid doing it (will happen during fast sync) oldNum := oldHead.Number.Uint64() newNum := newHead.Number.Uint64() - if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { + if depth = uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) } else { // Reorg seems shallow enough to pull in all transactions into memory @@ -1485,9 +1518,18 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { } } reinject = lost + + collectAddr(append(discarded, included...)) } } + } else if newHead != nil && oldHead.Hash() == newHead.ParentHash { + block := pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + if block != nil { + collectAddr(block.Transactions()) + } } + resetDepthMeter.Mark(int64(depth)) + log.Info("reset block depth", "depth", depth) // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock() // Special case during testing @@ -1511,6 +1553,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { log.Debug("Reinjecting stale transactions", "count", len(reinject)) core.SenderCacher.Recover(pool.signer, reinject) pool.addTxsLocked(reinject, false) + return } // promoteExecutables moves transactions that have become processable from the @@ -1731,10 +1774,22 @@ func (pool *LegacyPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *LegacyPool) demoteUnexecutables() { +func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { + if demoteAddrs == nil { + demoteAddrs = make([]common.Address, 0, len(pool.pending)) + for addr := range pool.pending { + demoteAddrs = append(demoteAddrs, addr) + } + } + demoteTxMeter.Mark(int64(len(demoteAddrs))) + // Iterate over all accounts and demote any non-executable transactions gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit) - for addr, list := range pool.pending { + for _, addr := range demoteAddrs { + list := pool.pending[addr] + if list == nil { + continue + } nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index fb2a77d559..283bddff14 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -2567,7 +2567,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(nil) } }