From d1c13feb66a4202e9d954943d365d76650b91a9f Mon Sep 17 00:00:00 2001 From: trajan0x <83933037+trajan0x@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:42:24 -0400 Subject: [PATCH] fix pending queue (#2954) Co-authored-by: Trajan0x --- ethergo/listener/listener.go | 5 ++- ethergo/submitter/db/mocks/service.go | 36 ++++++++-------- ethergo/submitter/db/service.go | 62 ++++++++++++++++++++++++++- ethergo/submitter/db/txdb/store.go | 23 +++++----- ethergo/submitter/db_test.go | 14 +++--- ethergo/submitter/metrics.go | 2 +- ethergo/submitter/queue.go | 4 +- ethergo/submitter/submitter_test.go | 6 +-- 8 files changed, 104 insertions(+), 48 deletions(-) diff --git a/ethergo/listener/listener.go b/ethergo/listener/listener.go index 7a550f4991..73faac9987 100644 --- a/ethergo/listener/listener.go +++ b/ethergo/listener/listener.go @@ -153,9 +153,10 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er // Check if latest block is the same as start block (for chains with slow block times) didPoll := true - defer span.SetAttributes(attribute.Bool("did_poll", didPoll)) + defer func() { + span.SetAttributes(attribute.Bool("did_poll", didPoll)) + }() if c.latestBlock == c.startBlock { - //nolint:ineffassign didPoll = false return nil } diff --git a/ethergo/submitter/db/mocks/service.go b/ethergo/submitter/db/mocks/service.go index 30e2527588..cbee8e168f 100644 --- a/ethergo/submitter/db/mocks/service.go +++ b/ethergo/submitter/db/mocks/service.go @@ -55,11 +55,11 @@ func (_m *Service) DeleteTXS(ctx context.Context, maxAge time.Duration, matchSta return r0 } -// GetAllTXAttemptByStatus provides a mock function with given fields: ctx, fromAddress, chainID, matchStatuses -func (_m *Service) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) ([]db.TX, error) { - _va := make([]interface{}, len(matchStatuses)) - for _i := range matchStatuses { - _va[_i] = matchStatuses[_i] +// GetAllTXAttemptByStatus provides a mock function with given fields: ctx, fromAddress, chainID, options +func (_m *Service) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) ([]db.TX, error) { + _va := make([]interface{}, len(options)) + for _i := range options { + _va[_i] = options[_i] } var _ca []interface{} _ca = append(_ca, ctx, fromAddress, chainID) @@ -67,8 +67,8 @@ func (_m *Service) GetAllTXAttemptByStatus(ctx context.Context, fromAddress comm ret := _m.Called(_ca...) var r0 []db.TX - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int, ...db.Status) []db.TX); ok { - r0 = rf(ctx, fromAddress, chainID, matchStatuses...) + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int, ...db.Option) []db.TX); ok { + r0 = rf(ctx, fromAddress, chainID, options...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]db.TX) @@ -76,8 +76,8 @@ func (_m *Service) GetAllTXAttemptByStatus(ctx context.Context, fromAddress comm } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int, ...db.Status) error); ok { - r1 = rf(ctx, fromAddress, chainID, matchStatuses...) + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int, ...db.Option) error); ok { + r1 = rf(ctx, fromAddress, chainID, options...) } else { r1 = ret.Error(1) } @@ -210,11 +210,11 @@ func (_m *Service) GetNonceStatus(ctx context.Context, fromAddress common.Addres return r0, r1 } -// GetTXS provides a mock function with given fields: ctx, fromAddress, chainID, statuses -func (_m *Service) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, statuses ...db.Status) ([]db.TX, error) { - _va := make([]interface{}, len(statuses)) - for _i := range statuses { - _va[_i] = statuses[_i] +// GetTXS provides a mock function with given fields: ctx, fromAddress, chainID, options +func (_m *Service) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) ([]db.TX, error) { + _va := make([]interface{}, len(options)) + for _i := range options { + _va[_i] = options[_i] } var _ca []interface{} _ca = append(_ca, ctx, fromAddress, chainID) @@ -222,8 +222,8 @@ func (_m *Service) GetTXS(ctx context.Context, fromAddress common.Address, chain ret := _m.Called(_ca...) var r0 []db.TX - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int, ...db.Status) []db.TX); ok { - r0 = rf(ctx, fromAddress, chainID, statuses...) + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int, ...db.Option) []db.TX); ok { + r0 = rf(ctx, fromAddress, chainID, options...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]db.TX) @@ -231,8 +231,8 @@ func (_m *Service) GetTXS(ctx context.Context, fromAddress common.Address, chain } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int, ...db.Status) error); ok { - r1 = rf(ctx, fromAddress, chainID, statuses...) + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int, ...db.Option) error); ok { + r1 = rf(ctx, fromAddress, chainID, options...) } else { r1 = ret.Error(1) } diff --git a/ethergo/submitter/db/service.go b/ethergo/submitter/db/service.go index ad2bbd78d2..d1977e1134 100644 --- a/ethergo/submitter/db/service.go +++ b/ethergo/submitter/db/service.go @@ -23,7 +23,7 @@ type Service interface { // PutTXS stores a tx in the database. PutTXS(ctx context.Context, txs ...TX) error // GetTXS gets all txs for a given address and chain id. If chain id is nil, it will get all txs for the address. - GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, statuses ...Status) (txs []TX, err error) + GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error) // MarkAllBeforeNonceReplacedOrConfirmed marks all txs for a given chain id and address before a given nonce as replaced or confirmed. // TODO: cleaner function name MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signer common.Address, chainID *big.Int, nonce uint64) error @@ -31,7 +31,7 @@ type Service interface { // the function passed in will be passed a new service that is scoped to the transaction. DBTransaction(ctx context.Context, f TransactionFunc) error // GetAllTXAttemptByStatus gets all txs for a given address and chain id with a given status. - GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...Status) (txs []TX, err error) + GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error) // GetNonceStatus returns the nonce status for a given nonce by aggregating all attempts and finding the highest status. GetNonceStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64) (status Status, err error) // GetNonceAttemptsByStatus gets all txs for a given address and chain id with a given status and nonce. @@ -44,6 +44,64 @@ type Service interface { GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) } +// Option is a type for specifying optional parameters. +type Option func(*options) + +type options struct { + statuses []Status + maxResults int +} + +var _ OptionsFetcher = (*options)(nil) + +// OptionsFetcher is the interface for fetching options. +type OptionsFetcher interface { + Statuses() []Status + MaxResults() int +} + +func (o *options) MaxResults() int { + return o.maxResults +} + +func (o *options) Statuses() []Status { + return o.statuses +} + +// DefaultMaxResultsPerChain is the maximum number of transactions to return per chain id. +// it is exported for testing. +// TODO: this should be an option passed to the GetTXs function. +// TODO: temporarily reduced from 50 to 1 to increase resiliency. +const DefaultMaxResultsPerChain = 10 + +// ParseOptions parses the options. +func ParseOptions(opts ...Option) OptionsFetcher { + myOptions := &options{ + statuses: nil, + maxResults: DefaultMaxResultsPerChain, // Default to 0 for no limit. + } + + for _, opt := range opts { + opt(myOptions) + } + + return myOptions +} + +// WithStatuses specifies the statuses to match. +func WithStatuses(statuses ...Status) Option { + return func(opts *options) { + opts.statuses = statuses + } +} + +// WithMaxResults specifies the maximum number of results to return. +func WithMaxResults(maxResults int) Option { + return func(opts *options) { + opts.maxResults = maxResults + } +} + // TransactionFunc is a function that can be passed to DBTransaction. type TransactionFunc func(ctx context.Context, svc Service) error diff --git a/ethergo/submitter/db/txdb/store.go b/ethergo/submitter/db/txdb/store.go index 06fb999a45..43897231d8 100644 --- a/ethergo/submitter/db/txdb/store.go +++ b/ethergo/submitter/db/txdb/store.go @@ -53,6 +53,7 @@ func (s *Store) MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signe dbTX := s.db.WithContext(ctx).Model(ÐTX{}). Where(fmt.Sprintf("%s = ?", chainIDFieldName), chainID.Uint64()). Where(fmt.Sprintf("%s < ?", nonceFieldName), nonce). + Where(fmt.Sprintf("%s < ?", statusFieldName), db.ReplacedOrConfirmed). Where(fmt.Sprintf("`%s` = ?", fromFieldName), signer.String()). // just in case we're updating a tx already marked as confirmed Updates(map[string]interface{}{statusFieldName: db.ReplacedOrConfirmed.Int()}) @@ -64,12 +65,6 @@ func (s *Store) MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signe return nil } -// MaxResultsPerChain is the maximum number of transactions to return per chain id. -// it is exported for testing. -// TODO: this should be an option passed to the GetTXs function. -// TODO: temporarily reduced from 50 to 1 to increase resiliency. -const MaxResultsPerChain = 10 - func statusToArgs(matchStatuses ...db.Status) []int { inArgs := make([]int, len(matchStatuses)) for i := range matchStatuses { @@ -123,10 +118,11 @@ func (s *Store) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) { // GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status. // there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain. // the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing. -func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) { +func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) { var dbTXs []ETHTX - inArgs := statusToArgs(matchStatuses...) + madeOptions := db.ParseOptions(options...) + inArgs := statusToArgs(madeOptions.Statuses()...) query := ETHTX{ From: fromAddress.String(), @@ -147,7 +143,7 @@ func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs). Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)). Order(fmt.Sprintf("%s asc", nonceFieldName)). - Limit(MaxResultsPerChain) + Limit(madeOptions.MaxResults()) joinQuery, err := interpol.WithMap( "INNER JOIN (?) as subquery on `{table}`.`{id}` = `subquery`.`{id}` AND `{table}`.`{chainID}` = `subquery`.`{chainID}`", map[string]string{ @@ -180,9 +176,12 @@ func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID } // GetAllTXAttemptByStatus returns all transactions for a given address on a given (or any) chain id that match a given status. -func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) { +func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) { var dbTXs []ETHTX + madeOptions := db.ParseOptions(options...) + inArgs := statusToArgs(madeOptions.Statuses()...) + query := ETHTX{ From: fromAddress.String(), } @@ -199,10 +198,10 @@ func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common. subQuery := s.DB().Model(ÐTX{}). Select(fmt.Sprintf("MAX(%s) as %s, %s, %s", idFieldName, idFieldName, nonceFieldName, chainIDFieldName)). Where(query). - Where(fmt.Sprintf("%s IN ?", statusFieldName), statusToArgs(matchStatuses...)). + Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs). Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)). Order(fmt.Sprintf("%s asc", nonceFieldName)). - Limit(MaxResultsPerChain) + Limit(madeOptions.MaxResults()) // one consequence of innerjoining on nonce is we can't cap the max results for the whole query. This is a known limitation joinQuery, err := interpol.WithMap( diff --git a/ethergo/submitter/db_test.go b/ethergo/submitter/db_test.go index 8f402d96ac..b3c1a11d8b 100644 --- a/ethergo/submitter/db_test.go +++ b/ethergo/submitter/db_test.go @@ -12,8 +12,6 @@ import ( "github.com/synapsecns/sanguine/ethergo/mocks" "github.com/synapsecns/sanguine/ethergo/signer/nonce" "github.com/synapsecns/sanguine/ethergo/submitter/db" - "github.com/synapsecns/sanguine/ethergo/submitter/db/txdb" - "math/big" "github.com/synapsecns/sanguine/ethergo/util" @@ -95,11 +93,11 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() { } // get the transactions with limit per ChainID - result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending) + result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending)) t.Require().NoError(err) // check that the result has the correct length - t.Require().Equal(txdb.MaxResultsPerChain, len(result)) + t.Require().Equal(db.DefaultMaxResultsPerChain, len(result)) // check that the result is limited per ChainID and address for _, tx := range result { @@ -119,9 +117,9 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() { // make sure this returns double the number of results, 2 per tx // TODO: check nonces - result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending) + result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending)) t.Require().NoError(err) - t.Require().Equal(txdb.MaxResultsPerChain*2, len(result)) + t.Require().Equal(db.DefaultMaxResultsPerChain*2, len(result)) } } }) @@ -239,7 +237,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() { // ensure txs were stored allStatuses := []db.Status{db.Pending, db.Stored, db.Replaced, db.ReplacedOrConfirmed, db.Confirmed} - txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...) + txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...)) t.Require().NoError(err) t.Equal(5, len(txs)) @@ -248,7 +246,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() { t.Require().NoError(err) // ensure txs were deleted - txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...) + txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...)) t.Require().NoError(err) t.Equal(2, len(txs)) }) diff --git a/ethergo/submitter/metrics.go b/ethergo/submitter/metrics.go index 4131ae1870..a0bd7e1d04 100644 --- a/ethergo/submitter/metrics.go +++ b/ethergo/submitter/metrics.go @@ -41,7 +41,7 @@ type otelRecorder struct { // gasBalanceGauge is the gauge for the gas balance. gasBalanceGauge metric.Float64ObservableGauge // numPendingTxes is used for metrics. - // note: numPendingTxes will stop counting at MaxResultsPerChain. + // note: numPendingTxes will stop counting at DefaultMaxResultsPerChain. numPendingTxes *hashmap.Map[uint32, int] // currentNonces is used for metrics. // chainID -> nonce diff --git a/ethergo/submitter/queue.go b/ethergo/submitter/queue.go index 2e271fe9fb..db5b02f920 100644 --- a/ethergo/submitter/queue.go +++ b/ethergo/submitter/queue.go @@ -83,7 +83,7 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) { defer wg.Done() // get all the pendingTxes in the queue - pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.Stored, db.Pending, db.FailedSubmit, db.Submitted) + pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.WithStatuses(db.Stored, db.Pending, db.FailedSubmit, db.Submitted)) if err != nil { span.AddEvent("could not get pendingTxes", trace.WithAttributes( attribute.String("error", err.Error()), attribute.Int64("chainID", chainID.Int64()), @@ -153,7 +153,7 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err metrics.EndSpanWithErr(span, err) }() - txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.ReplacedOrConfirmed) + txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.WithMaxResults(1000), db.WithStatuses(db.ReplacedOrConfirmed)) if err != nil { return fmt.Errorf("could not get txs: %w", err) } diff --git a/ethergo/submitter/submitter_test.go b/ethergo/submitter/submitter_test.go index 7ea7512de6..96af1ba0ba 100644 --- a/ethergo/submitter/submitter_test.go +++ b/ethergo/submitter/submitter_test.go @@ -299,7 +299,7 @@ func (s *SubmitterSuite) TestSubmitTransaction() { // make sure the tx wasn't submitted s.Equal(ogCounter.Uint64(), currentCounter.Uint64()) - txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.Stored) + txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.WithStatuses(db.Stored)) s.Require().NoError(err) s.Require().NotNil(txs[0]) @@ -356,7 +356,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmation() { err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes) s.Require().NoError(err) - txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced) + txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)) s.Require().NoError(err) var replacedCount int @@ -398,7 +398,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmationSingleTx() { err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes) s.Require().NoError(err) - txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced) + txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)) s.Require().NoError(err) for _, tx := range txs {