Skip to content

Commit

Permalink
Upgrade txpool module to increase tps (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp authored Mar 21, 2022
1 parent 13ea5c2 commit 09a31cc
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ var (
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
)

var (
addrsPool = sync.Pool{
New: func() interface{} {
return make([]common.Address, 0, 8)
},
}
addrBeatPool = sync.Pool{
New: func() interface{} {
return make(addressesByHeartbeat, 0, 8)
},
}
)

// TxStatus is the current status of a transaction as seen by the pool.
type TxStatus uint

Expand Down Expand Up @@ -267,6 +280,8 @@ type TxPool struct {
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

spammers *prque.Prque

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
}

Expand Down Expand Up @@ -298,6 +313,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
spammers: prque.New(nil),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -1166,13 +1182,14 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
promoteAddrs = addrsPool.Get().([]common.Address)
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
defer addrsPool.Put(promoteAddrs[:0])

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
Expand Down Expand Up @@ -1387,18 +1404,19 @@ func (pool *TxPool) truncatePending() {

pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
pool.spammers.Reset()
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
pool.spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
offenders := addrsPool.Get().([]common.Address)
defer addrsPool.Put(offenders[:0])
for pending > pool.config.GlobalSlots && !pool.spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offender, _ := pool.spammers.Pop()
offenders = append(offenders, offender.(common.Address))

// Equalize balances until all the same or below threshold
Expand Down Expand Up @@ -1471,7 +1489,8 @@ func (pool *TxPool) truncateQueue() {
}

// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
addresses := addrBeatPool.Get().(addressesByHeartbeat)
defer addrBeatPool.Put(addresses[:0])
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand Down Expand Up @@ -1620,7 +1639,10 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
// add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
as.cache = nil
if as.cache != nil {
addrsPool.Put((*as.cache)[:0])
as.cache = nil
}
}

// addTx adds the sender of tx into the set.
Expand All @@ -1634,7 +1656,7 @@ func (as *accountSet) addTx(tx *types.Transaction) {
// reuse. The returned slice should not be changed!
func (as *accountSet) flatten() []common.Address {
if as.cache == nil {
accounts := make([]common.Address, 0, len(as.accounts))
accounts := addrsPool.Get().([]common.Address)
for account := range as.accounts {
accounts = append(accounts, account)
}
Expand All @@ -1648,7 +1670,10 @@ func (as *accountSet) merge(other *accountSet) {
for addr := range other.accounts {
as.accounts[addr] = struct{}{}
}
as.cache = nil
if as.cache != nil {
addrsPool.Put((*as.cache)[:0])
as.cache = nil
}
}

// txLookup is used internally by TxPool to track transactions while allowing
Expand Down

0 comments on commit 09a31cc

Please sign in to comment.