Skip to content

Commit

Permalink
merge demote
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhang2023 committed Apr 2, 2024
2 parents 53cabe1 + 4af5471 commit 90c30da
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 11 deletions.
67 changes: 57 additions & 10 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ var (
addBlockingDurationTimer = metrics.NewRegisteredTimer("txpool/add/blocking/duration", nil)
addLockedDurationTimer = metrics.NewRegisteredTimer("txpool/add/locked/duration", nil)
addValidateDurationTimer = metrics.NewRegisteredTimer("txpool/add/validate/duration", nil)
// demote metrics
// demoteDuration measures how long time a demotion takes.
demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil)
resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -1399,7 +1403,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}(time.Now())
defer close(done)

var promoteAddrs []common.Address
var promoteAddrs, demoteAddrs []common.Address
if dirtyAccounts != nil && reset == nil {
// Only dirty accounts need to be promoted, unless we're resetting.
// For resets, all addresses in the tx queue will be promoted and
Expand All @@ -1410,7 +1414,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
treset := time.Now()
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)
demoteAddrs = pool.reset(reset.oldHead, reset.newHead)

// Nonces were reset, discard any events that became stale
for addr := range events {
Expand All @@ -1431,9 +1435,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
var t0 = time.Now()
if reset != nil {
t0 := time.Now()
pool.demoteUnexecutables()
pool.demoteUnexecutables(demoteAddrs)
demoteDurationTimer.Update(time.Since(t0))
var pendingBaseFee = pool.priced.urgent.baseFee
if reset.newHead != nil {
Expand All @@ -1454,7 +1458,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.pendingNonces.setAll(nonces)
}
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
t0 := time.Now()
t0 = time.Now()
pool.truncatePending()
pool.truncateQueue()
truncateDurationTimer.Update(time.Since(t0))
Expand Down Expand Up @@ -1485,16 +1489,37 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,

// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
// collect demote addresses
var collectAddr = func(txs types.Transactions) {
addrs := make(map[common.Address]struct{})
for _, tx := range txs {
addr, err := types.Sender(pool.signer, tx)
//it might come from other pool, by other signer
if err != nil {
continue
}
if !pool.Filter(tx) {
continue
}
addrs[addr] = struct{}{}
}
demoteAddrs = make([]common.Address, 0, len(addrs))
for addr := range addrs {
demoteAddrs = append(demoteAddrs, addr)
}
}

var depth uint64 = 1

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()

if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
if depth = uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
Expand Down Expand Up @@ -1560,9 +1585,18 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
}
}
reinject = lost

collectAddr(append(discarded, included...))
}
}
} else if newHead != nil && oldHead.Hash() == newHead.ParentHash {
block := pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
if block != nil {
collectAddr(block.Transactions())
}
}
resetDepthMeter.Mark(int64(depth))
log.Info("reset block depth", "depth", depth)
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
Expand All @@ -1587,6 +1621,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
pool.mu.Unlock()
pool.addTxsLocked(reinject, false)
pool.mu.Lock()
return
}

// promoteExecutables moves transactions that have become processable from the
Expand Down Expand Up @@ -1811,12 +1846,24 @@ func (pool *LegacyPool) truncateQueue() {
// Note: transactions are not marked as removed in the priced list because re-heaping
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
// to trigger a re-heap is this function
func (pool *LegacyPool) demoteUnexecutables() {
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
if demoteAddrs == nil {
demoteAddrs = make([]common.Address, 0, len(pool.pending))
for addr := range pool.pending {
demoteAddrs = append(demoteAddrs, addr)
}
}
demoteTxMeter.Mark(int64(len(demoteAddrs)))

// Iterate over all accounts and demote any non-executable transactions
gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit)
for addr, list := range pool.pending {
var dropPendingCache []*types.Transaction
for _, addr := range demoteAddrs {
list := pool.pending[addr]
if list == nil {
continue
}
nonce := pool.currentState.GetNonce(addr)
var dropPendingCache []*types.Transaction

// Drop all transactions that are deemed too old (low nonce)
olds := list.Forward(nonce)
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2567,7 +2567,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.demoteUnexecutables()
pool.demoteUnexecutables(nil)
}
}

Expand Down

0 comments on commit 90c30da

Please sign in to comment.