Skip to content

Commit

Permalink
Draft/txpool p2p parallel improve (#5)
Browse files Browse the repository at this point in the history
* make the number of p2p parallel enqueue routines fixable

* make the number of p2p parallel enqueue routines fixable

* fix issue of 'logging in init() function is not working'

* add detailed metrics for txpool.Add() function

---------

Co-authored-by: andyzhang2023 <[email protected]>
  • Loading branch information
2 people authored and bnoieh committed Oct 14, 2024
1 parent e31218e commit 69fa97d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
19 changes: 14 additions & 5 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ var (
journalMutexTimer = metrics.NewRegisteredTimer("txpool/mutex/journal/duration", nil)

// latency of add() method
addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil)
addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil)
addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil)
addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil)
validateBasicTimer = metrics.NewRegisteredTimer("txpool/validate/basic", nil)
requestPromoteTimer = metrics.NewRegisteredTimer("txpool/request/promote", nil)

// reorg detail metrics
resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil)
Expand Down Expand Up @@ -1118,11 +1120,15 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
defer func(t0 time.Time) {
start := time.Now()
var durationValidate, durationPromote time.Duration
defer func() {
if len(txs) > 0 {
addTimer.Update(time.Since(t0) / time.Duration(len(txs)))
addTimer.Update(time.Since(start) / time.Duration(len(txs)))
validateBasicTimer.Update(durationValidate / time.Duration(len(txs)))
requestPromoteTimer.Update(durationPromote / time.Duration(len(txs)))
}
}(time.Now())
}()
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals

Expand Down Expand Up @@ -1155,13 +1161,15 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
}

// Process all the new transaction and merge any errors into the original slice
durationValidate = time.Since(start)
pool.mu.Lock()
t0 := time.Now()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
if len(news) > 0 {
addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news)))
}
pool.mu.Unlock()
t0 = time.Now()

var nilSlot = 0
for _, err := range newErrs {
Expand All @@ -1176,6 +1184,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
if sync {
<-done
}
durationPromote = time.Since(t0)
return errs
}

Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if config.TxPool.Journal != "" {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
log.Info("P2P euqneue parallel thread number", "threadNum", TxQueueSize)
legacyPool := legacypool.New(config.TxPool, eth.blockchain)

txPools := []txpool.SubPool{legacyPool}
Expand Down

0 comments on commit 69fa97d

Please sign in to comment.