From 9cf433b6080b490a30767ad7065641dc80ef25d0 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 12:52:01 -0400 Subject: [PATCH 01/23] record oldest pending tx (fixes #2819) [goreleaser] --- ethergo/submitter/chain_queue.go | 37 ++++++++++++++++++++++++++ ethergo/submitter/submitter.go | 45 +++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/ethergo/submitter/chain_queue.go b/ethergo/submitter/chain_queue.go index cdcabfbd76..c5d944f5ff 100644 --- a/ethergo/submitter/chain_queue.go +++ b/ethergo/submitter/chain_queue.go @@ -68,6 +68,7 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * // record metrics for txes. t.currentNonces.Set(uint32(chainID.Int64()), currentNonce) t.numPendingTxes.Set(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce)) + t.oldestPendingPerChain.Set(uint32(chainID.Int64()), fetchOldestPendingTx(txes, currentNonce)) wg := &sync.WaitGroup{} @@ -142,6 +143,23 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * return nil } +// fetchOldestPendingTx fetches the oldest pending tx in the queue. +func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time { + oldestPendingTx := time.Now() + for _, tx := range txes { + if tx.Nonce() >= nonce { + continue + } + + if tx.CreationTime().Before(oldestPendingTx) { + oldestPendingTx = tx.CreationTime() + } + } + + return oldestPendingTx + +} + // calculatePendingTxes calculates the number of pending txes in the queue. func calculatePendingTxes(txes []db.TX, nonce uint64) int { realPendingCount := 0 @@ -210,6 +228,25 @@ func (t *txSubmitterImpl) recordBalance(_ context.Context, observer metric.Obser return nil } +func (t *txSubmitterImpl) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) { + if t.metrics == nil || t.oldestPendingGauge == nil { + return nil + } + + t.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Time) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", t.signer.Address().Hex()), + ) + observer.ObserveFloat64(t.oldestPendingGauge, time.Since(oldestPendingTx).Seconds(), opts) + + return true + }) + + return nil + +} + func toFloat(wei *big.Int) float64 { // Convert wei to float64 weiFloat := new(big.Float).SetInt(wei) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index df1c107ddd..c8cc3d757f 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -78,6 +78,8 @@ type txSubmitterImpl struct { lastGasBlockCache *xsync.MapOf[int, *types.Header] // config is the config for the transaction submitter. config config.IConfig + // oldestPendingGauge is the gauge for the oldest pending transaction. + oldestPendingGauge metric.Float64ObservableGauge // numPendingGauge is the gauge for the number of pending transactions. numPendingGauge metric.Int64ObservableGauge // nonceGauge is the gauge for the current nonce. @@ -85,11 +87,17 @@ type txSubmitterImpl struct { // gasBalanceGauge is the gauge for the gas balance. gasBalanceGauge metric.Float64ObservableGauge // numPendingTxes is used for metrics. + // note: numPendingTxes will stop counting at MaxResultsPerChain. numPendingTxes *hashmap.Map[uint32, int] // currentNonces is used for metrics. + // chainID -> nonce currentNonces *hashmap.Map[uint32, uint64] // currentGasBalance is used for metrics. + // chainID -> balance currentGasBalances *hashmap.Map[uint32, *big.Int] + // oldestPendingPerChain is the oldest pending transaction. + // chainID -> time + oldestPendingPerChain *hashmap.Map[uint32, time.Time] } // ClientFetcher is the interface for fetching a chain client. @@ -102,19 +110,20 @@ type ClientFetcher interface { // NewTransactionSubmitter creates a new transaction submitter. func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter { return &txSubmitterImpl{ - db: db, - config: config, - metrics: metrics, - meter: metrics.Meter(meterName), - signer: signer, - fetcher: fetcher, - nonceMux: mapmutex.NewStringerMapMutex(), - statusMux: mapmutex.NewStringMapMutex(), - retryNow: make(chan bool, 1), - lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), - numPendingTxes: hashmap.New[uint32, int](), - currentNonces: hashmap.New[uint32, uint64](), - currentGasBalances: hashmap.New[uint32, *big.Int](), + db: db, + config: config, + metrics: metrics, + meter: metrics.Meter(meterName), + signer: signer, + fetcher: fetcher, + nonceMux: mapmutex.NewStringerMapMutex(), + statusMux: mapmutex.NewStringMapMutex(), + retryNow: make(chan bool, 1), + lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), + numPendingTxes: hashmap.New[uint32, int](), + currentNonces: hashmap.New[uint32, uint64](), + currentGasBalances: hashmap.New[uint32, *big.Int](), + oldestPendingPerChain: hashmap.New[uint32, time.Time](), } } @@ -195,6 +204,16 @@ func (t *txSubmitterImpl) setupMetrics() (err error) { return fmt.Errorf("could not register callback: %w", err) } + t.oldestPendingGauge, err = t.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) + if err != nil { + return fmt.Errorf("could not create oldest pending gauge: %w", err) + } + + _, err = t.meter.RegisterCallback(t.recordOldestPendingTx, t.oldestPendingGauge) + if err != nil { + return fmt.Errorf("could not register callback: %w", err) + } + return nil } From 96a490eb86f7a67173dd924097ced094fc571c24 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 14:34:49 -0400 Subject: [PATCH 02/23] refactor metrics address #2823, #2822, #2820 and #2819 --- ethergo/submitter/chain_queue.go | 113 +--------- ethergo/submitter/db/mocks/service.go | 23 +++ ethergo/submitter/db/service.go | 2 + ethergo/submitter/db/txdb/store.go | 20 ++ ethergo/submitter/db_test.go | 5 + ethergo/submitter/export_test.go | 10 + ethergo/submitter/metrics.go | 272 +++++++++++++++++++++++++ ethergo/submitter/metrics_generated.go | 26 +++ ethergo/submitter/queue.go | 56 +++++ ethergo/submitter/submitter.go | 131 ++++-------- ethergo/submitter/util.go | 31 +++ ethergo/submitter/util_test.go | 90 ++++++++ 12 files changed, 584 insertions(+), 195 deletions(-) create mode 100644 ethergo/submitter/metrics.go create mode 100644 ethergo/submitter/metrics_generated.go diff --git a/ethergo/submitter/chain_queue.go b/ethergo/submitter/chain_queue.go index c5d944f5ff..635c061bb5 100644 --- a/ethergo/submitter/chain_queue.go +++ b/ethergo/submitter/chain_queue.go @@ -20,7 +20,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/client" "github.com/synapsecns/sanguine/ethergo/submitter/db" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -66,9 +65,9 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * span.SetAttributes(attribute.Int("nonce", int(currentNonce))) // record metrics for txes. - t.currentNonces.Set(uint32(chainID.Int64()), currentNonce) - t.numPendingTxes.Set(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce)) - t.oldestPendingPerChain.Set(uint32(chainID.Int64()), fetchOldestPendingTx(txes, currentNonce)) + t.otelRecorder.RecordNonceForChain(uint32(chainID.Int64()), currentNonce) + t.otelRecorder.RecordNumPendingTxes(uint32(chainID.Int64()), calculatePendingTxes(txes, currentNonce)) + t.otelRecorder.RecordOldestPendingTx(uint32(chainID.Int64()), time.Since(fetchOldestPendingTx(txes, currentNonce))) wg := &sync.WaitGroup{} @@ -89,7 +88,7 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * if err != nil { return fmt.Errorf("could not get gas balance: %w", err) } - t.currentGasBalances.Set(uint32(chainID.Int64()), core.CopyBigInt(gasBalance)) + t.otelRecorder.RecordGasBalanceForChain(uint32(chainID.Int64()), core.CopyBigInt(gasBalance)) span.SetAttributes(attribute.String("gas_balance", gasBalance.String())) for i := range txes { @@ -143,110 +142,6 @@ func (t *txSubmitterImpl) chainPendingQueue(parentCtx context.Context, chainID * return nil } -// fetchOldestPendingTx fetches the oldest pending tx in the queue. -func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time { - oldestPendingTx := time.Now() - for _, tx := range txes { - if tx.Nonce() >= nonce { - continue - } - - if tx.CreationTime().Before(oldestPendingTx) { - oldestPendingTx = tx.CreationTime() - } - } - - return oldestPendingTx - -} - -// calculatePendingTxes calculates the number of pending txes in the queue. -func calculatePendingTxes(txes []db.TX, nonce uint64) int { - realPendingCount := 0 - for _, tx := range txes { - // current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0) - // so we use equal to here - if tx.Nonce() >= nonce { - realPendingCount++ - } - } - - return realPendingCount - -} - -func (t *txSubmitterImpl) recordNumPending(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.numPendingGauge == nil || t.numPendingTxes == nil { - return nil - } - - t.numPendingTxes.Range(func(chainID uint32, numPending int) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveInt64(t.numPendingGauge, int64(numPending), opts) - - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordNonces(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.nonceGauge == nil || t.currentNonces == nil { - return nil - } - - t.currentNonces.Range(func(chainID uint32, nonce uint64) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveInt64(t.nonceGauge, int64(nonce), opts) - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordBalance(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.gasBalanceGauge == nil { - return nil - } - - t.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - - observer.ObserveFloat64(t.gasBalanceGauge, toFloat(gasPrice), opts) - return true - }) - - return nil -} - -func (t *txSubmitterImpl) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) { - if t.metrics == nil || t.oldestPendingGauge == nil { - return nil - } - - t.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Time) bool { - opts := metric.WithAttributes( - attribute.Int(metrics.ChainID, int(chainID)), - attribute.String("wallet", t.signer.Address().Hex()), - ) - observer.ObserveFloat64(t.oldestPendingGauge, time.Since(oldestPendingTx).Seconds(), opts) - - return true - }) - - return nil - -} - func toFloat(wei *big.Int) float64 { // Convert wei to float64 weiFloat := new(big.Float).SetInt(wei) diff --git a/ethergo/submitter/db/mocks/service.go b/ethergo/submitter/db/mocks/service.go index c489c8c754..30e2527588 100644 --- a/ethergo/submitter/db/mocks/service.go +++ b/ethergo/submitter/db/mocks/service.go @@ -115,6 +115,29 @@ func (_m *Service) GetChainIDsByStatus(ctx context.Context, fromAddress common.A return r0, r1 } +// GetDistinctChainIDs provides a mock function with given fields: ctx +func (_m *Service) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) { + ret := _m.Called(ctx) + + var r0 []*big.Int + if rf, ok := ret.Get(0).(func(context.Context) []*big.Int); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*big.Int) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetNonceAttemptsByStatus provides a mock function with given fields: ctx, fromAddress, chainID, nonce, matchStatuses func (_m *Service) GetNonceAttemptsByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64, matchStatuses ...db.Status) ([]db.TX, error) { _va := make([]interface{}, len(matchStatuses)) diff --git a/ethergo/submitter/db/service.go b/ethergo/submitter/db/service.go index 103b885f1f..b0d27960f4 100644 --- a/ethergo/submitter/db/service.go +++ b/ethergo/submitter/db/service.go @@ -40,6 +40,8 @@ type Service interface { GetChainIDsByStatus(ctx context.Context, fromAddress common.Address, matchStatuses ...Status) (chainIDs []*big.Int, err error) // DeleteTXS deletes txs that are older than a given duration. DeleteTXS(ctx context.Context, maxAge time.Duration, matchStatuses ...Status) error + // GetDistinctChainIDs gets the distinct chain ids for all txs. + GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) } // TransactionFunc is a function that can be passed to DBTransaction. diff --git a/ethergo/submitter/db/txdb/store.go b/ethergo/submitter/db/txdb/store.go index 295a00b70f..06fb999a45 100644 --- a/ethergo/submitter/db/txdb/store.go +++ b/ethergo/submitter/db/txdb/store.go @@ -100,6 +100,26 @@ func convertTXS(ethTxes []ETHTX) (txs []db.TX, err error) { return txs, nil } +// GetDistinctChainIDs gets a list of all chains that have been used. +func (s *Store) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) { + var chainIDs []string + err := s.db.WithContext(ctx). + Model(ÐTX{}). + Distinct(chainIDFieldName). + Pluck(chainIDFieldName, &chainIDs).Error + if err != nil { + return nil, err + } + + var result []*big.Int + for _, chainIDStr := range chainIDs { + chainID := new(big.Int) + chainID.SetString(chainIDStr, 10) + result = append(result, chainID) + } + return result, nil +} + // GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status. // there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain. // the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing. diff --git a/ethergo/submitter/db_test.go b/ethergo/submitter/db_test.go index 61b4b7ec51..8f402d96ac 100644 --- a/ethergo/submitter/db_test.go +++ b/ethergo/submitter/db_test.go @@ -50,6 +50,11 @@ func (t *TXSubmitterDBSuite) TestGetNonceForChainID() { } } } + + distinctChains, err := testDB.GetDistinctChainIDs(t.GetTestContext()) + t.Require().NoError(err) + + t.Require().Equal(len(t.testBackends), len(distinctChains)) }) } diff --git a/ethergo/submitter/export_test.go b/ethergo/submitter/export_test.go index 3855e123b0..3eb3d476ce 100644 --- a/ethergo/submitter/export_test.go +++ b/ethergo/submitter/export_test.go @@ -114,3 +114,13 @@ func (t *txSubmitterImpl) GetNonce(parentCtx context.Context, chainID *big.Int, func (t *txSubmitterImpl) CheckAndSetConfirmation(ctx context.Context, chainClient client.EVM, txes []db.TX) error { return t.checkAndSetConfirmation(ctx, chainClient, txes) } + +// Outersection exports outersection for testing. +func Outersection(set, superset []*big.Int) []*big.Int { + return outersection(set, superset) +} + +// MapToBigIntSlice exports mapToBigIntSlice for testing. +func MapToBigIntSlice[T any](m map[uint64]T) []*big.Int { + return mapToBigIntSlice(m) +} diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go new file mode 100644 index 0000000000..82c1043777 --- /dev/null +++ b/ethergo/submitter/metrics.go @@ -0,0 +1,272 @@ +package submitter + +import ( + "context" + "fmt" + "github.com/cornelk/hashmap" + "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/ethergo/signer/signer" + "github.com/synapsecns/sanguine/ethergo/submitter/db" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "math/big" + "time" +) + +const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" + +// generate an interface for otelRecorder that exports the public method. +// this allows us to avoid using recordX externally anad makes the package less confusing. +// +//go:generate go run github.com/vburenin/ifacemaker -f metrics.go -s otelRecorder -i iOtelRecorder -p submitter -o metrics_generated.go -c "autogenerated file" +type otelRecorder struct { + metrics metrics.Handler + // meter is the metrics meter. + meter metric.Meter + // confirmedQueueGauge is the gauge for the confirmed queue. + confirmedQueueGauge metric.Int64ObservableGauge + // oldestPendingGauge is the gauge for the oldest pending transaction. + oldestPendingGauge metric.Float64ObservableGauge + // numPendingGauge is the gauge for the number of pending transactions. + numPendingGauge metric.Int64ObservableGauge + // nonceGauge is the gauge for the current nonce. + nonceGauge metric.Int64ObservableGauge + // gasBalanceGauge is the gauge for the gas balance. + gasBalanceGauge metric.Float64ObservableGauge + // numPendingTxes is used for metrics. + // note: numPendingTxes will stop counting at MaxResultsPerChain. + numPendingTxes *hashmap.Map[uint32, int] + // currentNonces is used for metrics. + // chainID -> nonce + currentNonces *hashmap.Map[uint32, uint64] + // currentGasBalance is used for metrics. + // chainID -> balance + currentGasBalances *hashmap.Map[uint32, *big.Int] + // oldestPendingPerChain is the oldest pending transaction. + // chainID -> time + oldestPendingPerChain *hashmap.Map[uint32, time.Duration] + // confirmedQueueCount is the count of the confirmed queue. + confirmedQueueCount *hashmap.Map[uint32, int] + // signer is the signer for signing transactions. + signer signer.Signer +} + +// nolint: cyclop +func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOtelRecorder, err error) { + or := otelRecorder{ + meter: meterHandler.Meter(meterName), + numPendingTxes: hashmap.New[uint32, int](), + currentNonces: hashmap.New[uint32, uint64](), + currentGasBalances: hashmap.New[uint32, *big.Int](), + oldestPendingPerChain: hashmap.New[uint32, time.Duration](), + confirmedQueueCount: hashmap.New[uint32, int](), + signer: signerT, + } + + or.numPendingGauge, err = or.meter.Int64ObservableGauge("num_pending_txes") + if err != nil { + return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordNumPending, or.numPendingGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.nonceGauge, err = or.meter.Int64ObservableGauge("current_nonce") + if err != nil { + return nil, fmt.Errorf("could not create nonce gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordNonces, or.nonceGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.gasBalanceGauge, err = or.meter.Float64ObservableGauge("gas_balance") + if err != nil { + return nil, fmt.Errorf("could not create gas balance gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordBalance, or.gasBalanceGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.oldestPendingGauge, err = or.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) + if err != nil { + return nil, fmt.Errorf("could not create oldest pending gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordOldestPendingTx, or.oldestPendingGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + or.confirmedQueueGauge, err = or.meter.Int64ObservableGauge("confirmed_queue") + if err != nil { + return nil, fmt.Errorf("could not create oldest pending gauge: %w", err) + } + + _, err = or.meter.RegisterCallback(or.recordConfirmedQueue, or.confirmedQueueGauge) + if err != nil { + return nil, fmt.Errorf("could not register callback: %w", err) + } + + return &or, nil +} + +func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.numPendingGauge == nil || o.numPendingTxes == nil { + return nil + } + + o.numPendingTxes.Range(func(chainID uint32, numPending int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.numPendingGauge, int64(numPending), opts) + + return true + }) + + return nil +} + +func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil { + return nil + } + + o.currentNonces.Range(func(chainID uint32, nonce uint64) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.nonceGauge, int64(nonce), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordConfirmedQueue(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil { + return nil + } + + o.confirmedQueueCount.Range(func(chainID uint32, queueSize int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveInt64(o.confirmedQueueGauge, int64(queueSize), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.gasBalanceGauge == nil { + return nil + } + + o.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + + observer.ObserveFloat64(o.gasBalanceGauge, toFloat(gasPrice), opts) + return true + }) + + return nil +} + +func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) { + if o.metrics == nil || o.oldestPendingGauge == nil { + return nil + } + + o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool { + opts := metric.WithAttributes( + attribute.Int(metrics.ChainID, int(chainID)), + attribute.String("wallet", o.signer.Address().Hex()), + ) + observer.ObserveFloat64(o.oldestPendingGauge, oldestPendingTx.Seconds(), opts) + + return true + }) + + return nil +} + +// RecordNonceForChain sets the nonce for a chain. +func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) { + o.currentNonces.Set(chainID, nonce) +} + +// RecordGasBalanceForChain sets the gas balance for a chain. +func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) { + o.currentGasBalances.Set(chainID, balance) +} + +// RecordOldestPendingTx sets the oldest pending tx. +func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) { + o.oldestPendingPerChain.Set(chainID, lastPending) +} + +// RecordNumPendingTxes sets the number of pending txes. +func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) { + o.numPendingTxes.Set(chainID, numPending) +} + +// RecordConfirmedQueue sets the confirmed queue count. +func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) { + o.confirmedQueueCount.Set(chainID, queueSize) +} + +// HasNonceForChain checks if a nonce exists for a chain. +func (o *otelRecorder) HasNonceForChain(chainID uint32) bool { + _, ok := o.currentNonces.Get(chainID) + return ok +} + +// HasGasBalanceForChain checks if a gas balance exists for a chain. +func (o *otelRecorder) HasGasBalanceForChain(chainID uint32) bool { + _, ok := o.currentGasBalances.Get(chainID) + return ok +} + +// fetchOldestPendingTx fetches the oldest pending tx in the queue. +func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time { + oldestPendingTx := time.Now() + for _, tx := range txes { + if tx.Nonce() >= nonce { + continue + } + + if tx.CreationTime().Before(oldestPendingTx) { + oldestPendingTx = tx.CreationTime() + } + } + + return oldestPendingTx +} + +// calculatePendingTxes calculates the number of pending txes in the queue. +func calculatePendingTxes(txes []db.TX, nonce uint64) int { + realPendingCount := 0 + for _, tx := range txes { + // current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0) + // so we use equal to here + if tx.Nonce() >= nonce { + realPendingCount++ + } + } + + return realPendingCount +} diff --git a/ethergo/submitter/metrics_generated.go b/ethergo/submitter/metrics_generated.go new file mode 100644 index 0000000000..16ece2837c --- /dev/null +++ b/ethergo/submitter/metrics_generated.go @@ -0,0 +1,26 @@ +// autogenerated file + +package submitter + +import ( + "math/big" + "time" +) + +// iOtelRecorder ... +type iOtelRecorder interface { + // RecordNonceForChain sets the nonce for a chain. + RecordNonceForChain(chainID uint32, nonce uint64) + // RecordGasBalanceForChain sets the gas balance for a chain. + RecordGasBalanceForChain(chainID uint32, balance *big.Int) + // RecordOldestPendingTx sets the oldest pending tx. + RecordOldestPendingTx(chainID uint32, lastPending time.Duration) + // RecordNumPendingTxes sets the number of pending txes. + RecordNumPendingTxes(chainID uint32, numPending int) + // RecordConfirmedQueue sets the confirmed queue count. + RecordConfirmedQueue(chainID uint32, queueSize int) + // HasNonceForChain checks if a nonce exists for a chain. + HasNonceForChain(chainID uint32) bool + // HasGasBalanceForChain checks if a gas balance exists for a chain. + HasGasBalanceForChain(chainID uint32) bool +} diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index a7d8ba5c90..2e271fe9fb 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -66,6 +66,10 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { return fmt.Errorf("could not get pendingChainIDs: %w", err) } + t.distinctChainIDMux.RLock() + noOpChainIDs := outersection(pendingChainIDs, t.distinctChainIDs) + t.distinctChainIDMux.RUnlock() + pendingChainIDs64 := make([]int64, len(pendingChainIDs)) for i, chainID := range pendingChainIDs { pendingChainIDs64[i] = chainID.Int64() @@ -94,6 +98,48 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { } }(chainID) } + + for _, chainID := range noOpChainIDs { + t.otelRecorder.RecordOldestPendingTx(uint32(chainID.Int64()), 0) + t.otelRecorder.RecordNumPendingTxes(uint32(chainID.Int64()), 0) + + if !t.otelRecorder.HasNonceForChain(uint32(chainID.Int64())) { + wg.Add(1) + go func() { + defer wg.Done() + evmClient, err := t.fetcher.GetClient(ctx, chainID) + if err != nil { + logger.Warn("could not get client", "error", err) + return + } + nonce, err := evmClient.NonceAt(ctx, t.signer.Address(), nil) + if err != nil { + logger.Warn("could not get nonce", "error", err) + return + } + t.otelRecorder.RecordNonceForChain(uint32(chainID.Int64()), nonce) + }() + } + + if !t.otelRecorder.HasGasBalanceForChain(uint32(chainID.Int64())) { + wg.Add(1) + go func() { + defer wg.Done() + evmClient, err := t.fetcher.GetClient(ctx, chainID) + if err != nil { + logger.Warn("could not get client", "error", err) + return + } + balance, err := evmClient.BalanceAt(ctx, t.signer.Address(), nil) + if err != nil { + logger.Warn("could not get balance", "error", err) + return + } + t.otelRecorder.RecordGasBalanceForChain(uint32(chainID.Int64()), balance) + }() + } + } + wg.Wait() return nil @@ -114,6 +160,10 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err sortedTXsByChainID := sortTxesByChainID(txs, maxTxesPerChain) + t.distinctChainIDMux.RLock() + noOpChainIDs := outersection(mapToBigIntSlice(sortedTXsByChainID), t.distinctChainIDs) + t.distinctChainIDMux.RUnlock() + var wg sync.WaitGroup wg.Add(len(sortedTXsByChainID)) @@ -128,6 +178,10 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err }(chainID) } + for _, chainID := range noOpChainIDs { + t.otelRecorder.RecordConfirmedQueue(uint32(chainID.Int64()), 0) + } + wg.Wait() return nil } @@ -138,6 +192,8 @@ func (t *txSubmitterImpl) chainConfirmQueue(parentCtx context.Context, chainID * metrics.EndSpanWithErr(span, err) }() + t.otelRecorder.RecordConfirmedQueue(uint32(chainID.Int64()), len(txes)) + // chainClient is the client for the chain we're working on chainClient, err := t.fetcher.GetClient(ctx, chainID) if err != nil { diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index c8cc3d757f..71a528aac4 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -11,8 +11,6 @@ import ( "sync" "time" - "github.com/cornelk/hashmap" - "github.com/google/uuid" "github.com/puzpuzpuz/xsync/v2" @@ -32,15 +30,12 @@ import ( "github.com/synapsecns/sanguine/ethergo/submitter/db" "github.com/synapsecns/sanguine/ethergo/util" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) var logger = log.Logger("ethergo-submitter") -const meterName = "github.com/synapsecns/sanguine/services/rfq/api/rest" - // TransactionSubmitter is the interface for submitting transactions to the chain. type TransactionSubmitter interface { // Start starts the transaction submitter. @@ -56,7 +51,6 @@ type TransactionSubmitter interface { // txSubmitterImpl is the implementation of the transaction submitter. type txSubmitterImpl struct { metrics metrics.Handler - meter metric.Meter // signer is the signer for signing transactions. signer signer.Signer // nonceMux is the mutex for the nonces. It is keyed by chain. @@ -78,26 +72,13 @@ type txSubmitterImpl struct { lastGasBlockCache *xsync.MapOf[int, *types.Header] // config is the config for the transaction submitter. config config.IConfig - // oldestPendingGauge is the gauge for the oldest pending transaction. - oldestPendingGauge metric.Float64ObservableGauge - // numPendingGauge is the gauge for the number of pending transactions. - numPendingGauge metric.Int64ObservableGauge - // nonceGauge is the gauge for the current nonce. - nonceGauge metric.Int64ObservableGauge - // gasBalanceGauge is the gauge for the gas balance. - gasBalanceGauge metric.Float64ObservableGauge - // numPendingTxes is used for metrics. - // note: numPendingTxes will stop counting at MaxResultsPerChain. - numPendingTxes *hashmap.Map[uint32, int] - // currentNonces is used for metrics. - // chainID -> nonce - currentNonces *hashmap.Map[uint32, uint64] - // currentGasBalance is used for metrics. - // chainID -> balance - currentGasBalances *hashmap.Map[uint32, *big.Int] - // oldestPendingPerChain is the oldest pending transaction. - // chainID -> time - oldestPendingPerChain *hashmap.Map[uint32, time.Time] + // otelRecorder is the recorder for the otel metrics. + otelRecorder iOtelRecorder + // distinctChainIDMux is the mutex for the distinct chain ids. + distinctChainIDMux sync.RWMutex + // distinctChainIDs is the distinct chain ids for the transaction submitter. + // note: this map should not be appended to! + distinctChainIDs []*big.Int } // ClientFetcher is the interface for fetching a chain client. @@ -110,20 +91,15 @@ type ClientFetcher interface { // NewTransactionSubmitter creates a new transaction submitter. func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter { return &txSubmitterImpl{ - db: db, - config: config, - metrics: metrics, - meter: metrics.Meter(meterName), - signer: signer, - fetcher: fetcher, - nonceMux: mapmutex.NewStringerMapMutex(), - statusMux: mapmutex.NewStringMapMutex(), - retryNow: make(chan bool, 1), - lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), - numPendingTxes: hashmap.New[uint32, int](), - currentNonces: hashmap.New[uint32, uint64](), - currentGasBalances: hashmap.New[uint32, *big.Int](), - oldestPendingPerChain: hashmap.New[uint32, time.Time](), + db: db, + config: config, + metrics: metrics, + signer: signer, + fetcher: fetcher, + nonceMux: mapmutex.NewStringerMapMutex(), + statusMux: mapmutex.NewStringMapMutex(), + retryNow: make(chan bool, 1), + lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), } } @@ -136,10 +112,20 @@ func (t *txSubmitterImpl) GetRetryInterval() time.Duration { return retryInterval } +// GetDistinctInterval returns the interval at which distinct chain ids should be queried. +// this is used for metric updates. +func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { + retryInterval := time.Second * 60 + t.retryOnce.Do(func() { + retryInterval = time.Duration(0) + }) + return retryInterval +} + func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { - err = t.setupMetrics() + t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) if err != nil { - return fmt.Errorf("could not setup metrics: %w", err) + return fmt.Errorf("could not create otel recorder: %w", err) } // start reaper process @@ -158,6 +144,23 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { } }() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(t.GetDistinctInterval()): + tmpChainIDs, err := t.db.GetDistinctChainIDs(ctx) + if err != nil { + logger.Errorf("could not update distinct chain ids: %v", err) + } + t.distinctChainIDMux.Lock() + t.distinctChainIDs = tmpChainIDs + t.distinctChainIDMux.Unlock() + } + } + }() + i := 0 for { i++ @@ -173,50 +176,6 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { } } -func (t *txSubmitterImpl) setupMetrics() (err error) { - t.numPendingGauge, err = t.meter.Int64ObservableGauge("num_pending_txes") - if err != nil { - return fmt.Errorf("could not create num pending txes gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordNumPending, t.numPendingGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.nonceGauge, err = t.meter.Int64ObservableGauge("current_nonce") - if err != nil { - return fmt.Errorf("could not create nonce gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordNonces, t.nonceGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.gasBalanceGauge, err = t.meter.Float64ObservableGauge("gas_balance") - if err != nil { - return fmt.Errorf("could not create gas balance gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordBalance, t.gasBalanceGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - t.oldestPendingGauge, err = t.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) - if err != nil { - return fmt.Errorf("could not create oldest pending gauge: %w", err) - } - - _, err = t.meter.RegisterCallback(t.recordOldestPendingTx, t.oldestPendingGauge) - if err != nil { - return fmt.Errorf("could not register callback: %w", err) - } - - return nil -} - func (t *txSubmitterImpl) GetSubmissionStatus(ctx context.Context, chainID *big.Int, nonce uint64) (status SubmissionStatus, err error) { nonceStatus, err := t.db.GetNonceStatus(ctx, t.signer.Address(), chainID, nonce) if err != nil { diff --git a/ethergo/submitter/util.go b/ethergo/submitter/util.go index 68f73a1f97..63d43c92d4 100644 --- a/ethergo/submitter/util.go +++ b/ethergo/submitter/util.go @@ -148,3 +148,34 @@ func groupTxesByNonce(txs []db.TX) map[uint64][]db.TX { return txesByNonce } + +// Function to check if a slice contains a given big integer. +func contains(slice []*big.Int, elem *big.Int) bool { + for _, v := range slice { + if v.Cmp(elem) == 0 { + return true + } + } + return false +} + +// Function to get the outersection of two slices of big.Int. +func outersection(set, superset []*big.Int) []*big.Int { + var result []*big.Int + for _, elem := range superset { + if !contains(set, elem) { + result = append(result, elem) + } + } + return result +} + +// Generic function to convert a map[uint64]T to []*big.Int. +func mapToBigIntSlice[T any](m map[uint64]T) []*big.Int { + var result []*big.Int + for k := range m { + bigIntKey := new(big.Int).SetUint64(k) + result = append(result, bigIntKey) + } + return result +} diff --git a/ethergo/submitter/util_test.go b/ethergo/submitter/util_test.go index d39e3452a0..6965d502f0 100644 --- a/ethergo/submitter/util_test.go +++ b/ethergo/submitter/util_test.go @@ -296,3 +296,93 @@ func makeAttrMap(tx *types.Transaction, UUID string) map[string]attribute.Value } return mapAttr } + +// Test for the outersection function. +func TestOutersection(t *testing.T) { + set := []*big.Int{ + big.NewInt(2), + big.NewInt(4), + } + + superset := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(3), + big.NewInt(5), + } + + result := submitter.Outersection(set, superset) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +} + +// Test for the mapToBigIntSlice function with generics. +func TestMapToBigIntSlice(t *testing.T) { + m := map[uint64]struct{}{ + 1: {}, + 2: {}, + 3: {}, + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + } + + result := submitter.MapToBigIntSlice(m) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +} + +func TestMapToBigIntSliceWithStruct(t *testing.T) { + type MyStruct struct { + Value int + } + m := map[uint64]MyStruct{ + 1: {Value: 10}, + 2: {Value: 20}, + 3: {Value: 30}, + } + + expected := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + } + + result := submitter.MapToBigIntSlice(m) + + if len(result) != len(expected) { + t.Fatalf("Expected %d elements, but got %d", len(expected), len(result)) + } + + for i, v := range result { + if v.Cmp(expected[i]) != 0 { + t.Errorf("Expected %s but got %s at index %d", expected[i], v, i) + } + } +} From 3ed2ea1557e76928bdd7105f51df10f595c06a77 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 14:41:34 -0400 Subject: [PATCH 03/23] cleanup [goreleaser] --- ethergo/submitter/submitter.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 71a528aac4..4a163f65db 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -63,6 +63,8 @@ type txSubmitterImpl struct { db db.Service // retryOnce is used to return 0 on the first call to GetRetryInterval. retryOnce sync.Once + // distinctOnce is used to return 0 on the first call to GetDistinctInterval. + distinctOnce sync.Once // retryNow is used to trigger a retry immediately. // it circumvents the retry interval. // to prevent memory leaks, this has a buffer of 1. @@ -115,13 +117,15 @@ func (t *txSubmitterImpl) GetRetryInterval() time.Duration { // GetDistinctInterval returns the interval at which distinct chain ids should be queried. // this is used for metric updates. func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { - retryInterval := time.Second * 60 - t.retryOnce.Do(func() { + retryInterval := time.Minute + t.distinctOnce.Do(func() { retryInterval = time.Duration(0) }) return retryInterval } +// Start starts the transaction submitter. +// nolint: cyclop func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) if err != nil { From 4e2d5c8e3899965dec23fd298ed09219b596be01 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 14:51:25 -0400 Subject: [PATCH 04/23] address https://github.com/synapsecns/sanguine/pull/2821#discussion_r1659948503 [goreleaser] --- ethergo/submitter/metrics.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index 82c1043777..c878ce6899 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -70,7 +70,7 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt _, err = or.meter.RegisterCallback(or.recordNumPending, or.numPendingGauge) if err != nil { - return nil, fmt.Errorf("could not register callback: %w", err) + return nil, fmt.Errorf("could not register callback for num pending txes gauge: %w", err) } or.nonceGauge, err = or.meter.Int64ObservableGauge("current_nonce") @@ -80,7 +80,7 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt _, err = or.meter.RegisterCallback(or.recordNonces, or.nonceGauge) if err != nil { - return nil, fmt.Errorf("could not register callback: %w", err) + return nil, fmt.Errorf("could not register callback for nonce gauge: %w", err) } or.gasBalanceGauge, err = or.meter.Float64ObservableGauge("gas_balance") @@ -90,7 +90,7 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt _, err = or.meter.RegisterCallback(or.recordBalance, or.gasBalanceGauge) if err != nil { - return nil, fmt.Errorf("could not register callback: %w", err) + return nil, fmt.Errorf("could not register callback for gas balance gauge: %w", err) } or.oldestPendingGauge, err = or.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")) @@ -100,17 +100,17 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt _, err = or.meter.RegisterCallback(or.recordOldestPendingTx, or.oldestPendingGauge) if err != nil { - return nil, fmt.Errorf("could not register callback: %w", err) + return nil, fmt.Errorf("could not register callback for oldest pending gauge: %w", err) } or.confirmedQueueGauge, err = or.meter.Int64ObservableGauge("confirmed_queue") if err != nil { - return nil, fmt.Errorf("could not create oldest pending gauge: %w", err) + return nil, fmt.Errorf("could not create confirmed queue gauge: %w", err) } _, err = or.meter.RegisterCallback(or.recordConfirmedQueue, or.confirmedQueueGauge) if err != nil { - return nil, fmt.Errorf("could not register callback: %w", err) + return nil, fmt.Errorf("could not register callback for confirmed queue gauge: %w", err) } return &or, nil From dcb40c28ffe063dcee9f5f6a63c88aaa898cd9c0 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:04:41 -0400 Subject: [PATCH 05/23] [goreleaser] log outersection --- ethergo/submitter/queue.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index 2e271fe9fb..f818eb20f8 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -68,6 +68,8 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { t.distinctChainIDMux.RLock() noOpChainIDs := outersection(pendingChainIDs, t.distinctChainIDs) + fmt.Println("outersection 2", noOpChainIDs) + t.distinctChainIDMux.RUnlock() pendingChainIDs64 := make([]int64, len(pendingChainIDs)) @@ -164,6 +166,8 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err noOpChainIDs := outersection(mapToBigIntSlice(sortedTXsByChainID), t.distinctChainIDs) t.distinctChainIDMux.RUnlock() + fmt.Println("outersection 1", noOpChainIDs) + var wg sync.WaitGroup wg.Add(len(sortedTXsByChainID)) From ec3dc9249da1de227e76a7213d34a9d741166f3a Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:04:51 -0400 Subject: [PATCH 06/23] Revert "[goreleaser] log outersection" This reverts commit dcb40c28ffe063dcee9f5f6a63c88aaa898cd9c0. --- ethergo/submitter/queue.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index f818eb20f8..2e271fe9fb 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -68,8 +68,6 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { t.distinctChainIDMux.RLock() noOpChainIDs := outersection(pendingChainIDs, t.distinctChainIDs) - fmt.Println("outersection 2", noOpChainIDs) - t.distinctChainIDMux.RUnlock() pendingChainIDs64 := make([]int64, len(pendingChainIDs)) @@ -166,8 +164,6 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err noOpChainIDs := outersection(mapToBigIntSlice(sortedTXsByChainID), t.distinctChainIDs) t.distinctChainIDMux.RUnlock() - fmt.Println("outersection 1", noOpChainIDs) - var wg sync.WaitGroup wg.Add(len(sortedTXsByChainID)) From f5f1bfece3e156667e95a8e2ef870b78cf463d87 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:12:05 -0400 Subject: [PATCH 07/23] [goreleaser] log record --- ethergo/submitter/metrics.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index c878ce6899..e21196fedd 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -207,25 +207,30 @@ func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric. // RecordNonceForChain sets the nonce for a chain. func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) { o.currentNonces.Set(chainID, nonce) + fmt.Println("recoreded", chainID, nonce) } // RecordGasBalanceForChain sets the gas balance for a chain. func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) { o.currentGasBalances.Set(chainID, balance) + fmt.Println("recoreded", chainID, balance) } // RecordOldestPendingTx sets the oldest pending tx. func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) { o.oldestPendingPerChain.Set(chainID, lastPending) + fmt.Println("recoreded", chainID, lastPending) } // RecordNumPendingTxes sets the number of pending txes. func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) { o.numPendingTxes.Set(chainID, numPending) + fmt.Println("recoreded", chainID, numPending) } // RecordConfirmedQueue sets the confirmed queue count. func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) { + fmt.Println("recoreded", chainID, queueSize) o.confirmedQueueCount.Set(chainID, queueSize) } From 85fbda0eed2798207d49d22c284414de8ae51a3a Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:27:45 -0400 Subject: [PATCH 08/23] record log [goreleaser] --- ethergo/submitter/metrics.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index e21196fedd..c156b1b2ed 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -128,6 +128,8 @@ func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Obser ) observer.ObserveInt64(o.numPendingGauge, int64(numPending), opts) + fmt.Println("recoreded", chainID, numPending) + return true }) @@ -145,6 +147,7 @@ func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) attribute.String("wallet", o.signer.Address().Hex()), ) observer.ObserveInt64(o.nonceGauge, int64(nonce), opts) + fmt.Println("recoreded", chainID, nonce) return true }) @@ -173,6 +176,8 @@ func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer return nil } + fmt.Println("recoreded", o.currentGasBalances) + o.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), @@ -191,6 +196,8 @@ func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric. return nil } + fmt.Println("recoreded", o.oldestPendingPerChain) + o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool { opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), From 5cb5cdef128e22b7d4ed006c18d545467f1168f1 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:27:52 -0400 Subject: [PATCH 09/23] Revert "record log [goreleaser]" This reverts commit 85fbda0eed2798207d49d22c284414de8ae51a3a. --- ethergo/submitter/metrics.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index c156b1b2ed..e21196fedd 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -128,8 +128,6 @@ func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Obser ) observer.ObserveInt64(o.numPendingGauge, int64(numPending), opts) - fmt.Println("recoreded", chainID, numPending) - return true }) @@ -147,7 +145,6 @@ func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) attribute.String("wallet", o.signer.Address().Hex()), ) observer.ObserveInt64(o.nonceGauge, int64(nonce), opts) - fmt.Println("recoreded", chainID, nonce) return true }) @@ -176,8 +173,6 @@ func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer return nil } - fmt.Println("recoreded", o.currentGasBalances) - o.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool { opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), @@ -196,8 +191,6 @@ func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric. return nil } - fmt.Println("recoreded", o.oldestPendingPerChain) - o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool { opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), From beb2bd9f2fd86440569e331ca0f964b4c5b78252 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:31:45 -0400 Subject: [PATCH 10/23] [goreleaser] update fork --- ethergo/submitter/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index e21196fedd..f5bf24f5ce 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -13,7 +13,7 @@ import ( "time" ) -const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" +const meterName = "github.com/synapsecns/sanguine/services/rfq/api/rest" // generate an interface for otelRecorder that exports the public method. // this allows us to avoid using recordX externally anad makes the package less confusing. From a82182e6e41e63a77369c940c1ef517753bcab3f Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:31:53 -0400 Subject: [PATCH 11/23] Revert "[goreleaser] update fork" This reverts commit beb2bd9f2fd86440569e331ca0f964b4c5b78252. --- ethergo/submitter/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index f5bf24f5ce..e21196fedd 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -13,7 +13,7 @@ import ( "time" ) -const meterName = "github.com/synapsecns/sanguine/services/rfq/api/rest" +const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" // generate an interface for otelRecorder that exports the public method. // this allows us to avoid using recordX externally anad makes the package less confusing. From 86c3d706aede4f16f93b9ac76ec14840533e897c Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:53:31 -0400 Subject: [PATCH 12/23] [goreleaser] --- ethergo/submitter/submitter.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 4a163f65db..2323e1abec 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -92,6 +92,11 @@ type ClientFetcher interface { // NewTransactionSubmitter creates a new transaction submitter. func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter { + yo, err := newOtelRecorder(metrics, signer) + if err != nil { + fmt.Printf("could not create otel recorder: %w", err) + } + return &txSubmitterImpl{ db: db, config: config, @@ -102,6 +107,7 @@ func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetc statusMux: mapmutex.NewStringMapMutex(), retryNow: make(chan bool, 1), lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), + otelRecorder: yo, } } @@ -127,10 +133,10 @@ func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { // Start starts the transaction submitter. // nolint: cyclop func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { - t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) - if err != nil { - return fmt.Errorf("could not create otel recorder: %w", err) - } + //t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) + //if err != nil { + // return fmt.Errorf("could not create otel recorder: %w", err) + //} // start reaper process ctx, cancel := context.WithCancel(parentCtx) From cd249c5c451f143e41db12037f62ebb3685dc429 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:53:40 -0400 Subject: [PATCH 13/23] Revert "[goreleaser]" This reverts commit 86c3d706aede4f16f93b9ac76ec14840533e897c. --- ethergo/submitter/submitter.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 2323e1abec..4a163f65db 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -92,11 +92,6 @@ type ClientFetcher interface { // NewTransactionSubmitter creates a new transaction submitter. func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetcher ClientFetcher, db db.Service, config config.IConfig) TransactionSubmitter { - yo, err := newOtelRecorder(metrics, signer) - if err != nil { - fmt.Printf("could not create otel recorder: %w", err) - } - return &txSubmitterImpl{ db: db, config: config, @@ -107,7 +102,6 @@ func NewTransactionSubmitter(metrics metrics.Handler, signer signer.Signer, fetc statusMux: mapmutex.NewStringMapMutex(), retryNow: make(chan bool, 1), lastGasBlockCache: xsync.NewIntegerMapOf[int, *types.Header](), - otelRecorder: yo, } } @@ -133,10 +127,10 @@ func (t *txSubmitterImpl) GetDistinctInterval() time.Duration { // Start starts the transaction submitter. // nolint: cyclop func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { - //t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) - //if err != nil { - // return fmt.Errorf("could not create otel recorder: %w", err) - //} + t.otelRecorder, err = newOtelRecorder(t.metrics, t.signer) + if err != nil { + return fmt.Errorf("could not create otel recorder: %w", err) + } // start reaper process ctx, cancel := context.WithCancel(parentCtx) From 689120abce2910b0da697734f24d9e31b403076a Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 15:59:37 -0400 Subject: [PATCH 14/23] Revert "[goreleaser] log record" This reverts commit f5f1bfece3e156667e95a8e2ef870b78cf463d87. --- ethergo/submitter/metrics.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index e21196fedd..c878ce6899 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -207,30 +207,25 @@ func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric. // RecordNonceForChain sets the nonce for a chain. func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) { o.currentNonces.Set(chainID, nonce) - fmt.Println("recoreded", chainID, nonce) } // RecordGasBalanceForChain sets the gas balance for a chain. func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) { o.currentGasBalances.Set(chainID, balance) - fmt.Println("recoreded", chainID, balance) } // RecordOldestPendingTx sets the oldest pending tx. func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) { o.oldestPendingPerChain.Set(chainID, lastPending) - fmt.Println("recoreded", chainID, lastPending) } // RecordNumPendingTxes sets the number of pending txes. func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) { o.numPendingTxes.Set(chainID, numPending) - fmt.Println("recoreded", chainID, numPending) } // RecordConfirmedQueue sets the confirmed queue count. func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) { - fmt.Println("recoreded", chainID, queueSize) o.confirmedQueueCount.Set(chainID, queueSize) } From 41410d0fb6401d439297360884477b6eb2ece1e4 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:06:33 -0400 Subject: [PATCH 15/23] REVERT ME cleanup [goreleaser] --- ethergo/submitter/submitter.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 4a163f65db..8890f86b2e 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/brianvoe/gofakeit/v6" + "go.opentelemetry.io/otel/metric" "math" "math/big" "reflect" @@ -132,6 +134,17 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { return fmt.Errorf("could not create otel recorder: %w", err) } + yo := t.metrics.Meter("submitter") + r, err := yo.Int64ObservableGauge("num_pending_txes") + if err != nil { + return fmt.Errorf("could not create num pending txes gauge: %w", err) + } + + yo.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(r, gofakeit.Int64()) + return nil + }, r) + // start reaper process ctx, cancel := context.WithCancel(parentCtx) go func() { From 65edc1a73d50d8eab37b741a87678c6db19dddcb Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:06:56 -0400 Subject: [PATCH 16/23] Revert "REVERT ME cleanup [goreleaser]" This reverts commit 41410d0fb6401d439297360884477b6eb2ece1e4. --- ethergo/submitter/submitter.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 8890f86b2e..4a163f65db 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/brianvoe/gofakeit/v6" - "go.opentelemetry.io/otel/metric" "math" "math/big" "reflect" @@ -134,17 +132,6 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { return fmt.Errorf("could not create otel recorder: %w", err) } - yo := t.metrics.Meter("submitter") - r, err := yo.Int64ObservableGauge("num_pending_txes") - if err != nil { - return fmt.Errorf("could not create num pending txes gauge: %w", err) - } - - yo.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { - observer.ObserveInt64(r, gofakeit.Int64()) - return nil - }, r) - // start reaper process ctx, cancel := context.WithCancel(parentCtx) go func() { From 5a49e7b75cc7f3f8da55c3de3e96b4c7f9d28fc9 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:12:15 -0400 Subject: [PATCH 17/23] Reapply "REVERT ME cleanup [goreleaser]" This reverts commit 65edc1a73d50d8eab37b741a87678c6db19dddcb. --- ethergo/submitter/submitter.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 4a163f65db..8890f86b2e 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/brianvoe/gofakeit/v6" + "go.opentelemetry.io/otel/metric" "math" "math/big" "reflect" @@ -132,6 +134,17 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { return fmt.Errorf("could not create otel recorder: %w", err) } + yo := t.metrics.Meter("submitter") + r, err := yo.Int64ObservableGauge("num_pending_txes") + if err != nil { + return fmt.Errorf("could not create num pending txes gauge: %w", err) + } + + yo.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(r, gofakeit.Int64()) + return nil + }, r) + // start reaper process ctx, cancel := context.WithCancel(parentCtx) go func() { From dfabfe780412f42456d82dd9438c4794dded5510 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:14:20 -0400 Subject: [PATCH 18/23] try things (in the arena) [goreleaser] --- ethergo/submitter/metrics.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index c878ce6899..9ab835dd39 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -3,6 +3,7 @@ package submitter import ( "context" "fmt" + "github.com/brianvoe/gofakeit/v6" "github.com/cornelk/hashmap" "github.com/synapsecns/sanguine/core/metrics" "github.com/synapsecns/sanguine/ethergo/signer/signer" @@ -13,7 +14,7 @@ import ( "time" ) -const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" +const meterName = "ethergo/submitter" // generate an interface for otelRecorder that exports the public method. // this allows us to avoid using recordX externally anad makes the package less confusing. @@ -63,6 +64,16 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt signer: signerT, } + r, err := or.meter.Int64ObservableGauge("IMARANDOMNUMBER") + if err != nil { + return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) + } + + or.meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(r, gofakeit.Int64()) + return nil + }, r) + or.numPendingGauge, err = or.meter.Int64ObservableGauge("num_pending_txes") if err != nil { return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) From e41b1178a153f64bc686bdc8775043bd95e9bd17 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:21:05 -0400 Subject: [PATCH 19/23] try MORE thigns [goreleaser] --- ethergo/submitter/metrics.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index 9ab835dd39..0e08d11da6 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -128,11 +128,14 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt } func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) { + fmt.Println("recordNumPending") if o.metrics == nil || o.numPendingGauge == nil || o.numPendingTxes == nil { return nil } + fmt.Println("recordNumPending 2") o.numPendingTxes.Range(func(chainID uint32, numPending int) bool { + fmt.Println("recordNumPending for ", chainID) opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), attribute.String("wallet", o.signer.Address().Hex()), @@ -141,6 +144,7 @@ func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Obser return true }) + fmt.Println("recordNumPending 3") return nil } From 780c09784cda08d2eb1e3506c5490addca2a58ce Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:27:03 -0400 Subject: [PATCH 20/23] Revert "try MORE thigns [goreleaser]" This reverts commit e41b1178a153f64bc686bdc8775043bd95e9bd17. --- ethergo/submitter/metrics.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index 0e08d11da6..9ab835dd39 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -128,14 +128,11 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt } func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) { - fmt.Println("recordNumPending") if o.metrics == nil || o.numPendingGauge == nil || o.numPendingTxes == nil { return nil } - fmt.Println("recordNumPending 2") o.numPendingTxes.Range(func(chainID uint32, numPending int) bool { - fmt.Println("recordNumPending for ", chainID) opts := metric.WithAttributes( attribute.Int(metrics.ChainID, int(chainID)), attribute.String("wallet", o.signer.Address().Hex()), @@ -144,7 +141,6 @@ func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Obser return true }) - fmt.Println("recordNumPending 3") return nil } From 252473556d3669e15bc763f37e62ae1719d7b1d5 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:27:15 -0400 Subject: [PATCH 21/23] Revert "try things (in the arena) [goreleaser]" This reverts commit dfabfe780412f42456d82dd9438c4794dded5510. --- ethergo/submitter/metrics.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index 9ab835dd39..c878ce6899 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -3,7 +3,6 @@ package submitter import ( "context" "fmt" - "github.com/brianvoe/gofakeit/v6" "github.com/cornelk/hashmap" "github.com/synapsecns/sanguine/core/metrics" "github.com/synapsecns/sanguine/ethergo/signer/signer" @@ -14,7 +13,7 @@ import ( "time" ) -const meterName = "ethergo/submitter" +const meterName = "github.com/synapsecns/sanguine/ethergo/submitter" // generate an interface for otelRecorder that exports the public method. // this allows us to avoid using recordX externally anad makes the package less confusing. @@ -64,16 +63,6 @@ func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOt signer: signerT, } - r, err := or.meter.Int64ObservableGauge("IMARANDOMNUMBER") - if err != nil { - return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) - } - - or.meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { - observer.ObserveInt64(r, gofakeit.Int64()) - return nil - }, r) - or.numPendingGauge, err = or.meter.Int64ObservableGauge("num_pending_txes") if err != nil { return nil, fmt.Errorf("could not create num pending txes gauge: %w", err) From aee3f7d63a00289e75b748d4e1fa545eb61c20d7 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:27:28 -0400 Subject: [PATCH 22/23] Revert "Reapply "REVERT ME cleanup [goreleaser]"" This reverts commit 5a49e7b75cc7f3f8da55c3de3e96b4c7f9d28fc9. --- ethergo/submitter/submitter.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ethergo/submitter/submitter.go b/ethergo/submitter/submitter.go index 8890f86b2e..4a163f65db 100644 --- a/ethergo/submitter/submitter.go +++ b/ethergo/submitter/submitter.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/brianvoe/gofakeit/v6" - "go.opentelemetry.io/otel/metric" "math" "math/big" "reflect" @@ -134,17 +132,6 @@ func (t *txSubmitterImpl) Start(parentCtx context.Context) (err error) { return fmt.Errorf("could not create otel recorder: %w", err) } - yo := t.metrics.Meter("submitter") - r, err := yo.Int64ObservableGauge("num_pending_txes") - if err != nil { - return fmt.Errorf("could not create num pending txes gauge: %w", err) - } - - yo.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { - observer.ObserveInt64(r, gofakeit.Int64()) - return nil - }, r) - // start reaper process ctx, cancel := context.WithCancel(parentCtx) go func() { From 3c88ae3f4773063029012eee04d533ce6db54050 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Sat, 29 Jun 2024 16:27:57 -0400 Subject: [PATCH 23/23] attach metric handler :( [goreleaser] --- ethergo/submitter/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index c878ce6899..47420d8a5a 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -54,6 +54,7 @@ type otelRecorder struct { // nolint: cyclop func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOtelRecorder, err error) { or := otelRecorder{ + metrics: meterHandler, meter: meterHandler.Meter(meterName), numPendingTxes: hashmap.New[uint32, int](), currentNonces: hashmap.New[uint32, uint64](),