-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consolidate orderbook updates into a single stream update #1634
Conversation
WalkthroughThe changes to Changes
Sequence Diagram(s) (Beta)sequenceDiagram
participant Client
participant GrpcStreamingManagerImpl
participant SubscriptionManager
Client->>GrpcStreamingManagerImpl: SendOrderbookUpdates(updatesBySubscriptionId)
GrpcStreamingManagerImpl->>SubscriptionManager: Consolidate updates by subscription ID
SubscriptionManager-->>GrpcStreamingManagerImpl: Return consolidated updates
GrpcStreamingManagerImpl->>Client: sendStreamUpdate(consolidated updates)
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Outside diff range and nitpick comments (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)
Line range hint
205-230
: Enhance error handling and resource cleanup.The method
sendStreamUpdate
should handle errors more robustly. Currently, it only appends failed IDs for removal. Consider logging the error and ensuring that all resources are properly released.- if err := subscription.srv.Send(&clobtypes.StreamOrderbookUpdatesResponse{Updates: streamUpdatesForSubscription}); err != nil { - idsToRemove = append(idsToRemove, id) - } + if err := subscription.srv.Send(&clobtypes.StreamOrderbookUpdatesResponse{Updates: streamUpdatesForSubscription}); err != nil { + sm.logger.Error("Failed to send updates", log.ErrorKey, err) + idsToRemove = append(idsToRemove, id) + continue + }
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- protocol/streaming/grpc/grpc_streaming_manager.go (3 hunks)
Additional comments not posted (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)
125-147
: Ensure thread safety when accessing shared resources.The method
SendOrderbookUpdates
modifiesupdatesBySubscriptionId
which is shared across different goroutines. This can lead to race conditions. Verify that appropriate locking mechanisms are used when accessing this map.
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) | ||
for clobPairId, update := range updates { | ||
v1updates, err := GetOffchainUpdatesV1(update) | ||
if err != nil { | ||
panic(err) | ||
} | ||
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ | ||
{ | ||
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | ||
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | ||
Updates: v1updates, | ||
Snapshot: snapshot, | ||
updatesByClobPairId[clobPairId] = v1updates | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor the error handling in GetOffchainUpdatesV1
.
Currently, the method GetOffchainUpdatesV1
called within the loop can panic if an error occurs. This could lead to service instability. Consider handling the error more gracefully instead of using panic
.
- if err != nil {
- panic(err)
- }
+ if err != nil {
+ sm.logger.Error("Failed to get V1 updates", log.ErrorKey, err)
+ continue
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
for clobPairId, update := range updates { | |
v1updates, err := GetOffchainUpdatesV1(update) | |
if err != nil { | |
panic(err) | |
} | |
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ | |
{ | |
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | |
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | |
Updates: v1updates, | |
Snapshot: snapshot, | |
updatesByClobPairId[clobPairId] = v1updates | |
} | |
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
for clobPairId, update := range updates { | |
v1updates, err := GetOffchainUpdatesV1(update) | |
if err != nil { | |
sm.logger.Error("Failed to get V1 updates", log.ErrorKey, err) | |
continue | |
} | |
updatesByClobPairId[clobPairId] = v1updates | |
} |
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) | ||
for id, subscription := range sm.orderbookSubscriptions { | ||
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) | ||
for _, clobPairId := range subscription.clobPairIds { | ||
if update, ok := updatesByClobPairId[clobPairId]; ok { | ||
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) | ||
} | ||
} | ||
|
||
updatesBySubscriptionId[id] = streamUpdatesForSubscription | ||
} | ||
|
||
sm.sendStreamUpdate(updatesBySubscriptionId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize the loop for better performance.
The nested loops in SendOrderbookFillUpdates
can be optimized by reducing the scope of the inner loop. Consider precomputing a map from clobPairId
to updates before iterating over subscriptions.
- for id, subscription := range sm.orderbookSubscriptions {
- streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
- for _, clobPairId := range subscription.clobPairIds {
- if update, ok := updatesByClobPairId[clobPairId]; ok {
- streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
- }
- }
- updatesBySubscriptionId[id] = streamUpdatesForSubscription
- }
+ precomputedUpdates := make(map[uint32][]clobtypes.StreamUpdate)
+ for clobPairId, update := range updatesByClobPairId {
+ precomputedUpdates[clobPairId] = update
+ }
+ for id, subscription := range sm.orderbookSubscriptions {
+ for _, clobPairId := range subscription.clobPairIds {
+ if updates, ok := precomputedUpdates[clobPairId]; ok {
+ updatesBySubscriptionId[id] = append(updatesBySubscriptionId[id], updates...)
+ }
+ }
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) | |
for id, subscription := range sm.orderbookSubscriptions { | |
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) | |
for _, clobPairId := range subscription.clobPairIds { | |
if update, ok := updatesByClobPairId[clobPairId]; ok { | |
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) | |
} | |
} | |
updatesBySubscriptionId[id] = streamUpdatesForSubscription | |
} | |
sm.sendStreamUpdate(updatesBySubscriptionId) | |
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) | |
precomputedUpdates := make(map[uint32][]clobtypes.StreamUpdate) | |
for clobPairId, update := range updatesByClobPairId { | |
precomputedUpdates[clobPairId] = update | |
} | |
for id, subscription := range sm.orderbookSubscriptions { | |
for _, clobPairId := range subscription.clobPairIds { | |
if updates, ok := precomputedUpdates[clobPairId]; ok { | |
updatesBySubscriptionId[id] = append(updatesBySubscriptionId[id], updates...) | |
} | |
} | |
} | |
sm.sendStreamUpdate(updatesBySubscriptionId) |
@Mergifyio backport release/protocol/v5.x |
✅ Backports have been created
|
(cherry picked from commit cd974a2) # Conflicts: # protocol/streaming/grpc/grpc_streaming_manager.go
…1634) (#1635) Co-authored-by: jayy04 <[email protected]>
Changelist
[Describe or list the changes made in this PR]
Test Plan
[Describe how this PR was tested (if applicable)]
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit