Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert: txpool pending cache optimization #11

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 5 additions & 50 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
Expand All @@ -22,18 +20,12 @@ type cacheForMiner struct {
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

Expand Down Expand Up @@ -75,9 +67,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -86,46 +77,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty
}
}
pc.txLock.Unlock()

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for _, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -136,7 +91,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
77 changes: 39 additions & 38 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -388,27 +385,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
//go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -647,11 +626,6 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
Expand All @@ -660,22 +634,53 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
}
}
return txs
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1478,10 +1483,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.SetBaseFee(pendingBaseFee)
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
Loading