-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFQ Relayer: concurrent request processing #2739
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (1)
- services/rfq/relayer/service/statushandler.go (4 hunks)
Files skipped from review as they are similar to previous changes (1)
- services/rfq/relayer/service/statushandler.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (2)
- services/rfq/relayer/service/handlers.go (11 hunks)
- services/rfq/relayer/service/statushandler.go (4 hunks)
Files skipped from review as they are similar to previous changes (2)
- services/rfq/relayer/service/handlers.go
- services/rfq/relayer/service/statushandler.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (5)
- services/rfq/relayer/reldb/base/model.go (5 hunks)
- services/rfq/relayer/reldb/base/quote.go (1 hunks)
- services/rfq/relayer/reldb/db.go (4 hunks)
- services/rfq/relayer/service/handlers.go (12 hunks)
- services/rfq/relayer/service/relayer.go (8 hunks)
Files skipped from review as they are similar to previous changes (1)
- services/rfq/relayer/service/handlers.go
Additional comments not posted (5)
services/rfq/relayer/reldb/db.go (3)
33-34
: The addition ofUpdateRelayNonce
to theWriter
interface is appropriate and matches its implementation.
49-50
: The addition ofGetDBStats
to theReader
interface is well-placed and allows for enhanced monitoring capabilities.
84-85
: The addition of theRelayNonce
field in theQuoteRequest
struct is appropriately implemented to support new relay functionality.services/rfq/relayer/reldb/base/model.go (2)
94-95
: The addition of theRelayNonce
field in theRequestForQuote
struct is correctly implemented and aligns with the system's new requirements.
138-138
: The mapping of theRelayNonce
field in theFromQuoteRequest
function is correctly implemented, ensuring accurate data transfer between models.
// UpdateRelayNonce todo: db test. | ||
func (s Store) UpdateRelayNonce(ctx context.Context, id [32]byte, nonce uint64) error { | ||
tx := s.DB().WithContext(ctx).Model(&RequestForQuote{}). | ||
Where(fmt.Sprintf("%s = ?", transactionIDFieldName), hexutil.Encode(id[:])). | ||
Update(relayNonceFieldName, nonce) | ||
if tx.Error != nil { | ||
return fmt.Errorf("could not update: %w", tx.Error) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation of UpdateRelayNonce
method looks correct. Ensure to follow up on the TODO for adding database tests to verify this functionality.
Would you like me to help create the database tests or open a GitHub issue for tracking this task?
// semaphore is used to limit the number of concurrent requests | ||
semaphore *semaphore.Weighted | ||
// relayMtx is used to synchronize handling of relay requests | ||
relayMtx mapmutex.StringMapMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The addition of semaphore
and relayMtx
fields to the Relayer
struct is crucial for managing concurrency. Consider adding unit tests to ensure these components function as expected.
Would you like assistance in writing these tests or should I open a GitHub issue for tracking this task?
err := r.runDBSelector(ctx, false, reldb.Seen, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.NotEnoughInventory) | ||
if err != nil { | ||
return fmt.Errorf("could not start db selector: %w", err) | ||
} | ||
} | ||
} | ||
}) | ||
|
||
g.Go(func() error { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return nil | ||
case <-time.After(defaultPostInterval * time.Second): | ||
err := r.runDBSelector(ctx, true, reldb.RelayStarted, reldb.RelayCompleted, reldb.ProvePosted) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The modifications to the runDBSelector
method to handle multiple statuses are correctly implemented. Ensure to add comprehensive unit tests to cover these new scenarios.
Would you like me to help create these unit tests or open a GitHub issue for tracking this task?
func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...reldb.QuoteRequestStatus) (err error) { | ||
ctx, span := r.metrics.Tracer().Start(ctx, "processDB", trace.WithAttributes( | ||
attribute.Bool("serial", serial), | ||
)) | ||
defer func() { | ||
r.recordDBStats(ctx, span) | ||
metrics.EndSpanWithErr(span, err) | ||
}() | ||
|
||
requests, err := r.db.GetQuoteResultsByStatus(ctx, reldb.Seen, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayCompleted, reldb.ProvePosted, reldb.NotEnoughInventory) | ||
requests, err := r.db.GetQuoteResultsByStatus(ctx, matchStatuses...) | ||
if err != nil { | ||
return fmt.Errorf("could not get quote results: %w", err) | ||
} | ||
|
||
wg := sync.WaitGroup{} | ||
// Obviously, these are only seen. | ||
for _, request := range requests { | ||
// if deadline < now | ||
if request.Transaction.Deadline.Cmp(big.NewInt(time.Now().Unix())) < 0 && request.Status.Int() < reldb.RelayCompleted.Int() { | ||
err = r.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.DeadlineExceeded) | ||
for _, req := range requests { | ||
//nolint: nestif | ||
if serial { | ||
// process in serial | ||
err = r.processRequest(ctx, req) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
return fmt.Errorf("could not process request: %w", err) | ||
} | ||
} else { | ||
// process in parallel (new goroutine) | ||
request := req // capture func literal | ||
ok := r.semaphore.TryAcquire(1) | ||
if !ok { | ||
span.AddEvent("could not acquire semaphore", trace.WithAttributes( | ||
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), | ||
)) | ||
continue | ||
} | ||
if err != nil { | ||
return fmt.Errorf("could not acquire semaphore: %w", err) | ||
} | ||
wg.Add(1) | ||
go func() { | ||
defer r.semaphore.Release(1) | ||
defer wg.Done() | ||
err = r.processRequest(ctx, request) | ||
if err != nil { | ||
logger.Errorf("could not process request: %w", err) | ||
} | ||
}() | ||
} | ||
} | ||
|
||
qr, err := r.requestToHandler(ctx, request) | ||
if err != nil { | ||
return fmt.Errorf("could not get request to handler: %w", err) | ||
} | ||
// no-op if serial is specified | ||
wg.Wait() | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes to the processDB
method to handle requests based on the serial
flag are correctly implemented. Consider refactoring to simplify the logic and improve readability.
- if serial {
+ func processSerialRequests(ctx context.Context, requests []reldb.QuoteRequest) error {
+ for _, req := range requests {
+ err := r.processRequest(ctx, req)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...reldb.QuoteRequestStatus) (err error) { | |
ctx, span := r.metrics.Tracer().Start(ctx, "processDB", trace.WithAttributes( | |
attribute.Bool("serial", serial), | |
)) | |
defer func() { | |
r.recordDBStats(ctx, span) | |
metrics.EndSpanWithErr(span, err) | |
}() | |
requests, err := r.db.GetQuoteResultsByStatus(ctx, reldb.Seen, reldb.CommittedPending, reldb.CommittedConfirmed, reldb.RelayCompleted, reldb.ProvePosted, reldb.NotEnoughInventory) | |
requests, err := r.db.GetQuoteResultsByStatus(ctx, matchStatuses...) | |
if err != nil { | |
return fmt.Errorf("could not get quote results: %w", err) | |
} | |
wg := sync.WaitGroup{} | |
// Obviously, these are only seen. | |
for _, request := range requests { | |
// if deadline < now | |
if request.Transaction.Deadline.Cmp(big.NewInt(time.Now().Unix())) < 0 && request.Status.Int() < reldb.RelayCompleted.Int() { | |
err = r.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.DeadlineExceeded) | |
for _, req := range requests { | |
//nolint: nestif | |
if serial { | |
// process in serial | |
err = r.processRequest(ctx, req) | |
if err != nil { | |
return fmt.Errorf("could not update request status: %w", err) | |
return fmt.Errorf("could not process request: %w", err) | |
} | |
} else { | |
// process in parallel (new goroutine) | |
request := req // capture func literal | |
ok := r.semaphore.TryAcquire(1) | |
if !ok { | |
span.AddEvent("could not acquire semaphore", trace.WithAttributes( | |
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), | |
)) | |
continue | |
} | |
if err != nil { | |
return fmt.Errorf("could not acquire semaphore: %w", err) | |
} | |
wg.Add(1) | |
go func() { | |
defer r.semaphore.Release(1) | |
defer wg.Done() | |
err = r.processRequest(ctx, request) | |
if err != nil { | |
logger.Errorf("could not process request: %w", err) | |
} | |
}() | |
} | |
} | |
qr, err := r.requestToHandler(ctx, request) | |
if err != nil { | |
return fmt.Errorf("could not get request to handler: %w", err) | |
} | |
// no-op if serial is specified | |
wg.Wait() | |
return nil | |
} | |
func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...reldb.QuoteRequestStatus) (err error) { | |
ctx, span := r.metrics.Tracer().Start(ctx, "processDB", trace.WithAttributes( | |
attribute.Bool("serial", serial), | |
)) | |
defer func() { | |
r.recordDBStats(ctx, span) | |
metrics.EndSpanWithErr(span, err) | |
}() | |
requests, err := r.db.GetQuoteResultsByStatus(ctx, matchStatuses...) | |
if err != nil { | |
return fmt.Errorf("could not get quote results: %w", err) | |
} | |
if serial { | |
err = r.processSerialRequests(ctx, requests) | |
if err != nil { | |
return fmt.Errorf("could not process request: %w", err) | |
} | |
} else { | |
wg := sync.WaitGroup{} | |
for _, req := range requests { | |
request := req // capture func literal | |
ok := r.semaphore.TryAcquire(1) | |
if !ok { | |
span.AddEvent("could not acquire semaphore", trace.WithAttributes( | |
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), | |
)) | |
continue | |
} | |
if err != nil { | |
return fmt.Errorf("could not acquire semaphore: %w", err) | |
} | |
wg.Add(1) | |
go func() { | |
defer r.semaphore.Release(1) | |
defer wg.Done() | |
err = r.processRequest(ctx, request) | |
if err != nil { | |
logger.Errorf("could not process request: %w", err) | |
} | |
}() | |
} | |
wg.Wait() | |
} | |
return nil | |
} | |
func (r *Relayer) processSerialRequests(ctx context.Context, requests []reldb.QuoteRequest) error { | |
for _, req := range requests { | |
err := r.processRequest(ctx, req) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (12)
agents/go.sum
is excluded by!**/*.sum
contrib/promexporter/go.sum
is excluded by!**/*.sum
contrib/screener-api/go.sum
is excluded by!**/*.sum
core/go.sum
is excluded by!**/*.sum
ethergo/go.sum
is excluded by!**/*.sum
go.work.sum
is excluded by!**/*.sum
services/cctp-relayer/go.sum
is excluded by!**/*.sum
services/explorer/go.sum
is excluded by!**/*.sum
services/omnirpc/go.sum
is excluded by!**/*.sum
services/rfq/go.sum
is excluded by!**/*.sum
services/scribe/go.sum
is excluded by!**/*.sum
services/stiprelayer/go.sum
is excluded by!**/*.sum
Files selected for processing (11)
- agents/go.mod (1 hunks)
- contrib/promexporter/go.mod (1 hunks)
- contrib/screener-api/go.mod (1 hunks)
- core/go.mod (1 hunks)
- ethergo/go.mod (1 hunks)
- services/cctp-relayer/go.mod (1 hunks)
- services/explorer/go.mod (1 hunks)
- services/omnirpc/go.mod (1 hunks)
- services/rfq/go.mod (2 hunks)
- services/scribe/go.mod (1 hunks)
- services/stiprelayer/go.mod (1 hunks)
Files not reviewed due to errors (1)
- agents/go.mod (no review received)
Files skipped from review due to trivial changes (8)
- contrib/promexporter/go.mod
- contrib/screener-api/go.mod
- core/go.mod
- ethergo/go.mod
- services/cctp-relayer/go.mod
- services/rfq/go.mod
- services/scribe/go.mod
- services/stiprelayer/go.mod
Additional comments not posted (2)
services/omnirpc/go.mod (1)
240-241
: The updates to OpenTelemetry packages should be tested to ensure compatibility with the existing codebase.services/explorer/go.mod (1)
261-262
: Ensure that the updated OpenTelemetry packages are compatible with theexplorer
service and do not introduce any regressions.Verification successful
To verify the compatibility of the updated OpenTelemetry packages with the
explorer
service, we need to check the release notes or changelogs for the specified versions for any breaking changes or compatibility issues.
To verify the compatibility of the updated OpenTelemetry packages, we need to fetch the release notes using the correct repository format.
The review of the release notes for the updated OpenTelemetry packages did not reveal any breaking changes that would affect the
explorer
service. The changes introduced in versionv1.27.0
ofgo.opentelemetry.io/contrib/propagators/b3
and versionv0.52.0
ofgo.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
primarily include new features and some minor changes that do not seem to impact the existing functionality.
- No breaking changes were identified in the release notes for the specified versions.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify compatibility of updated OpenTelemetry packages with the `explorer` service. # Test: Check for any reported issues or breaking changes in the release notes of updated packages. echo "Review release notes of OpenTelemetry packages for any breaking changes affecting the `explorer` service."Length of output: 288
Script:
#!/bin/bash # Description: Verify compatibility of updated OpenTelemetry packages with the `explorer` service. # Fetch release notes or changelogs for the specified versions of the OpenTelemetry packages. packages=( "go.opentelemetry.io/contrib/instrumentation/net/http/[email protected]" "go.opentelemetry.io/contrib/propagators/[email protected]" ) for package in "${packages[@]}"; do # Extract package name and version pkg_name=$(echo "$package" | cut -d'@' -f1) pkg_version=$(echo "$package" | cut -d'@' -f2) # Fetch the release notes or changelog for the package echo "Fetching release notes for $pkg_name version $pkg_version" gh release view "$pkg_version" --repo "$pkg_name" doneLength of output: 1480
Script:
#!/bin/bash # Description: Verify compatibility of updated OpenTelemetry packages with the `explorer` service. # Define the correct GitHub repositories for the packages declare -A repos repos=( ["go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"]="open-telemetry/opentelemetry-go-contrib" ["go.opentelemetry.io/contrib/propagators/b3"]="open-telemetry/opentelemetry-go-contrib" ) # Define the package versions packages=( "go.opentelemetry.io/contrib/instrumentation/net/http/[email protected]" "go.opentelemetry.io/contrib/propagators/[email protected]" ) for package in "${packages[@]}"; do # Extract package name and version pkg_name=$(echo "$package" | cut -d'@' -f1) pkg_version=$(echo "$package" | cut -d'@' -f2) # Fetch the repository for the package repo=${repos[$pkg_name]} # Fetch the release notes or changelog for the package echo "Fetching release notes for $pkg_name version $pkg_version from repository $repo" gh release view "$pkg_version" --repo "$repo" doneLength of output: 25350
On failures, seeing this: actions/setup-java#543 & camunda/camunda#18750 style issue: will be resolved in the future by actions/toolkit#1589 |
Description
real rfq head sans additional prs in #2719
Ancillary: should fix #2689 #2690 #2691 #2692 #2693 #2694
Summary by CodeRabbit
New Features
Bug Fixes
Performance Improvements
Updates