-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFQ Relayer: concurrent request processing #2739
Changes from 59 commits
6afae8a
817e66c
2dd5aa6
a500f11
2a41ada
4d38041
2c5c82b
d0a1f0c
f78f465
0ea9e94
16599cc
c02428c
d48e53a
a94fc80
6e054f3
9c86df8
49b0eca
a93fd2a
b7d7841
5635d61
381e0db
5ccebc9
f540d29
aacbb3a
6bee34f
8f7b96e
3bf969e
530f916
7b6b613
35dae54
36f2a9f
85f7d07
73423ac
5664008
df2f822
0547917
912c56f
0e7114b
f9c4613
bf4e297
3996a65
39f9701
bef4a23
bcec14f
60040da
52cf495
58b30ac
df28e70
852de63
b61cc2a
e59b3d3
5fbdfaf
e161a8d
a6a1831
5d38963
6e82d8f
aa411ea
316b9f8
85bef07
e8bd911
19921c4
51aab1c
97ada76
a686322
13f7897
e1c2bc2
0363c1c
3850c29
857375b
9b5bbf8
d20b5f4
cd33535
7c2e331
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
"errors" | ||
"fmt" | ||
|
||
"database/sql" | ||
|
||
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb" | ||
"gorm.io/gorm" | ||
) | ||
|
@@ -125,3 +127,13 @@ | |
|
||
return rebalance, nil | ||
} | ||
|
||
// GetDBStats gets the database stats. | ||
func (s Store) GetDBStats(ctx context.Context) (*sql.DBStats, error) { | ||
sqlDB, err := s.DB().WithContext(ctx).DB() | ||
if err != nil { | ||
return nil, fmt.Errorf("could not get db: %w", err) | ||
} | ||
stats := sqlDB.Stats() | ||
return &stats, nil | ||
Comment on lines
+132
to
+138
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add unit tests for the new ToolsGitHub Check: codecov/patch
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package reldb | |
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"database/sql/driver" | ||
"errors" | ||
"fmt" | ||
|
@@ -43,6 +44,8 @@ type Reader interface { | |
GetPendingRebalances(ctx context.Context, chainIDs ...uint64) ([]*Rebalance, error) | ||
// GetRebalance gets a rebalance by ID. Should return ErrNoRebalanceForID if not found. | ||
GetRebalanceByID(ctx context.Context, rebalanceID string) (*Rebalance, error) | ||
// GetDBStats gets the database stats. | ||
GetDBStats(ctx context.Context) (*sql.DBStats, error) | ||
Comment on lines
+49
to
+50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tip Codebase Verification Ensure the
Analysis chainEnsure the Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Verify all implementations of `GetDBStats` in `Reader` types.
# Test: Search for the method implementation in all `Reader` types. Expect: Consistent implementation.
rg --type go $'GetDBStats' ./services/rfq/relayer/reldb/
Length of output: 424 Script: #!/bin/bash
# Description: Identify all concrete types implementing the `Reader` interface.
# Test: Search for all types implementing the `Reader` interface. Expect: Comprehensive list of implementations.
ast-grep --lang go --pattern $'type $_ struct {
$$$
}
func ($_ $_) GetDBStats($_) ($_ $_, $_) {
$$$
}' ./services/rfq/relayer/reldb/
Length of output: 634 Script: #!/bin/bash
# Description: Identify all concrete types implementing the `Reader` interface.
# Test: Search for all types implementing the `Reader` interface. Expect: Comprehensive list of implementations.
ast-grep --lang go --pattern $'type $_ struct {
$$$
}
func ($_ $_) GetDBStats($_) ($_ $_, $_) {
$$$
}' ./services/rfq/relayer/reldb/
Length of output: 634 Script: #!/bin/bash
# Description: Identify all concrete types implementing the `Reader` interface by searching for struct types.
# Test: Search for all struct types in the directory. Expect: Comprehensive list of struct types.
ast-grep --lang go --pattern 'type $_ struct {
$$$
}' ./services/rfq/relayer/reldb/
Length of output: 7754 Script: #!/bin/bash
# Description: Identify all concrete types implementing the `Reader` interface by searching for method implementations.
# Test: Search for all method implementations of `GetDBStats`. Expect: Comprehensive list of method implementations.
ast-grep --lang go --pattern 'func ($_ $_) GetDBStats($_) ($_ $_, $_) {
$$$
}' ./services/rfq/relayer/reldb/
Length of output: 112 Script: #!/bin/bash
# Description: Check if `Store` struct in `mysql` and `sqlite` packages implement `GetDBStats`.
# Test: Search for `GetDBStats` implementation in `mysql` and `sqlite`. Expect: Implementation found.
rg --type go 'GetDBStats' ./services/rfq/relayer/reldb/mysql ./services/rfq/relayer/reldb/sqlite
Length of output: 96 |
||
} | ||
|
||
// Service is the interface for the database service. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,11 +5,13 @@ | |
"errors" | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/accounts/abi/bind" | ||
"github.com/ethereum/go-ethereum/common/hexutil" | ||
"github.com/ethereum/go-ethereum/core/types" | ||
"github.com/synapsecns/sanguine/core/metrics" | ||
"github.com/synapsecns/sanguine/core/retry" | ||
"github.com/synapsecns/sanguine/services/rfq/api/model" | ||
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" | ||
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory" | ||
|
@@ -18,6 +20,8 @@ | |
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
var maxRPCRetryTime = 15 * time.Second | ||
|
||
// handleBridgeRequestedLog handles the BridgeRequestedLog event. | ||
// Step 1: Seen | ||
// | ||
|
@@ -33,14 +37,26 @@ | |
metrics.EndSpanWithErr(span, err) | ||
}() | ||
|
||
// TODO: consider a mapmutex | ||
unlocker, ok := r.relayMtx.TryLock(hexutil.Encode(req.TransactionId[:])) | ||
if !ok { | ||
span.SetAttributes(attribute.Bool("locked", true)) | ||
// already processing this request | ||
return nil | ||
} | ||
|
||
defer unlocker.Unlock() | ||
|
||
_, err = r.db.GetQuoteRequestByID(ctx, req.TransactionId) | ||
// expect no results | ||
if !errors.Is(err, reldb.ErrNoQuoteForID) { | ||
// maybe a db err? if so error out & try again later | ||
if err != nil { | ||
return fmt.Errorf("could not call db: %w", err) | ||
} | ||
|
||
span.AddEvent("already known") | ||
// already seen this request | ||
return nil | ||
} | ||
|
||
// TODO: these should be premade | ||
|
@@ -54,9 +70,17 @@ | |
return fmt.Errorf("could not get correct fast bridge: %w", err) | ||
} | ||
|
||
bridgeTx, err := fastBridge.GetBridgeTransaction(&bind.CallOpts{Context: ctx}, req.Request) | ||
var bridgeTx fastbridge.IFastBridgeBridgeTransaction | ||
call := func(ctx context.Context) error { | ||
bridgeTx, err = fastBridge.GetBridgeTransaction(&bind.CallOpts{Context: ctx}, req.Request) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge transaction: %w", err) | ||
} | ||
return nil | ||
} | ||
err = retry.WithBackoff(ctx, call, retry.WithMaxTotalTime(maxRPCRetryTime)) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge transaction: %w", err) | ||
return fmt.Errorf("could not make call: %w", err) | ||
Comment on lines
+73
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The retry logic for fetching the bridge transaction is crucial. However, this block of code lacks unit tests, which is essential to ensure its reliability under various network conditions. Would you like help in creating these test cases? ToolsGitHub Check: codecov/patch
|
||
} | ||
|
||
// TODO: you can just pull these out of inventory. If they don't exist mark as invalid. | ||
|
@@ -71,7 +95,7 @@ | |
return fmt.Errorf("could not get decimals: %w", err) | ||
} | ||
|
||
err = r.db.StoreQuoteRequest(ctx, reldb.QuoteRequest{ | ||
dbReq := reldb.QuoteRequest{ | ||
BlockNumber: req.Raw.BlockNumber, | ||
RawRequest: req.Request, | ||
OriginTokenDecimals: *originDecimals, | ||
|
@@ -81,11 +105,25 @@ | |
Transaction: bridgeTx, | ||
Status: reldb.Seen, | ||
OriginTxHash: req.Raw.TxHash, | ||
}) | ||
} | ||
err = r.db.StoreQuoteRequest(ctx, dbReq) | ||
if err != nil { | ||
return fmt.Errorf("could not get db: %w", err) | ||
} | ||
|
||
// immediately forward the request to handleSeen | ||
span.AddEvent("sending to handleSeen") | ||
qr, err := r.requestToHandler(ctx, dbReq) | ||
if err != nil { | ||
return fmt.Errorf("could not get quote request handler: %w", err) | ||
} | ||
// Forward instead of lock since we called lock above. | ||
fwdErr := qr.Forward(ctx, dbReq) | ||
if fwdErr != nil { | ||
logger.Errorf("could not forward to handle seen: %w", fwdErr) | ||
span.AddEvent("could not forward to handle seen") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -167,10 +205,20 @@ | |
return nil | ||
} | ||
|
||
request.Status = reldb.CommittedPending | ||
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
} | ||
|
||
// immediately forward the request to handleCommitPending | ||
span.AddEvent("forwarding to handleCommitPending") | ||
fwdErr := q.Forward(ctx, request) | ||
if fwdErr != nil { | ||
logger.Errorf("could not forward to handle commit pending: %w", fwdErr) | ||
span.AddEvent("could not forward to handle commit pending") | ||
} | ||
|
||
Comment on lines
+208
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic to update the quote request status and forward it to Would you like help in creating these test cases? ToolsGitHub Check: codecov/patch
|
||
return nil | ||
} | ||
|
||
|
@@ -201,20 +249,40 @@ | |
return nil | ||
} | ||
|
||
bs, err := q.Origin.Bridge.BridgeStatuses(&bind.CallOpts{Context: ctx}, request.TransactionID) | ||
var bs uint8 | ||
call := func(ctx context.Context) error { | ||
bs, err = q.Origin.Bridge.BridgeStatuses(&bind.CallOpts{Context: ctx}, request.TransactionID) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge status: %w", err) | ||
} | ||
return nil | ||
} | ||
err = retry.WithBackoff(ctx, call, retry.WithMaxTotalTime(maxRPCRetryTime)) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge status: %w", err) | ||
return fmt.Errorf("could not make contract call: %w", err) | ||
} | ||
|
||
span.AddEvent("status_check", trace.WithAttributes(attribute.String("chain_bridge_status", fastbridge.BridgeStatus(bs).String()))) | ||
|
||
// sanity check to make sure it's still requested. | ||
if bs == fastbridge.REQUESTED.Int() { | ||
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedConfirmed) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
} | ||
if bs != fastbridge.REQUESTED.Int() { | ||
return nil | ||
} | ||
|
||
request.Status = reldb.CommittedConfirmed | ||
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedConfirmed) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
} | ||
|
||
// immediately forward to handleCommitConfirmed | ||
span.AddEvent("forwarding to handleCommitConfirmed") | ||
fwdErr := q.Forward(ctx, request) | ||
if fwdErr != nil { | ||
logger.Errorf("could not forward to handle commit confirmed: %w", fwdErr) | ||
span.AddEvent("could not forward to handle commit confirmed") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -253,7 +321,9 @@ | |
return fmt.Errorf("could not get quote request: %w", err) | ||
} | ||
// we might've accidentally gotten this later, if so we'll just ignore it | ||
if reqID.Status != reldb.RelayStarted { | ||
// note that in the edge case where we pessimistically marked as DeadlineExceeded | ||
// and the relay was actually succesful, we should continue the proving process | ||
if reqID.Status != reldb.RelayStarted && reqID.Status != reldb.DeadlineExceeded { | ||
logger.Warnf("got relay log for request that was not relay started (transaction id: %s, txhash: %s)", hexutil.Encode(reqID.TransactionID[:]), req.Raw.TxHash) | ||
return nil | ||
} | ||
|
@@ -321,9 +391,17 @@ | |
|
||
// make sure relayer hasn't already proved. This is neeeded in case of an abrupt halt in event sourcing | ||
// note: this assumes caller has already checked the sender is the relayer. | ||
bs, err := q.Origin.Bridge.BridgeStatuses(&bind.CallOpts{Context: ctx}, request.TransactionID) | ||
var bs uint8 | ||
call := func(ctx context.Context) error { | ||
bs, err = q.Origin.Bridge.BridgeStatuses(&bind.CallOpts{Context: ctx}, request.TransactionID) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge status: %w", err) | ||
} | ||
return nil | ||
} | ||
err = retry.WithBackoff(ctx, call, retry.WithMaxTotalTime(maxRPCRetryTime)) | ||
if err != nil { | ||
return fmt.Errorf("could not get bridge status: %w", err) | ||
return fmt.Errorf("could not make contract call: %w", err) | ||
} | ||
|
||
if bs == fastbridge.RelayerClaimed.Int() { | ||
|
@@ -334,9 +412,17 @@ | |
return nil | ||
} | ||
|
||
canClaim, err := q.Origin.Bridge.CanClaim(&bind.CallOpts{Context: ctx}, request.TransactionID, q.RelayerAddress) | ||
var canClaim bool | ||
claimCall := func(ctx context.Context) error { | ||
canClaim, err = q.Origin.Bridge.CanClaim(&bind.CallOpts{Context: ctx}, request.TransactionID, q.RelayerAddress) | ||
if err != nil { | ||
return fmt.Errorf("could not check if can claim: %w", err) | ||
} | ||
return nil | ||
} | ||
err = retry.WithBackoff(ctx, claimCall, retry.WithMaxTotalTime(maxRPCRetryTime)) | ||
if err != nil { | ||
return fmt.Errorf("could not check if can claim: %w", err) | ||
return fmt.Errorf("could not make call: %w", err) | ||
} | ||
|
||
// can't claim yet. we'll check again later | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor the condition for
shouldRelay
to improve readability and maintainability.This separates the condition into clearer, more manageable parts, making the code easier to understand and maintain.
Committable suggestion