Commit

add throughput metrics for txpool
add unpromoted metric
add reorg metrics
add Add() metrics
andyzhang2023 committed Nov 25, 2024
1 parent ec9393c commit 2eead06
Showing 3 changed files with 216 additions and 13 deletions.
127 changes: 114 additions & 13 deletions core/txpool/legacypool/legacypool.go
@@ -128,9 +128,13 @@ var (
validateBasicTimer = metrics.NewRegisteredTimer("txpool/validate/basic", nil)
requestPromoteTimer = metrics.NewRegisteredTimer("txpool/request/promote", nil)

addWaitTimer = metrics.NewRegisteredTimer("txpool/add/wait/time", nil)
addExecTimer = metrics.NewRegisteredTimer("txpool/add/exec/time", nil)

// reorg detail metrics
resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil)
promoteTimer = metrics.NewRegisteredTimer("txpool/promotetime", nil)
feedTimer = metrics.NewRegisteredTimer("txpool/feedtime", nil)
demoteTimer = metrics.NewRegisteredTimer("txpool/demotetime", nil)
reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil)
truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil)
@@ -139,6 +143,16 @@ var (
// latency of accessing state objects
accountSnapReadsTimer = metrics.NewRegisteredTimer("txpool/account/snap/readtime", nil)
accountTrieReadsTimer = metrics.NewRegisteredTimer("txpool/account/trie/readtime", nil)

reorgWaitTimer = metrics.NewRegisteredTimer("txpool/reorg/wait/time", nil)
reorgExecTimer = metrics.NewRegisteredTimer("txpool/reorg/exec/time", nil)
metricsPeriodTimer = metrics.NewRegisteredTimer("txpool/metrics/period", nil)

// throughput metrics
addAvgTimer = metrics.NewRegisteredTimer("txpool/avg/add", nil)
queue2pendingAvgTimer = metrics.NewRegisteredTimer("txpool/avg/queue2pending", nil)
pending2p2pAvgTimer = metrics.NewRegisteredTimer("txpool/avg/pending2p2p", nil)
demoteAvgTimer = metrics.NewRegisteredTimer("txpool/avg/demote", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
@@ -253,6 +267,20 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type LegacyPool struct {
throughputs struct {
add throughput
queueToPending throughput
pendingToP2P throughput
demote throughput
reorgWait throughput
reorgExec throughput
addWait throughput
addExec throughput
}
metrics struct {
unpromoted int64
}

config Config
chainconfig *params.ChainConfig
chain BlockChain
@@ -291,6 +319,41 @@ type LegacyPool struct {
l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.
}

func (pool *LegacyPool) reportMetrics(oldHead, newHead *types.Header) {
now := time.Now()
addAvg, addDur, addCost, addCount, addTps := pool.throughputs.add.avgAndReset(now)
queue2pendingAvg, q2pDur, q2pCost, q2pCount, q2pTps := pool.throughputs.queueToPending.avgAndReset(now)
pending2P2PAvg, p2ppDur, p2ppCost, p2ppCount, p2ppTps := pool.throughputs.pendingToP2P.avgAndReset(now)
demoteAvg, demDur, demCost, demCount, demTps := pool.throughputs.demote.avgAndReset(now)
_, metricsPeriod, reorgExec, _, _ := pool.throughputs.reorgExec.avgAndReset(now)
_, _, reorgWait, _, _ := pool.throughputs.reorgWait.avgAndReset(now)
_, _, addExec, _, _ := pool.throughputs.addExec.avgAndReset(now)
_, _, addWait, _, _ := pool.throughputs.addWait.avgAndReset(now)

unpromoted := atomic.LoadInt64(&pool.metrics.unpromoted)

// reset metrics
pool.resetMetrics()

metricsPeriodTimer.Update(metricsPeriod)

addAvgTimer.Update(addAvg)
queue2pendingAvgTimer.Update(queue2pendingAvg)
pending2p2pAvgTimer.Update(pending2P2PAvg)
demoteAvgTimer.Update(demoteAvg)

if oldHead != nil && newHead != nil {
log.Info("Transaction pool reorged", "from", oldHead.Number.Uint64(), "to", newHead.Number.Uint64(),
"addAvg", addAvg, "addDur", addDur, "addCost", addCost, "addCount", addCount, "addTps", addTps,
"queue2pendingAvg", queue2pendingAvg, "queue2pandingDur", q2pDur, "queue2pendingCost", q2pCost, "queue2pendingCount", q2pCount, "queue2pendingTps", q2pTps,
"pending2P2PAvg", pending2P2PAvg, "pending2P2PDur", p2ppDur, "pending2P2PCost", p2ppCost, "pending2P2PCount", p2ppCount, "pending2P2PTps", p2ppTps,
"demoteAvg", demoteAvg, "demoteDur", demDur, "demoteCost", demCost, "demoteCount", demCount, "demoteTps", demTps,
"unpromoted", unpromoted, "promoted", q2pCount, "demoted", demCount,
"reorgExec", reorgExec, "reorgWait", reorgWait, "addExec", addExec, "addWait", addWait,
)
}
}

type txpoolResetRequest struct {
oldHead, newHead *types.Header
}
@@ -330,6 +393,16 @@ func New(config Config, chain BlockChain) *LegacyPool {
if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
}
// initialize throughput metrics
pool.throughputs.add.init(addTimer)
pool.throughputs.queueToPending.init(promoteTimer)
pool.throughputs.pendingToP2P.init(feedTimer)
pool.throughputs.demote.init(demoteTimer)
pool.throughputs.addWait.init(addWaitTimer)
pool.throughputs.addExec.init(addExecTimer)
pool.throughputs.reorgWait.init(reorgWaitTimer)
pool.throughputs.reorgExec.init(reorgExecTimer)

return pool
}

@@ -1166,14 +1239,16 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
start := time.Now()
var durationValidate, durationPromote time.Duration
defer func() {
var addWait, durationValidate, durationPromote time.Duration
defer func(t0 time.Time) {
pool.throughputs.addWait.mark(addWait, 1)
pool.throughputs.addExec.mark(time.Since(t0)-addWait, 1)
if len(txs) > 0 {
addTimer.Update(time.Since(start) / time.Duration(len(txs)))
pool.throughputs.add.mark(time.Since(start), len(txs))
validateBasicTimer.Update(durationValidate / time.Duration(len(txs)))
requestPromoteTimer.Update(durationPromote / time.Duration(len(txs)))
}
}()
}(time.Now())
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals

@@ -1207,14 +1282,18 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error

// Process all the new transaction and merge any errors into the original slice
durationValidate = time.Since(start)
tw := time.Now()
pool.mu.Lock()
t0 := time.Now()
addWait = t0.Sub(tw)
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
if len(news) > 0 {
addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news)))
}
tw = time.Now()
pool.mu.Unlock()
t0 = time.Now()
addWait += t0.Sub(tw)

var nilSlot = 0
for _, err := range newErrs {
@@ -1464,14 +1543,25 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
}

func (pool *LegacyPool) resetMetrics() {
atomic.StoreInt64(&pool.metrics.unpromoted, 0)
}

// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
var promoted []*types.Transaction
var unpromoted int
var reorgWait time.Duration
defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0))
atomic.AddInt64(&pool.metrics.unpromoted, int64(unpromoted))
reorgTotalCost := time.Since(t0)
reorgDurationTimer.Update(reorgTotalCost)
pool.throughputs.reorgWait.mark(reorgWait, 1)
pool.throughputs.reorgExec.mark(reorgTotalCost-reorgWait, 1)
if reset != nil {
reorgresetTimer.UpdateSince(t0)
if reset.newHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
pool.reportMetrics(reset.oldHead, reset.newHead)
}
}
}(time.Now())
@@ -1484,7 +1574,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// the flatten operation can be avoided.
promoteAddrs = dirtyAccounts.flatten()
}
tw := time.Now()
pool.mu.Lock()
reorgWait = time.Since(tw)
tl, t0 := time.Now(), time.Now()
if reset != nil {
if pool.currentState != nil && metrics.EnabledExpensive {
@@ -1510,16 +1602,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
// Check for pending transactions for every account that sent new ones
t0 = time.Now()
promoted := pool.promoteExecutables(promoteAddrs)
promoteTimer.UpdateSince(t0)
promoted, unpromoted = pool.promoteExecutables(promoteAddrs)
pool.throughputs.queueToPending.mark(time.Since(t0), len(promoted))

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
demoteTimer.UpdateSince(t0)
demoted := pool.demoteUnexecutables(demoteAddrs)
pool.throughputs.demote.mark(time.Since(t0), demoted)
var pendingBaseFee = pool.priced.urgent.baseFee
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
@@ -1546,7 +1638,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if reset != nil {
reorgresetNoblockingTimer.UpdateSince(tl)
}
tw = time.Now()
pool.mu.Unlock()
reorgWait += time.Since(tw)

// Notify subsystems for newly added transactions
for _, tx := range promoted {
Expand All @@ -1561,7 +1655,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
tfeed := time.Now()
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
pool.throughputs.pendingToP2P.mark(time.Since(tfeed), len(txs))
}
}

@@ -1726,9 +1822,10 @@ func (pool *LegacyPool) reduceBalanceByL1Cost(list *list, balance *uint256.Int)
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) ([]*types.Transaction, int) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
var unpromoted int

// Iterate over all accounts and promote any executable transactions
gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit, pool.config.EffectiveGasCeil)
@@ -1737,6 +1834,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list == nil {
continue // Just in case someone calls with a non existing account
}
all := list.Len()
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
@@ -1781,6 +1879,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
all -= len(forwards) + len(drops) + len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
@@ -1792,8 +1891,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
pool.reserve(addr, false)
}
}
unpromoted += all
}
return promoted
return promoted, unpromoted
}

// truncatePending removes transactions from the pending queue if the pool is above the
@@ -1939,7 +2039,7 @@ func (pool *LegacyPool) truncateQueue() {
// Note: transactions are not marked as removed in the priced list because re-heaping
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
// to trigger a re-heap in this function
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int {
if demoteAddrs == nil {
demoteAddrs = make([]common.Address, 0, len(pool.pending))
for addr := range pool.pending {
Expand Down Expand Up @@ -2015,6 +2115,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
removed += len(dropPendingCache)
}
pool.priced.Removed(removed)
return removed
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
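Aside: the addWait/addExec and reorgWait/reorgExec pairs above split each operation's total time into time spent blocked on pool.mu versus time spent doing work while holding it. A minimal, self-contained sketch of that pattern (illustrative names only, not code from this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex

	t0 := time.Now()
	tw := time.Now()
	mu.Lock() // time blocked here counts as "wait", not "exec"
	wait := time.Since(tw)

	time.Sleep(10 * time.Millisecond) // stand-in for work done under the lock
	mu.Unlock()

	exec := time.Since(t0) - wait // total elapsed minus lock contention
	fmt.Println("wait:", wait, "exec:", exec)
}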
55 changes: 55 additions & 0 deletions core/txpool/legacypool/legacypool_throughput.go
@@ -0,0 +1,55 @@
package legacypool

import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/metrics"
)

// throughput holds the throughput metrics of the transaction pool.
// it is sampled at key points to measure each stage of the path a transaction takes through the pool.
type throughput struct {
costTimer metrics.Timer
lastReset time.Time
cost int64 // cost in nanoseconds; updated atomically, hence int64 rather than time.Duration
counter int64
}

func (t *throughput) init(timer metrics.Timer) {
t.costTimer = timer
t.lastReset = time.Now()
}

func (t *throughput) mark(cost time.Duration, count int) {
atomic.AddInt64(&t.cost, int64(cost))
atomic.AddInt64(&t.counter, int64(count))
}

// avgAndReset returns the average time it takes a transaction to go through the path.
// it's not accurate, but it's good enough to give a rough idea of the throughput.
// metrics data will be reset after this call.
func (t *throughput) avgAndReset(now time.Time) (avgCost time.Duration, duration time.Duration, totalCost time.Duration, count int64, tps int64) {
totalCostI64 := atomic.LoadInt64(&t.cost)
if t.lastReset.IsZero() {
duration = time.Duration(0)
} else {
duration = now.Sub(t.lastReset)
}
count = atomic.LoadInt64(&t.counter)
totalCost = time.Duration(totalCostI64)
if t.costTimer != nil { // guard zero-value throughputs (e.g. in tests) whose timer was never initialized
t.costTimer.Update(totalCost)
}

atomic.StoreInt64(&t.cost, 0)
atomic.StoreInt64(&t.counter, 0)
t.lastReset = now
if count == 0 {
avgCost = 0
} else {
avgCost = time.Duration(totalCostI64 / count)
}

if totalCostI64 > 0 { // avoid NaN/Inf when nothing was recorded
tps = int64(float64(time.Second) / float64(totalCostI64) * float64(count))
}
return
}
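For illustration, a minimal usage sketch of the throughput type (demoTimer, demoThroughput, and the metric name are hypothetical, not part of this commit; imports mirror the package above):

package legacypool

import (
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

// demoThroughput shows the intended lifecycle: init once with a registered
// timer, mark at each measurement point, and periodically read-and-reset.
var demoTimer = metrics.NewRegisteredTimer("txpool/avg/demo", nil) // hypothetical metric name

func demoThroughput() {
	var tp throughput
	tp.init(demoTimer)

	start := time.Now()
	// ... process a batch of 3 transactions ...
	tp.mark(time.Since(start), 3)

	// avgCost = totalCost / count; tps scales count to one second of cost.
	avg, period, total, count, tps := tp.avgAndReset(time.Now())
	log.Info("demo throughput", "avg", avg, "period", period, "total", total, "count", count, "tps", tps)
}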
47 changes: 47 additions & 0 deletions core/txpool/legacypool/legacypool_throughput_test.go
@@ -0,0 +1,47 @@
package legacypool

import (
"testing"
"time"
)

func TestThroughputAvg(t *testing.T) {
tp := throughput{
lastReset: time.Now(),
cost: 0,
counter: 0,
}
tp.mark(200*time.Millisecond, 4)
tp.mark(50*time.Millisecond, 1)
avg, _, total, count, tps := tp.avgAndReset(tp.lastReset.Add(500 * time.Millisecond))
if avg != 50*time.Millisecond {
t.Errorf("expected avg to be 50ms, got %v", avg)
}
if total != 250*time.Millisecond {
t.Errorf("expected total to be 250ms, got %v", total)
}
if count != 5 {
t.Errorf("expected count to be 5, got %v", count)
}
if tps != 20 {
t.Errorf("expected tps to be 20, got %v", tps)
}

tp = throughput{}
tp.lastReset = time.Now()
tp.mark(200*time.Millisecond, 0)
avg, _, total, count, tps = tp.avgAndReset(tp.lastReset.Add(500 * time.Millisecond))
if avg != 0 {
t.Errorf("expected avg to be 0, got %v", avg)
}
if total != 200*time.Millisecond {
t.Errorf("expected total to be 200ms, got %v", total)
}
if count != 0 {
t.Errorf("expected count to be 0, got %v", count)
}
if tps != 0 {
t.Errorf("expected tps to be 0, got %v", tps)
}

}
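Sanity check of the first case above: total = 200ms + 50ms = 250ms over 4 + 1 = 5 transactions, so avg = 250ms / 5 = 50ms and tps = 5 × (1s / 250ms) = 20, matching the assertions.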
