From 2eead06343a104eca637dab29046cc4ae838d629 Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Sun, 24 Nov 2024 23:11:39 +0800
Subject: [PATCH] add throughput metrics for txpool

add unpromoted metric
add reorg metrics
add Add() metrics
---
 core/txpool/legacypool/legacypool.go          | 127 ++++++++++++++++--
 .../legacypool/legacypool_throughput.go       |  55 ++++++++
 .../legacypool/legacypool_throughput_test.go  |  47 +++++++
 3 files changed, 216 insertions(+), 13 deletions(-)
 create mode 100644 core/txpool/legacypool/legacypool_throughput.go
 create mode 100644 core/txpool/legacypool/legacypool_throughput_test.go

diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go
index 15a017d45..2151a4905 100644
--- a/core/txpool/legacypool/legacypool.go
+++ b/core/txpool/legacypool/legacypool.go
@@ -128,9 +128,13 @@ var (
 	validateBasicTimer  = metrics.NewRegisteredTimer("txpool/validate/basic", nil)
 	requestPromoteTimer = metrics.NewRegisteredTimer("txpool/request/promote", nil)
 
+	addWaitTimer = metrics.NewRegisteredTimer("txpool/add/wait/time", nil)
+	addExecTimer = metrics.NewRegisteredTimer("txpool/add/exec/time", nil)
+
 	// reorg detail metrics
 	resetTimer      = metrics.NewRegisteredTimer("txpool/resettime", nil)
 	promoteTimer    = metrics.NewRegisteredTimer("txpool/promotetime", nil)
+	feedTimer       = metrics.NewRegisteredTimer("txpool/feedtime", nil)
 	demoteTimer     = metrics.NewRegisteredTimer("txpool/demotetime", nil)
 	reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil)
 	truncateTimer   = metrics.NewRegisteredTimer("txpool/truncatetime", nil)
@@ -139,6 +143,16 @@ var (
 	// latency of accessing state objects
 	accountSnapReadsTimer = metrics.NewRegisteredTimer("txpool/account/snap/readtime", nil)
 	accountTrieReadsTimer = metrics.NewRegisteredTimer("txpool/account/trie/readtime", nil)
+
+	reorgWaitTimer     = metrics.NewRegisteredTimer("txpool/reorg/wait/time", nil)
+	reorgExecTimer     = metrics.NewRegisteredTimer("txpool/reorg/exec/time", nil)
+	metricsPeriodTimer = metrics.NewRegisteredTimer("txpool/metrics/period", nil)
+
+	// throughput metrics
+	addAvgTimer           = metrics.NewRegisteredTimer("txpool/avg/add", nil)
+	queue2pendingAvgTimer = metrics.NewRegisteredTimer("txpool/avg/queue2pending", nil)
+	pending2p2pAvgTimer   = metrics.NewRegisteredTimer("txpool/avg/pending2p2p", nil)
+	demoteAvgTimer        = metrics.NewRegisteredTimer("txpool/avg/demote", nil)
 )
 
 // BlockChain defines the minimal set of methods needed to back a tx pool with
@@ -253,6 +267,20 @@ func (config *Config) sanitize() Config {
 // current state) and future transactions. Transactions move between those
 // two states over time as they are received and processed.
 type LegacyPool struct {
+	througPuts struct {
+		add            throughput
+		queueToPending throughput
+		pendingToP2P   throughput
+		demote         throughput
+		reorgWait      throughput
+		reorgExec      throughput
+		addWait        throughput
+		addExec        throughput
+	}
+	metrics struct {
+		unpromoted int64
+	}
+
 	config      Config
 	chainconfig *params.ChainConfig
 	chain       BlockChain
@@ -291,6 +319,41 @@ type LegacyPool struct {
 	l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.
 }
 
+func (pool *LegacyPool) reportMetrics(oldHead, newHead *types.Header) {
+	now := time.Now()
+	addAvg, addDur, addCost, addCount, addTps := pool.througPuts.add.avgAndRest(now)
+	queue2pendingAvg, q2pDur, q2pCost, q2pCount, q2pTps := pool.througPuts.queueToPending.avgAndRest(now)
+	pending2P2PAvg, p2ppDur, p2ppCost, p2ppCount, p2ppTps := pool.througPuts.pendingToP2P.avgAndRest(now)
+	demoteAvg, demDur, demCost, demCount, demTps := pool.througPuts.demote.avgAndRest(now)
+	_, metricsPeriod, reorgExec, _, _ := pool.througPuts.reorgExec.avgAndRest(now)
+	_, _, reorgWait, _, _ := pool.througPuts.reorgWait.avgAndRest(now)
+	_, _, addExec, _, _ := pool.througPuts.addExec.avgAndRest(now)
+	_, _, addWait, _, _ := pool.througPuts.addWait.avgAndRest(now)
+
+	unpromoted := atomic.LoadInt64(&pool.metrics.unpromoted)
+
+	// reset metrics
+	pool.resetMetrics()
+
+	metricsPeriodTimer.Update(metricsPeriod)
+
+	addAvgTimer.Update(addAvg)
+	queue2pendingAvgTimer.Update(queue2pendingAvg)
+	pending2p2pAvgTimer.Update(pending2P2PAvg)
+	demoteAvgTimer.Update(demoteAvg)
+
+	if oldHead != nil && newHead != nil {
+		log.Info("Transaction pool reorged", "from", oldHead.Number.Uint64(), "to", newHead.Number.Uint64(),
+			"addAvg", addAvg, "addDur", addDur, "addCost", addCost, "addCount", addCount, "addTps", addTps,
+			"queue2pendingAvg", queue2pendingAvg, "queue2pendingDur", q2pDur, "queue2pendingCost", q2pCost, "queue2pendingCount", q2pCount, "queue2pendingTps", q2pTps,
+			"pending2P2PAvg", pending2P2PAvg, "pending2P2PDur", p2ppDur, "pending2P2PCost", p2ppCost, "pending2P2PCount", p2ppCount, "pending2P2PTps", p2ppTps,
+			"demoteAvg", demoteAvg, "demoteDur", demDur, "demoteCost", demCost, "demoteCount", demCount, "demoteTps", demTps,
+			"unpromoted", unpromoted, "promoted", q2pCount, "demoted", demCount,
+			"reorgExec", reorgExec, "reorgWait", reorgWait, "addExec", addExec, "addWait", addWait,
+		)
+	}
+}
+
 type txpoolResetRequest struct {
 	oldHead, newHead *types.Header
 }
@@ -330,6 +393,16 @@ func New(config Config, chain BlockChain) *LegacyPool {
 	if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
 		pool.journal = newTxJournal(config.Journal)
 	}
+	// initialize throughput metrics
+	pool.througPuts.add.init(addTimer)
+	pool.througPuts.queueToPending.init(promoteTimer)
+	pool.througPuts.pendingToP2P.init(feedTimer)
+	pool.througPuts.demote.init(demoteTimer)
+	pool.througPuts.addWait.init(addWaitTimer)
+	pool.througPuts.addExec.init(addExecTimer)
+	pool.througPuts.reorgWait.init(reorgWaitTimer)
+	pool.througPuts.reorgExec.init(reorgExecTimer)
+
 	return pool
 }
 
@@ -1166,14 +1239,16 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
 // to the add is finished. Only use this during tests for determinism!
 func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
 	start := time.Now()
-	var durationValidate, durationPromote time.Duration
-	defer func() {
+	var addWait, durationValidate, durationPromote time.Duration
+	defer func(t0 time.Time) {
+		pool.througPuts.addWait.mark(addWait, 1)
+		pool.througPuts.addExec.mark(time.Since(t0)-addWait, 1)
 		if len(txs) > 0 {
-			addTimer.Update(time.Since(start) / time.Duration(len(txs)))
+			pool.througPuts.add.mark(time.Since(start), len(txs))
 			validateBasicTimer.Update(durationValidate / time.Duration(len(txs)))
 			requestPromoteTimer.Update(durationPromote / time.Duration(len(txs)))
 		}
-	}()
+	}(time.Now())
 
 	// Do not treat as local if local transactions have been disabled
 	local = local && !pool.config.NoLocals
@@ -1207,14 +1282,18 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
 	// Process all the new transaction and merge any errors into the original slice
 	durationValidate = time.Since(start)
+	tw := time.Now()
 	pool.mu.Lock()
 	t0 := time.Now()
+	addWait = t0.Sub(tw)
 	newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
 	if len(news) > 0 {
 		addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news)))
 	}
+	tw = time.Now()
 	pool.mu.Unlock()
 	t0 = time.Now()
+	addWait += t0.Sub(tw)
 
 	var nilSlot = 0
 	for _, err := range newErrs {
@@ -1464,14 +1543,25 @@ func (pool *LegacyPool) scheduleReorgLoop() {
 	}
 }
 
+func (pool *LegacyPool) resetMetrics() {
+	atomic.StoreInt64(&pool.metrics.unpromoted, 0)
+}
+
 // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
 func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
+	var promoted []*types.Transaction
+	var unpromoted int
+	var reorgWait time.Duration
 	defer func(t0 time.Time) {
-		reorgDurationTimer.Update(time.Since(t0))
+		atomic.AddInt64(&pool.metrics.unpromoted, int64(unpromoted))
+		reorgTotalCost := time.Since(t0)
+		reorgDurationTimer.Update(reorgTotalCost)
+		pool.througPuts.reorgWait.mark(reorgWait, 1)
+		pool.througPuts.reorgExec.mark(reorgTotalCost-reorgWait, 1)
 		if reset != nil {
 			reorgresetTimer.UpdateSince(t0)
 			if reset.newHead != nil {
-				log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
+				pool.reportMetrics(reset.oldHead, reset.newHead)
 			}
 		}
 	}(time.Now())
@@ -1484,7 +1574,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 		// the flatten operation can be avoided.
 		promoteAddrs = dirtyAccounts.flatten()
 	}
+	tw := time.Now()
 	pool.mu.Lock()
+	reorgWait = time.Since(tw)
 	tl, t0 := time.Now(), time.Now()
 	if reset != nil {
 		if pool.currentState != nil && metrics.EnabledExpensive {
@@ -1510,16 +1602,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 	}
 	// Check for pending transactions for every account that sent new ones
 	t0 = time.Now()
-	promoted := pool.promoteExecutables(promoteAddrs)
-	promoteTimer.UpdateSince(t0)
+	promoted, unpromoted = pool.promoteExecutables(promoteAddrs)
+	pool.througPuts.queueToPending.mark(time.Since(t0), len(promoted))
 
 	// If a new block appeared, validate the pool of pending transactions. This will
 	// remove any transaction that has been included in the block or was invalidated
 	// because of another transaction (e.g. higher gas price).
 	t0 = time.Now()
 	if reset != nil {
-		pool.demoteUnexecutables(demoteAddrs)
-		demoteTimer.UpdateSince(t0)
+		demoted := pool.demoteUnexecutables(demoteAddrs)
+		pool.througPuts.demote.mark(time.Since(t0), demoted)
 		var pendingBaseFee = pool.priced.urgent.baseFee
 		if reset.newHead != nil {
 			if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
@@ -1546,7 +1638,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 	if reset != nil {
 		reorgresetNoblockingTimer.UpdateSince(tl)
 	}
+	tw = time.Now()
 	pool.mu.Unlock()
+	reorgWait += time.Since(tw)
 
 	// Notify subsystems for newly added transactions
 	for _, tx := range promoted {
@@ -1561,7 +1655,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 		for _, set := range events {
 			txs = append(txs, set.Flatten()...)
 		}
+		tfeed := time.Now()
 		pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
+		pool.througPuts.pendingToP2P.mark(time.Since(tfeed), len(txs))
 	}
 }
 
@@ -1726,9 +1822,10 @@ func (pool *LegacyPool) reduceBalanceByL1Cost(list *list, balance *uint256.Int)
 // promoteExecutables moves transactions that have become processable from the
 // future queue to the set of pending transactions. During this process, all
 // invalidated transactions (low nonce, low balance) are deleted.
-func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
+func (pool *LegacyPool) promoteExecutables(accounts []common.Address) ([]*types.Transaction, int) {
 	// Track the promoted transactions to broadcast them at once
 	var promoted []*types.Transaction
+	var unpromoted int
 
 	// Iterate over all accounts and promote any executable transactions
 	gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit, pool.config.EffectiveGasCeil)
@@ -1737,6 +1834,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
 		if list == nil {
 			continue // Just in case someone calls with a non existing account
 		}
+		all := list.Len()
 		// Drop all transactions that are deemed too old (low nonce)
 		forwards := list.Forward(pool.currentState.GetNonce(addr))
 		for _, tx := range forwards {
@@ -1781,6 +1879,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
 		// Mark all the items dropped as removed
 		pool.priced.Removed(len(forwards) + len(drops) + len(caps))
 		queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
+		all -= len(forwards) + len(drops) + len(caps)
 		if pool.locals.contains(addr) {
 			localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
 		}
@@ -1792,8 +1891,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
 				pool.reserve(addr, false)
 			}
 		}
+		unpromoted += all
 	}
-	return promoted
+	return promoted, unpromoted
 }
 
 // truncatePending removes transactions from the pending queue if the pool is above the
@@ -1939,7 +2039,7 @@ func (pool *LegacyPool) truncateQueue() {
 // Note: transactions are not marked as removed in the priced list because re-heaping
 // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
 // to trigger a re-heap is this function
-func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
+func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int {
 	if demoteAddrs == nil {
 		demoteAddrs = make([]common.Address, 0, len(pool.pending))
 		for addr := range pool.pending {
@@ -2015,6 +2115,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
 		removed += len(dropPendingCache)
 	}
 	pool.priced.Removed(removed)
+	return removed
 }
 
 // addressByHeartbeat is an account address tagged with its last activity timestamp.
diff --git a/core/txpool/legacypool/legacypool_throughput.go b/core/txpool/legacypool/legacypool_throughput.go
new file mode 100644
index 000000000..1725595bd
--- /dev/null
+++ b/core/txpool/legacypool/legacypool_throughput.go
@@ -0,0 +1,55 @@
+package legacypool
+
+import (
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+// throughput accumulates the cost and count of transactions passing through one stage of the pool.
+// It is updated at the key points along the path a transaction takes into the pool.
+type throughput struct {
+	costTimer metrics.Timer
+	lastReset time.Time
+	cost      int64 // accumulated cost in nanoseconds; accessed atomically, hence int64
+	counter   int64
+}
+
+func (t *throughput) init(timer metrics.Timer) {
+	t.costTimer = timer
+	t.lastReset = time.Now()
+}
+
+func (t *throughput) mark(cost time.Duration, count int) {
+	atomic.AddInt64(&t.cost, int64(cost))
+	atomic.AddInt64(&t.counter, int64(count))
+}
+
+// avgAndRest returns the average time a transaction takes to go through this path.
+// It is not exact, but good enough for a rough idea of the throughput.
+// The accumulated data is reset after this call.
+func (t *throughput) avgAndRest(now time.Time) (avgCost time.Duration, duration time.Duration, totalCost time.Duration, count int64, tps int64) {
+	totalCostI64 := atomic.LoadInt64(&t.cost)
+	if t.lastReset.IsZero() {
+		duration = time.Duration(0)
+	} else {
+		duration = now.Sub(t.lastReset)
+	}
+	count = atomic.LoadInt64(&t.counter)
+	totalCost = time.Duration(totalCostI64)
+	t.costTimer.Update(totalCost)
+
+	atomic.StoreInt64(&t.cost, 0)
+	atomic.StoreInt64(&t.counter, 0)
+	t.lastReset = now
+	if count == 0 {
+		avgCost = 0
+	} else {
+		avgCost = time.Duration(totalCostI64 / count)
+	}
+	if totalCostI64 > 0 { // guard against a zero denominator (and a NaN tps) when nothing was marked
+		tps = int64(float64(time.Second) / float64(totalCostI64) * float64(count))
+	}
+	return
+}
diff --git a/core/txpool/legacypool/legacypool_throughput_test.go b/core/txpool/legacypool/legacypool_throughput_test.go
new file mode 100644
index 000000000..dc6d5de6f
--- /dev/null
+++ b/core/txpool/legacypool/legacypool_throughput_test.go
@@ -0,0 +1,47 @@
+package legacypool
+
+import (
+	"testing"
+	"time"
+)
+
+func TestThroughputAvg(t *testing.T) {
+	tp := throughput{
+		lastReset: time.Now(),
+		cost:      0,
+		counter:   0,
+	}
+	tp.mark(200*time.Millisecond, 4)
+	tp.mark(50*time.Millisecond, 1)
+	avg, _, total, count, tps := tp.avgAndRest(tp.lastReset.Add(500 * time.Millisecond))
+	if avg != 50*time.Millisecond {
+		t.Errorf("expected avg to be 50ms, got %v", avg)
+	}
+	if total != 250*time.Millisecond {
+		t.Errorf("expected total to be 250ms, got %v", total)
+	}
+	if count != 5 {
+		t.Errorf("expected count to be 5, got %v", count)
+	}
+	if tps != 20 {
+		t.Errorf("expected tps to be 20, got %v", tps)
+	}
+
+	tp = throughput{}
+	tp.lastReset = time.Now()
+	tp.mark(200*time.Millisecond, 0)
+	avg, _, total, count, tps = tp.avgAndRest(tp.lastReset.Add(500 * time.Millisecond))
+	if avg != 0 {
+		t.Errorf("expected avg to be 0, got %v", avg)
+	}
+	if total != 200*time.Millisecond {
+		t.Errorf("expected total to be 200ms, got %v", total)
+	}
+	if count != 0 {
+		t.Errorf("expected count to be 0, got %v", count)
+	}
+	if tps != 0 {
+		t.Errorf("expected tps to be 0, got %v", tps)
+	}
+
+}
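
Note on the reported figures: avgAndRest derives tps from the accumulated cost alone; the elapsed time since lastReset is returned as duration but does not enter the calculation. The standalone Go sketch below is only an illustration of that arithmetic, not part of the patch, and uses hypothetical sample figures borrowed from TestThroughputAvg.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical sample: five transactions marked with a combined cost of 250ms,
	// the same figures TestThroughputAvg uses.
	totalCost := 250 * time.Millisecond
	count := int64(5)

	avg := totalCost / time.Duration(count)                                  // 50ms per transaction
	tps := int64(float64(time.Second) / float64(totalCost) * float64(count)) // 20 tx/s

	fmt.Println("avg:", avg, "tps:", tps)
}

With a 250ms total cost spread over 5 transactions this prints "avg: 50ms tps: 20", matching the expectations encoded in the test.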