Skip to content

Commit

Permalink
record oldest pending tx (fixes #2819)
Browse files Browse the repository at this point in the history
[goreleaser]
  • Loading branch information
trajan0x committed Jun 29, 2024
1 parent 021d351 commit 9cf433b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
37 changes: 37 additions & 0 deletions ethergo/submitter/chain_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
// record metrics for txes.
t.currentNonces.Set(uint32(chainID.Int64()), currentNonce)
t.numPendingTxes.Set(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce))
t.oldestPendingPerChain.Set(uint32(chainID.Int64()), fetchOldestPendingTx(txes, currentNonce))

wg := &sync.WaitGroup{}

Expand Down Expand Up @@ -142,6 +143,23 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
return nil
}

// fetchOldestPendingTx fetches the oldest pending tx in the queue.
func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time {
oldestPendingTx := time.Now()
for _, tx := range txes {
if tx.Nonce() >= nonce {
continue
}

if tx.CreationTime().Before(oldestPendingTx) {
oldestPendingTx = tx.CreationTime()
}

Check warning on line 156 in ethergo/submitter/chain_queue.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/chain_queue.go#L154-L156

Added lines #L154 - L156 were not covered by tests
}

return oldestPendingTx

}

Check failure on line 161 in ethergo/submitter/chain_queue.go

View workflow job for this annotation

GitHub Actions / Lint (ethergo)

unnecessary trailing newline (whitespace)

// calculatePendingTxes calculates the number of pending txes in the queue.
func calculatePendingTxes(txes []db.TX, nonce uint64) int {
realPendingCount := 0
Expand Down Expand Up @@ -210,6 +228,25 @@ func (t *txSubmitterImpl) recordBalance(_ context.Context, observer metric.Obser
return nil
}

func (t *txSubmitterImpl) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) {
if t.metrics == nil || t.oldestPendingGauge == nil {
return nil
}

Check warning on line 234 in ethergo/submitter/chain_queue.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/chain_queue.go#L231-L234

Added lines #L231 - L234 were not covered by tests

t.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Time) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", t.signer.Address().Hex()),
)
observer.ObserveFloat64(t.oldestPendingGauge, time.Since(oldestPendingTx).Seconds(), opts)

return true
})

Check warning on line 244 in ethergo/submitter/chain_queue.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/chain_queue.go#L236-L244

Added lines #L236 - L244 were not covered by tests

return nil

Check warning on line 246 in ethergo/submitter/chain_queue.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/chain_queue.go#L246

Added line #L246 was not covered by tests

}

Check failure on line 248 in ethergo/submitter/chain_queue.go

View workflow job for this annotation

GitHub Actions / Lint (ethergo)

unnecessary trailing newline (whitespace)

func toFloat(wei *big.Int) float64 {
// Convert wei to float64
weiFloat := new(big.Float).SetInt(wei)
Expand Down
45 changes: 32 additions & 13 deletions ethergo/submitter/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,26 @@ type txSubmitterImpl struct {
lastGasBlockCache *xsync.MapOf[int, *types.Header]
// config is the config for the transaction submitter.
config config.IConfig
// oldestPendingGauge is the gauge for the oldest pending transaction.
oldestPendingGauge metric.Float64ObservableGauge
// numPendingGauge is the gauge for the number of pending transactions.
numPendingGauge metric.Int64ObservableGauge
// nonceGauge is the gauge for the current nonce.
nonceGauge metric.Int64ObservableGauge
// gasBalanceGauge is the gauge for the gas balance.
gasBalanceGauge metric.Float64ObservableGauge
// numPendingTxes is used for metrics.
// note: numPendingTxes will stop counting at MaxResultsPerChain.
numPendingTxes *hashmap.Map[uint32, int]
// currentNonces is used for metrics.
// chainID -> nonce
currentNonces *hashmap.Map[uint32, uint64]
// currentGasBalance is used for metrics.
// chainID -> balance
currentGasBalances *hashmap.Map[uint32, *big.Int]
// oldestPendingPerChain is the oldest pending transaction.
// chainID -> time
oldestPendingPerChain *hashmap.Map[uint32, time.Time]
}

// ClientFetcher is the interface for fetching a chain client.
Expand All @@ -102,19 +110,20 @@ type ClientFetcher interface {
// NewTransactionSubmitter creates a new transaction submitter.
func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter {
return &txSubmitterImpl{
db: db,
config: config,
metrics: metrics,
meter: metrics.Meter(meterName),
signer: signer,
fetcher: fetcher,
nonceMux: mapmutex.NewStringerMapMutex(),
statusMux: mapmutex.NewStringMapMutex(),
retryNow: make(chan bool, 1),
lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](),
numPendingTxes: hashmap.New[uint32, int](),
currentNonces: hashmap.New[uint32, uint64](),
currentGasBalances: hashmap.New[uint32, *big.Int](),
db: db,
config: config,
metrics: metrics,
meter: metrics.Meter(meterName),
signer: signer,
fetcher: fetcher,
nonceMux: mapmutex.NewStringerMapMutex(),
statusMux: mapmutex.NewStringMapMutex(),
retryNow: make(chan bool, 1),
lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](),
numPendingTxes: hashmap.New[uint32, int](),
currentNonces: hashmap.New[uint32, uint64](),
currentGasBalances: hashmap.New[uint32, *big.Int](),
oldestPendingPerChain: hashmap.New[uint32, time.Time](),
}
}

Expand Down Expand Up @@ -195,6 +204,16 @@ func (t *txSubmitterImpl) setupMetrics() (err error) {
return fmt.Errorf("could not register callback: %w", err)
}

t.oldestPendingGauge, err = t.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s"))
if err != nil {
return fmt.Errorf("could not create oldest pending gauge: %w", err)
}

Check warning on line 210 in ethergo/submitter/submitter.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/submitter.go#L209-L210

Added lines #L209 - L210 were not covered by tests

_, err = t.meter.RegisterCallback(t.recordOldestPendingTx, t.oldestPendingGauge)
if err != nil {
return fmt.Errorf("could not register callback: %w", err)
}

Check warning on line 215 in ethergo/submitter/submitter.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/submitter.go#L214-L215

Added lines #L214 - L215 were not covered by tests

return nil
}

Expand Down

0 comments on commit 9cf433b

Please sign in to comment.