Skip to content

Commit

Permalink
fix pending queue (#2954)
Browse files Browse the repository at this point in the history
Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
trajan0x and trajan0x authored Jul 29, 2024
1 parent de65453 commit d1c13fe
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 48 deletions.
5 changes: 3 additions & 2 deletions ethergo/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er

// Check if latest block is the same as start block (for chains with slow block times)
didPoll := true
defer span.SetAttributes(attribute.Bool("did_poll", didPoll))
defer func() {
span.SetAttributes(attribute.Bool("did_poll", didPoll))
}()
if c.latestBlock == c.startBlock {
//nolint:ineffassign
didPoll = false
return nil
}
Expand Down
36 changes: 18 additions & 18 deletions ethergo/submitter/db/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 60 additions & 2 deletions ethergo/submitter/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type Service interface {
// PutTXS stores a tx in the database.
PutTXS(ctx context.Context, txs ...TX) error
// GetTXS gets all txs for a given address and chain id. If chain id is nil, it will get all txs for the address.
GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, statuses ...Status) (txs []TX, err error)
GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error)
// MarkAllBeforeNonceReplacedOrConfirmed marks all txs for a given chain id and address before a given nonce as replaced or confirmed.
// TODO: cleaner function name
MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signer common.Address, chainID *big.Int, nonce uint64) error
// DBTransaction executes a transaction on the database.
// the function passed in will be passed a new service that is scoped to the transaction.
DBTransaction(ctx context.Context, f TransactionFunc) error
// GetAllTXAttemptByStatus gets all txs for a given address and chain id with a given status.
GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...Status) (txs []TX, err error)
GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...Option) (txs []TX, err error)
// GetNonceStatus returns the nonce status for a given nonce by aggregating all attempts and finding the highest status.
GetNonceStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, nonce uint64) (status Status, err error)
// GetNonceAttemptsByStatus gets all txs for a given address and chain id with a given status and nonce.
Expand All @@ -44,6 +44,64 @@ type Service interface {
GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error)
}

// Option is a type for specifying optional parameters.
type Option func(*options)

type options struct {
statuses []Status
maxResults int
}

var _ OptionsFetcher = (*options)(nil)

// OptionsFetcher is the interface for fetching options.
type OptionsFetcher interface {
Statuses() []Status
MaxResults() int
}

func (o *options) MaxResults() int {
return o.maxResults
}

func (o *options) Statuses() []Status {
return o.statuses
}

// DefaultMaxResultsPerChain is the maximum number of transactions to return per chain id.
// it is exported for testing.
// TODO: this should be an option passed to the GetTXs function.
// TODO: temporarily reduced from 50 to 1 to increase resiliency.
const DefaultMaxResultsPerChain = 10

// ParseOptions parses the options.
func ParseOptions(opts ...Option) OptionsFetcher {
myOptions := &options{
statuses: nil,
maxResults: DefaultMaxResultsPerChain, // Default to 0 for no limit.
}

for _, opt := range opts {
opt(myOptions)
}

return myOptions
}

// WithStatuses specifies the statuses to match.
func WithStatuses(statuses ...Status) Option {
return func(opts *options) {
opts.statuses = statuses
}
}

// WithMaxResults specifies the maximum number of results to return.
func WithMaxResults(maxResults int) Option {
return func(opts *options) {
opts.maxResults = maxResults
}
}

// TransactionFunc is a function that can be passed to DBTransaction.
type TransactionFunc func(ctx context.Context, svc Service) error

Expand Down
23 changes: 11 additions & 12 deletions ethergo/submitter/db/txdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (s *Store) MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signe
dbTX := s.db.WithContext(ctx).Model(&ETHTX{}).
Where(fmt.Sprintf("%s = ?", chainIDFieldName), chainID.Uint64()).
Where(fmt.Sprintf("%s < ?", nonceFieldName), nonce).
Where(fmt.Sprintf("%s < ?", statusFieldName), db.ReplacedOrConfirmed).
Where(fmt.Sprintf("`%s` = ?", fromFieldName), signer.String()).
// just in case we're updating a tx already marked as confirmed
Updates(map[string]interface{}{statusFieldName: db.ReplacedOrConfirmed.Int()})
Expand All @@ -64,12 +65,6 @@ func (s *Store) MarkAllBeforeNonceReplacedOrConfirmed(ctx context.Context, signe
return nil
}

// MaxResultsPerChain is the maximum number of transactions to return per chain id.
// it is exported for testing.
// TODO: this should be an option passed to the GetTXs function.
// TODO: temporarily reduced from 50 to 1 to increase resiliency.
const MaxResultsPerChain = 10

func statusToArgs(matchStatuses ...db.Status) []int {
inArgs := make([]int, len(matchStatuses))
for i := range matchStatuses {
Expand Down Expand Up @@ -123,10 +118,11 @@ func (s *Store) GetDistinctChainIDs(ctx context.Context) ([]*big.Int, error) {
// GetTXS returns all transactions for a given address on a given (or any) chain id that match a given status.
// there is a limit of 50 transactions per chain id. The limit does not make any guarantees about the number of nonces per chain.
// the submitter will get only the most recent tx submitted for each chain so this can be used for gas pricing.
func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) {
func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) {
var dbTXs []ETHTX

inArgs := statusToArgs(matchStatuses...)
madeOptions := db.ParseOptions(options...)
inArgs := statusToArgs(madeOptions.Statuses()...)

query := ETHTX{
From: fromAddress.String(),
Expand All @@ -147,7 +143,7 @@ func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID
Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs).
Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)).
Order(fmt.Sprintf("%s asc", nonceFieldName)).
Limit(MaxResultsPerChain)
Limit(madeOptions.MaxResults())

joinQuery, err := interpol.WithMap(
"INNER JOIN (?) as subquery on `{table}`.`{id}` = `subquery`.`{id}` AND `{table}`.`{chainID}` = `subquery`.`{chainID}`", map[string]string{
Expand Down Expand Up @@ -180,9 +176,12 @@ func (s *Store) GetTXS(ctx context.Context, fromAddress common.Address, chainID
}

// GetAllTXAttemptByStatus returns all transactions for a given address on a given (or any) chain id that match a given status.
func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, matchStatuses ...db.Status) (txs []db.TX, err error) {
func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.Address, chainID *big.Int, options ...db.Option) (txs []db.TX, err error) {
var dbTXs []ETHTX

madeOptions := db.ParseOptions(options...)
inArgs := statusToArgs(madeOptions.Statuses()...)

query := ETHTX{
From: fromAddress.String(),
}
Expand All @@ -199,10 +198,10 @@ func (s *Store) GetAllTXAttemptByStatus(ctx context.Context, fromAddress common.
subQuery := s.DB().Model(&ETHTX{}).
Select(fmt.Sprintf("MAX(%s) as %s, %s, %s", idFieldName, idFieldName, nonceFieldName, chainIDFieldName)).
Where(query).
Where(fmt.Sprintf("%s IN ?", statusFieldName), statusToArgs(matchStatuses...)).
Where(fmt.Sprintf("%s IN ?", statusFieldName), inArgs).
Group(fmt.Sprintf("%s, %s", nonceFieldName, chainIDFieldName)).
Order(fmt.Sprintf("%s asc", nonceFieldName)).
Limit(MaxResultsPerChain)
Limit(madeOptions.MaxResults())

// one consequence of innerjoining on nonce is we can't cap the max results for the whole query. This is a known limitation
joinQuery, err := interpol.WithMap(
Expand Down
14 changes: 6 additions & 8 deletions ethergo/submitter/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/synapsecns/sanguine/ethergo/mocks"
"github.com/synapsecns/sanguine/ethergo/signer/nonce"
"github.com/synapsecns/sanguine/ethergo/submitter/db"
"github.com/synapsecns/sanguine/ethergo/submitter/db/txdb"

"math/big"

"github.com/synapsecns/sanguine/ethergo/util"
Expand Down Expand Up @@ -95,11 +93,11 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() {
}

// get the transactions with limit per ChainID
result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending)
result, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending))
t.Require().NoError(err)

// check that the result has the correct length
t.Require().Equal(txdb.MaxResultsPerChain, len(result))
t.Require().Equal(db.DefaultMaxResultsPerChain, len(result))

// check that the result is limited per ChainID and address
for _, tx := range result {
Expand All @@ -119,9 +117,9 @@ func (t *TXSubmitterDBSuite) TestGetTransactionsWithLimitPerChainID() {

// make sure this returns double the number of results, 2 per tx
// TODO: check nonces
result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.Pending)
result, err = testDB.GetAllTXAttemptByStatus(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(db.Pending))
t.Require().NoError(err)
t.Require().Equal(txdb.MaxResultsPerChain*2, len(result))
t.Require().Equal(db.DefaultMaxResultsPerChain*2, len(result))
}
}
})
Expand Down Expand Up @@ -239,7 +237,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() {

// ensure txs were stored
allStatuses := []db.Status{db.Pending, db.Stored, db.Replaced, db.ReplacedOrConfirmed, db.Confirmed}
txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...)
txs, err := testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...))
t.Require().NoError(err)
t.Equal(5, len(txs))

Expand All @@ -248,7 +246,7 @@ func (t *TXSubmitterDBSuite) TestDeleteTXS() {
t.Require().NoError(err)

// ensure txs were deleted
txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), allStatuses...)
txs, err = testDB.GetTXS(t.GetTestContext(), mockAccount.Address, backend.GetBigChainID(), db.WithStatuses(allStatuses...))
t.Require().NoError(err)
t.Equal(2, len(txs))
})
Expand Down
2 changes: 1 addition & 1 deletion ethergo/submitter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type otelRecorder struct {
// gasBalanceGauge is the gauge for the gas balance.
gasBalanceGauge metric.Float64ObservableGauge
// numPendingTxes is used for metrics.
// note: numPendingTxes will stop counting at MaxResultsPerChain.
// note: numPendingTxes will stop counting at DefaultMaxResultsPerChain.
numPendingTxes *hashmap.Map[uint32, int]
// currentNonces is used for metrics.
// chainID -> nonce
Expand Down
4 changes: 2 additions & 2 deletions ethergo/submitter/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *txSubmitterImpl) processQueue(parentCtx context.Context) (err error) {
defer wg.Done()

// get all the pendingTxes in the queue
pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.Stored, db.Pending, db.FailedSubmit, db.Submitted)
pendingTxes, err := t.db.GetTXS(ctx, t.signer.Address(), chainID, db.WithStatuses(db.Stored, db.Pending, db.FailedSubmit, db.Submitted))
if err != nil {
span.AddEvent("could not get pendingTxes", trace.WithAttributes(
attribute.String("error", err.Error()), attribute.Int64("chainID", chainID.Int64()),
Expand Down Expand Up @@ -153,7 +153,7 @@ func (t *txSubmitterImpl) processConfirmedQueue(parentCtx context.Context) (err
metrics.EndSpanWithErr(span, err)
}()

txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.ReplacedOrConfirmed)
txs, err := t.db.GetAllTXAttemptByStatus(ctx, t.signer.Address(), nil, db.WithMaxResults(1000), db.WithStatuses(db.ReplacedOrConfirmed))
if err != nil {
return fmt.Errorf("could not get txs: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions ethergo/submitter/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (s *SubmitterSuite) TestSubmitTransaction() {
// make sure the tx wasn't submitted
s.Equal(ogCounter.Uint64(), currentCounter.Uint64())

txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.Stored)
txs, err := s.store.GetTXS(s.GetTestContext(), s.signer.Address(), chainID, db.WithStatuses(db.Stored))
s.Require().NoError(err)

s.Require().NotNil(txs[0])
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmation() {
err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes)
s.Require().NoError(err)

txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)
txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced))
s.Require().NoError(err)

var replacedCount int
Expand Down Expand Up @@ -398,7 +398,7 @@ func (s *SubmitterSuite) TestCheckAndSetConfirmationSingleTx() {
err = ts.CheckAndSetConfirmation(s.GetTestContext(), chainClient, allTxes)
s.Require().NoError(err)

txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.ReplacedOrConfirmed, db.Confirmed, db.Replaced)
txs, err := s.store.GetAllTXAttemptByStatus(s.GetTestContext(), s.signer.Address(), tb.GetBigChainID(), db.WithStatuses(db.ReplacedOrConfirmed, db.Confirmed, db.Replaced))
s.Require().NoError(err)

for _, tx := range txs {
Expand Down

0 comments on commit d1c13fe

Please sign in to comment.