Skip to content

Commit

Permalink
refactor metrics address #2823, #2822, #2820 and #2819
Browse files Browse the repository at this point in the history
  • Loading branch information
trajan0x committed Jun 29, 2024
1 parent 9cf433b commit 96a490e
Show file tree
Hide file tree
Showing 12 changed files with 584 additions and 195 deletions.
113 changes: 4 additions & 109 deletions ethergo/submitter/chain_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/synapsecns/sanguine/ethergo/client"
"github.com/synapsecns/sanguine/ethergo/submitter/db"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -66,9 +65,9 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
span.SetAttributes(attribute.Int("nonce", int(currentNonce)))

// record metrics for txes.
t.currentNonces.Set(uint32(chainID.Int64()), currentNonce)
t.numPendingTxes.Set(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce))
t.oldestPendingPerChain.Set(uint32(chainID.Int64()), fetchOldestPendingTx(txes, currentNonce))
t.otelRecorder.RecordNonceForChain(uint32(chainID.Int64()), currentNonce)
t.otelRecorder.RecordNumPendingTxes(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce))
t.otelRecorder.RecordOldestPendingTx(uint32(chainID.Int64()), time.Since(fetchOldestPendingTx(txes, currentNonce)))

wg := &sync.WaitGroup{}

Expand All @@ -89,7 +88,7 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
if err != nil {
return fmt.Errorf("could not get gas balance: %w", err)
}
t.currentGasBalances.Set(uint32(chainID.Int64()), core.CopyBigInt(gasBalance))
t.otelRecorder.RecordGasBalanceForChain(uint32(chainID.Int64()), core.CopyBigInt(gasBalance))
span.SetAttributes(attribute.String("gas_balance", gasBalance.String()))

for i := range txes {
Expand Down Expand Up @@ -143,110 +142,6 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID *
return nil
}

// fetchOldestPendingTx fetches the oldest pending tx in the queue.
func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time {
oldestPendingTx := time.Now()
for _, tx := range txes {
if tx.Nonce() >= nonce {
continue
}

if tx.CreationTime().Before(oldestPendingTx) {
oldestPendingTx = tx.CreationTime()
}
}

return oldestPendingTx

}

// calculatePendingTxes calculates the number of pending txes in the queue.
func calculatePendingTxes(txes []db.TX, nonce uint64) int {
realPendingCount := 0
for _, tx := range txes {
// current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0)
// so we use equal to here
if tx.Nonce() >= nonce {
realPendingCount++
}
}

return realPendingCount

}

func (t *txSubmitterImpl) recordNumPending(_ context.Context, observer metric.Observer) (err error) {
if t.metrics == nil || t.numPendingGauge == nil || t.numPendingTxes == nil {
return nil
}

t.numPendingTxes.Range(func(chainID uint32, numPending int) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", t.signer.Address().Hex()),
)
observer.ObserveInt64(t.numPendingGauge, int64(numPending), opts)

return true
})

return nil
}

func (t *txSubmitterImpl) recordNonces(_ context.Context, observer metric.Observer) (err error) {
if t.metrics == nil || t.nonceGauge == nil || t.currentNonces == nil {
return nil
}

t.currentNonces.Range(func(chainID uint32, nonce uint64) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", t.signer.Address().Hex()),
)
observer.ObserveInt64(t.nonceGauge, int64(nonce), opts)
return true
})

return nil
}

func (t *txSubmitterImpl) recordBalance(_ context.Context, observer metric.Observer) (err error) {
if t.metrics == nil || t.gasBalanceGauge == nil {
return nil
}

t.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", t.signer.Address().Hex()),
)

observer.ObserveFloat64(t.gasBalanceGauge, toFloat(gasPrice), opts)
return true
})

return nil
}

func (t *txSubmitterImpl) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) {
if t.metrics == nil || t.oldestPendingGauge == nil {
return nil
}

t.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Time) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", t.signer.Address().Hex()),
)
observer.ObserveFloat64(t.oldestPendingGauge, time.Since(oldestPendingTx).Seconds(), opts)

return true
})

return nil

}

func toFloat(wei *big.Int) float64 {
// Convert wei to float64
weiFloat := new(big.Float).SetInt(wei)
Expand Down
23 changes: 23 additions & 0 deletions ethergo/submitter/db/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions ethergo/submitter/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Service interface {
GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...Status) (chainIDs []*big.Int, err error)
// DeleteTXS deletes txs that are older than a given duration.
DeleteTXS(ctx context.Context, maxAge time.Duration, matchStatuses ...Status) error
// GetDistinctChainIDs gets the distinct chain ids for all txs.
GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error)
}

// TransactionFunc is a function that can be passed to DBTransaction.
Expand Down
20 changes: 20 additions & 0 deletions ethergo/submitter/db/txdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ func convertTXS(ethTxes []ETHTX) (txs []db.TX, err error) {
return txs, nil
}

// GetDistinctChainIDs gets a list of all chains that have been used.
func (s *Store) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) {
var chainIDs []string
err := s.db.WithContext(ctx).
Model(&ETHTX{}).
Distinct(chainIDFieldName).
Pluck(chainIDFieldName, &chainIDs).Error
if err != nil {
return nil, err
}

var result []*big.Int
for _, chainIDStr := range chainIDs {
chainID := new(big.Int)
chainID.SetString(chainIDStr, 10)
result = append(result, chainID)
}
return result, nil
}

// GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status.
// there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain.
// the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing.
Expand Down
5 changes: 5 additions & 0 deletions ethergo/submitter/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (t *TXSubmitterDBSuite) TestGetNonceForChainID() {
}
}
}

distinctChains, err := testDB.GetDistinctChainIDs(t.GetTestContext())
t.Require().NoError(err)

t.Require().Equal(len(t.testBackends), len(distinctChains))
})
}

Expand Down
10 changes: 10 additions & 0 deletions ethergo/submitter/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,13 @@ func (t *txSubmitterImpl) GetNonce(parentCtx context.Context, chainID *big.Int,
func (t *txSubmitterImpl) CheckAndSetConfirmation(ctx context.Context, chainClient client.EVM, txes []db.TX) error {
return t.checkAndSetConfirmation(ctx, chainClient, txes)
}

// Outersection exports outersection for testing.
func Outersection(set, superset []*big.Int) []*big.Int {
return outersection(set, superset)
}

// MapToBigIntSlice exports mapToBigIntSlice for testing.
func MapToBigIntSlice[T any](m map[uint64]T) []*big.Int {
return mapToBigIntSlice(m)
}
Loading

0 comments on commit 96a490e

Please sign in to comment.