From 96a490eb86f7a67173dd924097ced094fc571c24 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 14:34:49 -0400 Subject: [PATCH] refactor metrics address #2823, #2822, #2820 and #2819 --- ethergo/submitter/chain_queue.go | 113 +--------- ethergo/submitter/db/mocks/service.go | 23 +++ ethergo/submitter/db/service.go | 2 + ethergo/submitter/db/txdb/store.go | 20 ++ ethergo/submitter/db_test.go | 5 + ethergo/submitter/export_test.go | 10 + ethergo/submitter/metrics.go | 272 +++++++++++++++++++++++++ ethergo/submitter/metrics_generated.go | 26 +++ ethergo/submitter/queue.go | 56 +++++ ethergo/submitter/submitter.go | 131 ++++-------- ethergo/submitter/util.go | 31 +++ ethergo/submitter/util_test.go | 90 ++++++++ 12 files changed, 584 insertions(+), 195 deletions(-) create mode 100644 ethergo/submitter/metrics.go create mode 100644 ethergo/submitter/metrics_generated.go diff --git a/ethergo/submitter/chain_queue.go b/ethergo/submitter/chain_queue.go index c5d944f5ff..635c061bb5 100644 --- a/ethergo/submitter/chain_queue.go +++ b/ethergo/submitter/chain_queue.go @@ -20,7 +20,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/client" "github.com/synapsecns/sanguine/ethergo/submitter/db" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -66,9 +65,9 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * span.SetAttributes(attribute.Int("nonce", int(currentNonce))) // record metrics for txes. - t.currentNonces.Set(uint32(chainID.Int64()), currentNonce) - t.numPendingTxes.Set(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce)) - t.oldestPendingPerChain.Set(uint32(chainID.Int64()), fetchOldestPendingTx(txes, currentNonce)) + t.otelRecorder.RecordNonceForChain(uint32(chainID.Int64()), currentNonce) + t.otelRecorder.RecordNumPendingTxes(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce)) + t.otelRecorder.RecordOldestPendingTx(uint32(chainID.Int64()), time.Since(fetchOldestPendingTx(txes, currentNonce))) wg := &sync.WaitGroup{} @@ -89,7 +88,7 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * if err != nil { return fmt.Errorf("could not get gas balance: %w", err) } - t.currentGasBalances.Set(uint32(chainID.Int64()), core.CopyBigInt(gasBalance)) + t.otelRecorder.RecordGasBalanceForChain(uint32(chainID.Int64()), core.CopyBigInt(gasBalance)) span.SetAttributes(attribute.String("gas_balance", gasBalance.String())) for i := range txes { @@ -143,110 +142,6 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * return nil } -// fetchOldestPendingTx fetches the oldest pending tx in the queue. -func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time { - oldestPendingTx := time.Now() - for _, tx := range txes { - if tx.Nonce() >= nonce { - continue - } - - if tx.CreationTime().Before(oldestPendingTx) { - oldestPendingTx = tx.CreationTime() - } - } - - return oldestPendingTx - -} - -// calculatePendingTxes calculates the number of pending txes in the queue. -func calculatePendingTxes(txes []db.TX, nonce uint64) int { - realPendingCount := 0 - for _, tx := range txes { - // current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0) - // so we use equal to here - if tx.Nonce() >= nonce { - realPendingCount++ - } - } - - return realPendingCount - -} - -func (t *txSubmitterImpl) recordNumPending(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.numPendingGauge == nil || t.numPendingTxes == nil { - return nil - } - - t.numPendingTxes.Range(func(chainID uint32, numPending int) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveInt64(t.numPendingGauge, int64(numPending), opts) - - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordNonces(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.nonceGauge == nil || t.currentNonces == nil { - return nil - } - - t.currentNonces.Range(func(chainID uint32, nonce uint64) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveInt64(t.nonceGauge, int64(nonce), opts) - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordBalance(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.gasBalanceGauge == nil { - return nil - } - - t.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - - observer.ObserveFloat64(t.gasBalanceGauge, toFloat(gasPrice), opts) - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.oldestPendingGauge == nil { - return nil - } - - t.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Time) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveFloat64(t.oldestPendingGauge, time.Since(oldestPendingTx).Seconds(), opts) - - return true - }) - - return nil - -} - func toFloat(wei *big.Int) float64 { // Convert wei to float64 weiFloat := new(big.Float).SetInt(wei) diff --git a/ethergo/submitter/db/mocks/service.go b/ethergo/submitter/db/mocks/service.go index c489c8c754..30e2527588 100644 --- a/ethergo/submitter/db/mocks/service.go +++ b/ethergo/submitter/db/mocks/service.go @@ -115,6 +115,29 @@ func (_m *Service) GetChainIDsByStatus(ctx context.Context, fromAddress common.A return r0, r1 } +// GetDistinctChainIDs provides a mock function with given fields: ctx +func (_m *Service) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) { + ret := _m.Called(ctx) + + var r0 []*big.Int + if rf, ok := ret.Get(0).(func(context.Context) []*big.Int); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*big.Int) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetNonceAttemptsByStatus provides a mock function with given fields: ctx, fromAddress, chainID, nonce, matchStatuses func (_m *Service) GetNonceAttemptsByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64, matchStatuses ...db.Status) ([]db.TX, error) { _va := make([]interface{}, len(matchStatuses)) diff --git a/ethergo/submitter/db/service.go b/ethergo/submitter/db/service.go index 103b885f1f..b0d27960f4 100644 --- a/ethergo/submitter/db/service.go +++ b/ethergo/submitter/db/service.go @@ -40,6 +40,8 @@ type Service interface { GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...Status) (chainIDs []*big.Int, err error) // DeleteTXS deletes txs that are older than a given duration. DeleteTXS(ctx context.Context, maxAge time.Duration, matchStatuses ...Status) error + // GetDistinctChainIDs gets the distinct chain ids for all txs. + GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) } // TransactionFunc is a function that can be passed to DBTransaction. diff --git a/ethergo/submitter/db/txdb/store.go b/ethergo/submitter/db/txdb/store.go index 295a00b70f..06fb999a45 100644 --- a/ethergo/submitter/db/txdb/store.go +++ b/ethergo/submitter/db/txdb/store.go @@ -100,6 +100,26 @@ func convertTXS(ethTxes []ETHTX) (txs []db.TX, err error) { return txs, nil } +// GetDistinctChainIDs gets a list of all chains that have been used. +func (s *Store) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) { + var chainIDs []string + err := s.db.WithContext(ctx). + Model(ÐTX{}). + Distinct(chainIDFieldName). + Pluck(chainIDFieldName, &chainIDs).Error + if err != nil { + return nil, err + } + + var result []*big.Int + for _, chainIDStr := range chainIDs { + chainID := new(big.Int) + chainID.SetString(chainIDStr, 10) + result = append(result, chainID) + } + return result, nil +} + // GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status. // there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain. // the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing. diff --git a/ethergo/submitter/db_test.go b/ethergo/submitter/db_test.go index 61b4b7ec51..8f402d96ac 100644 --- a/ethergo/submitter/db_test.go +++ b/ethergo/submitter/db_test.go @@ -50,6 +50,11 @@ func (t *TXSubmitterDBSuite) TestGetNonceForChainID() { } } } + + distinctChains, err := testDB.GetDistinctChainIDs(t.GetTestContext()) + t.Require().NoError(err) + + t.Require().Equal(len(t.testBackends), len(distinctChains)) }) } diff --git a/ethergo/submitter/export_test.go b/ethergo/submitter/export_test.go index 3855e123b0..3eb3d476ce 100644 --- a/ethergo/submitter/export_test.go +++ b/ethergo/submitter/export_test.go @@ -114,3 +114,13 @@ func (t *txSubmitterImpl) GetNonce(parentCtx context.Context, chainID *big.Int, func (t *txSubmitterImpl) CheckAndSetConfirmation(ctx context.Context, chainClient client.EVM, txes []db.TX) error { return t.checkAndSetConfirmation(ctx, chainClient, txes) } + +// Outersection exports outersection for testing. +func Outersection(set, superset []*big.Int) []*big.Int { + return outersection(set, superset) +} + +// MapToBigIntSlice exports mapToBigIntSlice for testing. +func MapToBigIntSlice[T any](m map[uint64]T) []*big.Int { + return mapToBigIntSlice(m) +} diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go new file mode 100644 index 0000000000..82c1043777 --- /dev/null +++ b/ethergo/submitter/metrics.go @@ -0,0 +1,272 @@ +package submitter + +import ( + "context" + "fmt" + "github.com/cornelk/hashmap" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/ethergo/signer/signer" + "github.com/synapsecns/sanguine/ethergo/submitter/db" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "math/big" + "time" +) + +const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" + +// generate an interface for otelRecorder that exports the public method. +// this allows us to avoid using recordX externally anad makes the package less confusing. +// +//go:generate go run github.com/vburenin/ifacemaker -f metrics.go -s otelRecorder -i iOtelRecorder -p submitter -o metrics_generated.go -c "autogenerated file" +type otelRecorder struct { + metrics metrics.Handler + // meter is the metrics meter. + meter metric.Meter + // confirmedQueueGauge is the gauge for the confirmed queue. + confirmedQueueGauge metric.Int64ObservableGauge + // oldestPendingGauge is the gauge for the oldest pending transaction. + oldestPendingGauge metric.Float64ObservableGauge + // numPendingGauge is the gauge for the number of pending transactions. + numPendingGauge metric.Int64ObservableGauge + // nonceGauge is the gauge for the current nonce. + nonceGauge metric.Int64ObservableGauge + // gasBalanceGauge is the gauge for the gas balance. + gasBalanceGauge metric.Float64ObservableGauge + // numPendingTxes is used for metrics. + // note: numPendingTxes will stop counting at MaxResultsPerChain. + numPendingTxes *hashmap.Map[uint32, int] + // currentNonces is used for metrics. + // chainID -> nonce + currentNonces *hashmap.Map[uint32, uint64] + // currentGasBalance is used for metrics. + // chainID -> balance + currentGasBalances *hashmap.Map[uint32, *big.Int] + // oldestPendingPerChain is the oldest pending transaction. + // chainID -> time + oldestPendingPerChain *hashmap.Map[uint32, time.Duration] + // confirmedQueueCount is the count of the confirmed queue. + confirmedQueueCount *hashmap.Map[uint32, int] + // signer is the signer for signing transactions. + signer signer.Signer +} + +// nolint: cyclop +func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOtelRecorder, err error) { + or := otelRecorder{ + meter: meterHandler.Meter(meterName), + numPendingTxes: hashmap.New[uint32, int](), + currentNonces: hashmap.New[uint32, uint64](), + currentGasBalances: hashmap.New[uint32, *big.Int](), + oldestPendingPerChain: hashmap.New[uint32, time.Duration](), + confirmedQueueCount: hashmap.New[uint32, int](), + signer: signerT, + } + + or.numPendingGauge, err = or.meter.Int64ObservableGauge("num_pending_txes") + if err != nil { + return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordNumPending, or.numPendingGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.nonceGauge, err = or.meter.Int64ObservableGauge("current_nonce") + if err != nil { + return nil, fmt.Errorf("could not create nonce gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordNonces, or.nonceGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.gasBalanceGauge, err = or.meter.Float64ObservableGauge("gas_balance") + if err != nil { + return nil, fmt.Errorf("could not create gas balance gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordBalance, or.gasBalanceGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.oldestPendingGauge, err = or.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) + if err != nil { + return nil, fmt.Errorf("could not create oldest pending gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordOldestPendingTx, or.oldestPendingGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.confirmedQueueGauge, err = or.meter.Int64ObservableGauge("confirmed_queue") + if err != nil { + return nil, fmt.Errorf("could not create oldest pending gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordConfirmedQueue, or.confirmedQueueGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + return &or, nil +} + +func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.numPendingGauge == nil || o.numPendingTxes == nil { + return nil + } + + o.numPendingTxes.Range(func(chainID uint32, numPending int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.numPendingGauge, int64(numPending), opts) + + return true + }) + + return nil +} + +func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil { + return nil + } + + o.currentNonces.Range(func(chainID uint32, nonce uint64) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.nonceGauge, int64(nonce), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordConfirmedQueue(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil { + return nil + } + + o.confirmedQueueCount.Range(func(chainID uint32, queueSize int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.confirmedQueueGauge, int64(queueSize), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.gasBalanceGauge == nil { + return nil + } + + o.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + + observer.ObserveFloat64(o.gasBalanceGauge, toFloat(gasPrice), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.oldestPendingGauge == nil { + return nil + } + + o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveFloat64(o.oldestPendingGauge, oldestPendingTx.Seconds(), opts) + + return true + }) + + return nil +} + +// RecordNonceForChain sets the nonce for a chain. +func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) { + o.currentNonces.Set(chainID, nonce) +} + +// RecordGasBalanceForChain sets the gas balance for a chain. +func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) { + o.currentGasBalances.Set(chainID, balance) +} + +// RecordOldestPendingTx sets the oldest pending tx. +func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) { + o.oldestPendingPerChain.Set(chainID, lastPending) +} + +// RecordNumPendingTxes sets the number of pending txes. +func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) { + o.numPendingTxes.Set(chainID, numPending) +} + +// RecordConfirmedQueue sets the confirmed queue count. +func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) { + o.confirmedQueueCount.Set(chainID, queueSize) +} + +// HasNonceForChain checks if a nonce exists for a chain. +func (o *otelRecorder) HasNonceForChain(chainID uint32) bool { + _, ok := o.currentNonces.Get(chainID) + return ok +} + +// HasGasBalanceForChain checks if a gas balance exists for a chain. +func (o *otelRecorder) HasGasBalanceForChain(chainID uint32) bool { + _, ok := o.currentGasBalances.Get(chainID) + return ok +} + +// fetchOldestPendingTx fetches the oldest pending tx in the queue. +func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time { + oldestPendingTx := time.Now() + for _, tx := range txes { + if tx.Nonce() >= nonce { + continue + } + + if tx.CreationTime().Before(oldestPendingTx) { + oldestPendingTx = tx.CreationTime() + } + } + + return oldestPendingTx +} + +// calculatePendingTxes calculates the number of pending txes in the queue. +func calculatePendingTxes(txes []db.TX, nonce uint64) int { + realPendingCount := 0 + for _, tx := range txes { + // current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0) + // so we use equal to here + if tx.Nonce() >= nonce { + realPendingCount++ + } + } + + return realPendingCount +} diff --git a/ethergo/submitter/metrics_generated.go b/ethergo/submitter/metrics_generated.go new file mode 100644 index 0000000000..16ece2837c --- /dev/null +++ b/ethergo/submitter/metrics_generated.go @@ -0,0 +1,26 @@ +// autogenerated file + +package submitter + +import ( + "math/big" + "time" +) + +// iOtelRecorder ... +type iOtelRecorder interface { + // RecordNonceForChain sets the nonce for a chain. + RecordNonceForChain(chainID uint32, nonce uint64) + // RecordGasBalanceForChain sets the gas balance for a chain. + RecordGasBalanceForChain(chainID uint32, balance *big.Int) + // RecordOldestPendingTx sets the oldest pending tx. + RecordOldestPendingTx(chainID uint32, lastPending time.Duration) + // RecordNumPendingTxes sets the number of pending txes. + RecordNumPendingTxes(chainID uint32, numPending int) + // RecordConfirmedQueue sets the confirmed queue count. + RecordConfirmedQueue(chainID uint32, queueSize int) + // HasNonceForChain checks if a nonce exists for a chain. + HasNonceForChain(chainID uint32) bool + // HasGasBalanceForChain checks if a gas balance exists for a chain. + HasGasBalanceForChain(chainID uint32) bool +} diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index a7d8ba5c90..2e271fe9fb 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -66,6 +66,10 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { return fmt.Errorf("could not get pendingChainIDs: %w", err) } + t.distinctChainIDMux.RLock() + noOpChainIDs := outersection(pendingChainIDs, t.distinctChainIDs) + t.distinctChainIDMux.RUnlock() + pendingChainIDs64 := make([]int64, len(pendingChainIDs)) for i, chainID := range pendingChainIDs { pendingChainIDs64[i] = chainID.Int64() @@ -94,6 +98,48 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { } }(chainID) } + + for _, chainID := range noOpChainIDs { + t.otelRecorder.RecordOldestPendingTx(uint32(chainID.Int64()), 0) + t.otelRecorder.RecordNumPendingTxes(uint32(chainID.Int64()), 0) + + if !t.otelRecorder.HasNonceForChain(uint32(chainID.Int64())) { + wg.Add(1) + go func() { + defer wg.Done() + evmClient, err := t.fetcher.GetClient(ctx, chainID) + if err != nil { + logger.Warn("could not get client", "error", err) + return + } + nonce, err := evmClient.NonceAt(ctx, t.signer.Address(), nil) + if err != nil { + logger.Warn("could not get nonce", "error", err) + return + } + t.otelRecorder.RecordNonceForChain(uint32(chainID.Int64()), nonce) + }() + } + + if !t.otelRecorder.HasGasBalanceForChain(uint32(chainID.Int64())) { + wg.Add(1) + go func() { + defer wg.Done() + evmClient, err := t.fetcher.GetClient(ctx, chainID) + if err != nil { + logger.Warn("could not get client", "error", err) + return + } + balance, err := evmClient.BalanceAt(ctx, t.signer.Address(), nil) + if err != nil { + logger.Warn("could not get balance", "error", err) + return + } + t.otelRecorder.RecordGasBalanceForChain(uint32(chainID.Int64()), balance) + }() + } + } + wg.Wait() return nil @@ -114,6 +160,10 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err sortedTXsByChainID := sortTxesByChainID(txs, maxTxesPerChain) + t.distinctChainIDMux.RLock() + noOpChainIDs := outersection(mapToBigIntSlice(sortedTXsByChainID), t.distinctChainIDs) + t.distinctChainIDMux.RUnlock() + var wg sync.WaitGroup wg.Add(len(sortedTXsByChainID)) @@ -128,6 +178,10 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err }(chainID) } + for _, chainID := range noOpChainIDs { + t.otelRecorder.RecordConfirmedQueue(uint32(chainID.Int64()), 0) + } + wg.Wait() return nil } @@ -138,6 +192,8 @@ func (t *txSubmitterImpl) chainConfirmQueue(parentCtx context.Context, chainID * metrics.EndSpanWithErr(span, err) }() + t.otelRecorder.RecordConfirmedQueue(uint32(chainID.Int64()), len(txes)) + // chainClient is the client for the chain we're working on chainClient, err := t.fetcher.GetClient(ctx, chainID) if err != nil { diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index c8cc3d757f..71a528aac4 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -11,8 +11,6 @@ import ( "sync" "time" - "github.com/cornelk/hashmap" - "github.com/google/uuid" "github.com/puzpuzpuz/xsync/v2" @@ -32,15 +30,12 @@ import ( "github.com/synapsecns/sanguine/ethergo/submitter/db" "github.com/synapsecns/sanguine/ethergo/util" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) var logger = log.Logger("ethergo-submitter") -const meterName = "github.com/synapsecns/sanguine/services/rfq/api/rest" - // TransactionSubmitter is the interface for submitting transactions to the chain. type TransactionSubmitter interface { // Start starts the transaction submitter. @@ -56,7 +51,6 @@ type TransactionSubmitter interface { // txSubmitterImpl is the implementation of the transaction submitter. type txSubmitterImpl struct { metrics metrics.Handler - meter metric.Meter // signer is the signer for signing transactions. signer signer.Signer // nonceMux is the mutex for the nonces. It is keyed by chain. @@ -78,26 +72,13 @@ type txSubmitterImpl struct { lastGasBlockCache *xsync.MapOf[int, *types.Header] // config is the config for the transaction submitter. config config.IConfig - // oldestPendingGauge is the gauge for the oldest pending transaction. - oldestPendingGauge metric.Float64ObservableGauge - // numPendingGauge is the gauge for the number of pending transactions. - numPendingGauge metric.Int64ObservableGauge - // nonceGauge is the gauge for the current nonce. - nonceGauge metric.Int64ObservableGauge - // gasBalanceGauge is the gauge for the gas balance. - gasBalanceGauge metric.Float64ObservableGauge - // numPendingTxes is used for metrics. - // note: numPendingTxes will stop counting at MaxResultsPerChain. - numPendingTxes *hashmap.Map[uint32, int] - // currentNonces is used for metrics. - // chainID -> nonce - currentNonces *hashmap.Map[uint32, uint64] - // currentGasBalance is used for metrics. - // chainID -> balance - currentGasBalances *hashmap.Map[uint32, *big.Int] - // oldestPendingPerChain is the oldest pending transaction. - // chainID -> time - oldestPendingPerChain *hashmap.Map[uint32, time.Time] + // otelRecorder is the recorder for the otel metrics. + otelRecorder iOtelRecorder + // distinctChainIDMux is the mutex for the distinct chain ids. + distinctChainIDMux sync.RWMutex + // distinctChainIDs is the distinct chain ids for the transaction submitter. + // note: this map should not be appended to! + distinctChainIDs []*big.Int } // ClientFetcher is the interface for fetching a chain client. @@ -110,20 +91,15 @@ type ClientFetcher interface { // NewTransactionSubmitter creates a new transaction submitter. func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter { return &txSubmitterImpl{ - db: db, - config: config, - metrics: metrics, - meter: metrics.Meter(meterName), - signer: signer, - fetcher: fetcher, - nonceMux: mapmutex.NewStringerMapMutex(), - statusMux: mapmutex.NewStringMapMutex(), - retryNow: make(chan bool, 1), - lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), - numPendingTxes: hashmap.New[uint32, int](), - currentNonces: hashmap.New[uint32, uint64](), - currentGasBalances: hashmap.New[uint32, *big.Int](), - oldestPendingPerChain: hashmap.New[uint32, time.Time](), + db: db, + config: config, + metrics: metrics, + signer: signer, + fetcher: fetcher, + nonceMux: mapmutex.NewStringerMapMutex(), + statusMux: mapmutex.NewStringMapMutex(), + retryNow: make(chan bool, 1), + lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), } } @@ -136,10 +112,20 @@ func (t *txSubmitterImpl) GetRetryInterval() time.Duration { return retryInterval } +// GetDistinctInterval returns the interval at which distinct chain ids should be queried. +// this is used for metric updates. +func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { + retryInterval := time.Second * 60 + t.retryOnce.Do(func() { + retryInterval = time.Duration(0) + }) + return retryInterval +} + func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { - err = t.setupMetrics() + t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) if err != nil { - return fmt.Errorf("could not setup metrics: %w", err) + return fmt.Errorf("could not create otel recorder: %w", err) } // start reaper process @@ -158,6 +144,23 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { } }() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(t.GetDistinctInterval()): + tmpChainIDs, err := t.db.GetDistinctChainIDs(ctx) + if err != nil { + logger.Errorf("could not update distinct chain ids: %v", err) + } + t.distinctChainIDMux.Lock() + t.distinctChainIDs = tmpChainIDs + t.distinctChainIDMux.Unlock() + } + } + }() + i := 0 for { i++ @@ -173,50 +176,6 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { } } -func (t *txSubmitterImpl) setupMetrics() (err error) { - t.numPendingGauge, err = t.meter.Int64ObservableGauge("num_pending_txes") - if err != nil { - return fmt.Errorf("could not create num pending txes gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordNumPending, t.numPendingGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.nonceGauge, err = t.meter.Int64ObservableGauge("current_nonce") - if err != nil { - return fmt.Errorf("could not create nonce gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordNonces, t.nonceGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.gasBalanceGauge, err = t.meter.Float64ObservableGauge("gas_balance") - if err != nil { - return fmt.Errorf("could not create gas balance gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordBalance, t.gasBalanceGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.oldestPendingGauge, err = t.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) - if err != nil { - return fmt.Errorf("could not create oldest pending gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordOldestPendingTx, t.oldestPendingGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - return nil -} - func (t *txSubmitterImpl) GetSubmissionStatus(ctx context.Context, chainID *big.Int, nonce uint64) (status SubmissionStatus, err error) { nonceStatus, err := t.db.GetNonceStatus(ctx, t.signer.Address(), chainID, nonce) if err != nil { diff --git a/ethergo/submitter/util.go b/ethergo/submitter/util.go index 68f73a1f97..63d43c92d4 100644 --- a/ethergo/submitter/util.go +++ b/ethergo/submitter/util.go @@ -148,3 +148,34 @@ func groupTxesByNonce(txs []db.TX) map[uint64][]db.TX { return txesByNonce } + +// Function to check if a slice contains a given big integer. +func contains(slice []*big.Int, elem *big.Int) bool { + for _, v := range slice { + if v.Cmp(elem) == 0 { + return true + } + } + return false +} + +// Function to get the outersection of two slices of big.Int. +func outersection(set, superset []*big.Int) []*big.Int { + var result []*big.Int + for _, elem := range superset { + if !contains(set, elem) { + result = append(result, elem) + } + } + return result +} + +// Generic function to convert a map[uint64]T to []*big.Int. +func mapToBigIntSlice[T any](m map[uint64]T) []*big.Int { + var result []*big.Int + for k := range m { + bigIntKey := new(big.Int).SetUint64(k) + result = append(result, bigIntKey) + } + return result +} diff --git a/ethergo/submitter/util_test.go b/ethergo/submitter/util_test.go index d39e3452a0..6965d502f0 100644 --- a/ethergo/submitter/util_test.go +++ b/ethergo/submitter/util_test.go @@ -296,3 +296,93 @@ func makeAttrMap(tx *types.Transaction, UUID string) map[string]attribute.Value } return mapAttr } + +// Test for the outersection function. +func TestOutersection(t *testing.T) { + set := []*big.Int{ + big.NewInt(2), + big.NewInt(4), + } + + superset := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(3), + big.NewInt(5), + } + + result := submitter.Outersection(set, superset) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +} + +// Test for the mapToBigIntSlice function with generics. +func TestMapToBigIntSlice(t *testing.T) { + m := map[uint64]struct{}{ + 1: {}, + 2: {}, + 3: {}, + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + } + + result := submitter.MapToBigIntSlice(m) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +} + +func TestMapToBigIntSliceWithStruct(t *testing.T) { + type MyStruct struct { + Value int + } + m := map[uint64]MyStruct{ + 1: {Value: 10}, + 2: {Value: 20}, + 3: {Value: 30}, + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + } + + result := submitter.MapToBigIntSlice(m) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +}