Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pending queue #2954

Merged
merged 4 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ethergo/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er

// Check if latest block is the same as start block (for chains with slow block times)
didPoll := true
defer span.SetAttributes(attribute.Bool("did_poll", didPoll))
defer func() {
span.SetAttributes(attribute.Bool("did_poll", didPoll))
}()
if c.latestBlock == c.startBlock {
//nolint:ineffassign
didPoll = false
return nil
}
Expand Down
36 changes: 18 additions & 18 deletions ethergo/submitter/db/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 60 additions & 2 deletions ethergo/submitter/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
// PutTXS stores a tx in the database.
PutTXS(ctx context.Context, txs ...TX) error
// GetTXS gets all txs for a given address and chain id. If chain id is nil, it will get all txs for the address.
GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, statuses ...Status) (txs []TX, err error)
GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error)
// MarkAllBeforeNonceReplacedOrConfirmed marks all txs for a given chain id and address before a given nonce as replaced or confirmed.
// TODO: cleaner function name
MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signer common.Address, chainID *big.Int, nonce uint64) error
// DBTransaction executes a transaction on the database.
// the function passed in will be passed a new service that is scoped to the transaction.
DBTransaction(ctx context.Context, f TransactionFunc) error
// GetAllTXAttemptByStatus gets all txs for a given address and chain id with a given status.
GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...Status) (txs []TX, err error)
GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error)
dwasse marked this conversation as resolved.
Show resolved Hide resolved
// GetNonceStatus returns the nonce status for a given nonce by aggregating all attempts and finding the highest status.
GetNonceStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64) (status Status, err error)
// GetNonceAttemptsByStatus gets all txs for a given address and chain id with a given status and nonce.
Expand All @@ -44,6 +44,64 @@
GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error)
}

// Option is a type for specifying optional parameters.
type Option func(*options)

type options struct {
statuses []Status
maxResults int
}

var _ OptionsFetcher = (*options)(nil)

// OptionsFetcher is the interface for fetching options.
type OptionsFetcher interface {
Statuses() []Status
MaxResults() int
}

func (o *options) MaxResults() int {
return o.maxResults

Check warning on line 64 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

func (o *options) Statuses() []Status {
return o.statuses

Check warning on line 68 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}

// DefaultMaxResultsPerChain is the maximum number of transactions to return per chain id.
// it is exported for testing.
// TODO: this should be an option passed to the GetTXs function.
// TODO: temporarily reduced from 50 to 1 to increase resiliency.
const DefaultMaxResultsPerChain = 10

// ParseOptions parses the options.
func ParseOptions(opts ...Option) OptionsFetcher {
myOptions := &options{
statuses: nil,
maxResults: DefaultMaxResultsPerChain, // Default to 0 for no limit.
}

for _, opt := range opts {
opt(myOptions)
}

Check warning on line 86 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L78-L86

Added lines #L78 - L86 were not covered by tests

return myOptions

Check warning on line 88 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L88

Added line #L88 was not covered by tests
}

// WithStatuses specifies the statuses to match.
func WithStatuses(statuses ...Status) Option {
return func(opts *options) {
opts.statuses = statuses
}

Check warning on line 95 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L92-L95

Added lines #L92 - L95 were not covered by tests
}

// WithMaxResults specifies the maximum number of results to return.
func WithMaxResults(maxResults int) Option {
return func(opts *options) {
opts.maxResults = maxResults
}

Check warning on line 102 in ethergo/submitter/db/service.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/service.go#L99-L102

Added lines #L99 - L102 were not covered by tests
}

// TransactionFunc is a function that can be passed to DBTransaction.
type TransactionFunc func(ctx context.Context, svc Service) error

Expand Down
22 changes: 10 additions & 12 deletions ethergo/submitter/db/txdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@
return nil
}

// MaxResultsPerChain is the maximum number of transactions to return per chain id.
// it is exported for testing.
// TODO: this should be an option passed to the GetTXs function.
// TODO: temporarily reduced from 50 to 1 to increase resiliency.
const MaxResultsPerChain = 10

func statusToArgs(matchStatuses ...db.Status) []int {
inArgs := make([]int, len(matchStatuses))
for i := range matchStatuses {
Expand Down Expand Up @@ -123,10 +117,11 @@
// GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status.
// there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain.
// the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing.
func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) {
func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) {

Check warning on line 120 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L120

Added line #L120 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The new implementation of the GetTXS function is not covered by tests.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

Tools
GitHub Check: codecov/patch

[warning] 121-121: ethergo/submitter/db/txdb/store.go#L121
Added line #L121 was not covered by tests

var dbTXs []ETHTX

inArgs := statusToArgs(matchStatuses...)
madeOptions := db.ParseOptions(options...)
inArgs := statusToArgs(madeOptions.Statuses()...)

Check warning on line 124 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L123-L124

Added lines #L123 - L124 were not covered by tests

query := ETHTX{
From: fromAddress.String(),
Expand All @@ -147,7 +142,7 @@
Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs).
Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)).
Order(fmt.Sprintf("%s asc", nonceFieldName)).
Limit(MaxResultsPerChain)
Limit(madeOptions.MaxResults())

Check warning on line 145 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L145

Added line #L145 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The limit on the number of results returned by the query should be covered by tests.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

Tools
GitHub Check: codecov/patch

[warning] 146-146: ethergo/submitter/db/txdb/store.go#L146
Added line #L146 was not covered by tests


joinQuery, err := interpol.WithMap(
"INNER JOIN (?) as subquery on `{table}`.`{id}` = `subquery`.`{id}` AND `{table}`.`{chainID}` = `subquery`.`{chainID}`", map[string]string{
Expand Down Expand Up @@ -180,9 +175,12 @@
}

// GetAllTXAttemptByStatus returns all transactions for a given address on a given (or any) chain id that match a given status.
func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) {
func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) {

Check warning on line 178 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L178

Added line #L178 was not covered by tests
dwasse marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The new implementation of the GetAllTXAttemptByStatus function is not covered by tests.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

Tools
GitHub Check: codecov/patch

[warning] 179-179: ethergo/submitter/db/txdb/store.go#L179
Added line #L179 was not covered by tests

var dbTXs []ETHTX

madeOptions := db.ParseOptions(options...)
inArgs := statusToArgs(madeOptions.Statuses()...)

Check warning on line 183 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L181-L183

Added lines #L181 - L183 were not covered by tests
query := ETHTX{
From: fromAddress.String(),
}
Expand All @@ -199,10 +197,10 @@
subQuery := s.DB().Model(&ETHTX{}).
Select(fmt.Sprintf("MAX(%s) as %s, %s, %s", idFieldName, idFieldName, nonceFieldName, chainIDFieldName)).
Where(query).
Where(fmt.Sprintf("%s IN ?", statusFieldName), statusToArgs(matchStatuses...)).
Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs).

Check warning on line 200 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L200

Added line #L200 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The limit on the number of results returned by the query should be covered by tests.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

Tools
GitHub Check: codecov/patch

[warning] 201-201: ethergo/submitter/db/txdb/store.go#L201
Added line #L201 was not covered by tests

Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)).
Order(fmt.Sprintf("%s asc", nonceFieldName)).
Limit(MaxResultsPerChain)
Limit(madeOptions.MaxResults())

Check warning on line 203 in ethergo/submitter/db/txdb/store.go

View check run for this annotation

Codecov / codecov/patch

ethergo/submitter/db/txdb/store.go#L203

Added line #L203 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The limit on the number of results returned by the query should be covered by tests.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

Tools
GitHub Check: codecov/patch

[warning] 204-204: ethergo/submitter/db/txdb/store.go#L204
Added line #L204 was not covered by tests


// one consequence of innerjoining on nonce is we can't cap the max results for the whole query. This is a known limitation
joinQuery, err := interpol.WithMap(
Expand Down
14 changes: 6 additions & 8 deletions ethergo/submitter/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/synapsecns/sanguine/ethergo/mocks"
"github.com/synapsecns/sanguine/ethergo/signer/nonce"
"github.com/synapsecns/sanguine/ethergo/submitter/db"
"github.com/synapsecns/sanguine/ethergo/submitter/db/txdb"

"math/big"

"github.com/synapsecns/sanguine/ethergo/util"
Expand Down Expand Up @@ -95,11 +93,11 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() {
}

// get the transactions with limit per ChainID
result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending)
result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending))
t.Require().NoError(err)

// check that the result has the correct length
t.Require().Equal(txdb.MaxResultsPerChain, len(result))
t.Require().Equal(db.DefaultMaxResultsPerChain, len(result))

// check that the result is limited per ChainID and address
for _, tx := range result {
Expand All @@ -119,9 +117,9 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() {

// make sure this returns double the number of results, 2 per tx
// TODO: check nonces
result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending)
result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending))
t.Require().NoError(err)
t.Require().Equal(txdb.MaxResultsPerChain*2, len(result))
t.Require().Equal(db.DefaultMaxResultsPerChain*2, len(result))
}
}
})
Expand Down Expand Up @@ -239,7 +237,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() {

// ensure txs were stored
allStatuses := []db.Status{db.Pending, db.Stored, db.Replaced, db.ReplacedOrConfirmed, db.Confirmed}
txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...)
txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...))
t.Require().NoError(err)
t.Equal(5, len(txs))

Expand All @@ -248,7 +246,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() {
t.Require().NoError(err)

// ensure txs were deleted
txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...)
txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...))
t.Require().NoError(err)
t.Equal(2, len(txs))
})
Expand Down
2 changes: 1 addition & 1 deletion ethergo/submitter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type otelRecorder struct {
// gasBalanceGauge is the gauge for the gas balance.
gasBalanceGauge metric.Float64ObservableGauge
// numPendingTxes is used for metrics.
// note: numPendingTxes will stop counting at MaxResultsPerChain.
// note: numPendingTxes will stop counting at DefaultMaxResultsPerChain.
numPendingTxes *hashmap.Map[uint32, int]
// currentNonces is used for metrics.
// chainID -> nonce
Expand Down
4 changes: 2 additions & 2 deletions ethergo/submitter/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) {
defer wg.Done()

// get all the pendingTxes in the queue
pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.Stored, db.Pending, db.FailedSubmit, db.Submitted)
pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.WithStatuses(db.Stored, db.Pending, db.FailedSubmit, db.Submitted))
if err != nil {
span.AddEvent("could not get pendingTxes", trace.WithAttributes(
attribute.String("error", err.Error()), attribute.Int64("chainID", chainID.Int64()),
Expand Down Expand Up @@ -153,7 +153,7 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err
metrics.EndSpanWithErr(span, err)
}()

txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.ReplacedOrConfirmed)
txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.WithMaxResults(100), db.WithStatuses(db.ReplacedOrConfirmed))
if err != nil {
return fmt.Errorf("could not get txs: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions ethergo/submitter/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (s *SubmitterSuite) TestSubmitTransaction() {
// make sure the tx wasn't submitted
s.Equal(ogCounter.Uint64(), currentCounter.Uint64())

txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.Stored)
txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.WithStatuses(db.Stored))
s.Require().NoError(err)

s.Require().NotNil(txs[0])
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmation() {
err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes)
s.Require().NoError(err)

txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)
txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced))
s.Require().NoError(err)

var replacedCount int
Expand Down Expand Up @@ -398,7 +398,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmationSingleTx() {
err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes)
s.Require().NoError(err)

txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)
txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced))
s.Require().NoError(err)

for _, tx := range txs {
Expand Down
Loading