diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 4caa3e0b2a..7a57b21783 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -140,6 +140,10 @@ var ( addBlockingDurationTimer = metrics.NewRegisteredTimer("txpool/add/blocking/duration", nil) addLockedDurationTimer = metrics.NewRegisteredTimer("txpool/add/locked/duration", nil) addValidateDurationTimer = metrics.NewRegisteredTimer("txpool/add/validate/duration", nil) + // demote metrics + // demoteDuration measures how long time a demotion takes. + demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil) + resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -1399,7 +1403,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, }(time.Now()) defer close(done) - var promoteAddrs []common.Address + var promoteAddrs, demoteAddrs []common.Address if dirtyAccounts != nil && reset == nil { // Only dirty accounts need to be promoted, unless we're resetting. // For resets, all addresses in the tx queue will be promoted and @@ -1410,7 +1414,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, treset := time.Now() if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions - pool.reset(reset.oldHead, reset.newHead) + demoteAddrs = pool.reset(reset.oldHead, reset.newHead) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1431,9 +1435,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). + var t0 = time.Now() if reset != nil { - t0 := time.Now() - pool.demoteUnexecutables() + pool.demoteUnexecutables(demoteAddrs) demoteDurationTimer.Update(time.Since(t0)) var pendingBaseFee = pool.priced.urgent.baseFee if reset.newHead != nil { @@ -1454,7 +1458,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.pendingNonces.setAll(nonces) } // Ensure pool.queue and pool.pending sizes stay within the configured limits. - t0 := time.Now() + t0 = time.Now() pool.truncatePending() pool.truncateQueue() truncateDurationTimer.Update(time.Since(t0)) @@ -1485,16 +1489,37 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { +func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) { // If we're reorging an old state, reinject all dropped transactions var reinject types.Transactions + // collect demote addresses + var collectAddr = func(txs types.Transactions) { + addrs := make(map[common.Address]struct{}) + for _, tx := range txs { + addr, err := types.Sender(pool.signer, tx) + //it might come from other pool, by other signer + if err != nil { + continue + } + if !pool.Filter(tx) { + continue + } + addrs[addr] = struct{}{} + } + demoteAddrs = make([]common.Address, 0, len(addrs)) + for addr := range addrs { + demoteAddrs = append(demoteAddrs, addr) + } + } + + var depth uint64 = 1 if oldHead != nil && oldHead.Hash() != newHead.ParentHash { // If the reorg is too deep, avoid doing it (will happen during fast sync) oldNum := oldHead.Number.Uint64() newNum := newHead.Number.Uint64() - if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { + if depth = uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) } else { // Reorg seems shallow enough to pull in all transactions into memory @@ -1560,9 +1585,18 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { } } reinject = lost + + collectAddr(append(discarded, included...)) } } + } else if newHead != nil && oldHead.Hash() == newHead.ParentHash { + block := pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + if block != nil { + collectAddr(block.Transactions()) + } } + resetDepthMeter.Mark(int64(depth)) + log.Info("reset block depth", "depth", depth) // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock() // Special case during testing @@ -1587,6 +1621,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { pool.mu.Unlock() pool.addTxsLocked(reinject, false) pool.mu.Lock() + return } // promoteExecutables moves transactions that have become processable from the @@ -1811,12 +1846,24 @@ func (pool *LegacyPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *LegacyPool) demoteUnexecutables() { +func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { + if demoteAddrs == nil { + demoteAddrs = make([]common.Address, 0, len(pool.pending)) + for addr := range pool.pending { + demoteAddrs = append(demoteAddrs, addr) + } + } + demoteTxMeter.Mark(int64(len(demoteAddrs))) + // Iterate over all accounts and demote any non-executable transactions gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit) - for addr, list := range pool.pending { - var dropPendingCache []*types.Transaction + for _, addr := range demoteAddrs { + list := pool.pending[addr] + if list == nil { + continue + } nonce := pool.currentState.GetNonce(addr) + var dropPendingCache []*types.Transaction // Drop all transactions that are deemed too old (low nonce) olds := list.Forward(nonce) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index fb2a77d559..283bddff14 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -2567,7 +2567,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(nil) } }