-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consolidate orderbook updates into a single stream update #1634
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -113,29 +113,42 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Unmarshal each per-clob pair message to v1 updates. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for clobPairId, update := range updates { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v1updates, err := GetOffchainUpdatesV1(update) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
panic(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Updates: v1updates, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Snapshot: snapshot, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId[clobPairId] = v1updates | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for id, subscription := range sm.orderbookSubscriptions { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Consolidate orderbook updates into a single `StreamUpdate`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v1updates := make([]ocutypes.OffChainUpdateV1, 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, clobPairId := range subscription.clobPairIds { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if update, ok := updatesByClobPairId[clobPairId]; ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v1updates = append(v1updates, update...) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(v1updates) > 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesBySubscriptionId[id] = []clobtypes.StreamUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Updates: v1updates, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Snapshot: snapshot, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BlockHeight: blockHeight, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ExecMode: uint32(execMode), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BlockHeight: blockHeight, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ExecMode: uint32(execMode), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sm.sendStreamUpdate( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sm.sendStreamUpdate(updatesBySubscriptionId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// SendOrderbookFillUpdates groups fills by their clob pair ids and | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -172,14 +185,24 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sm.sendStreamUpdate( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for id, subscription := range sm.orderbookSubscriptions { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, clobPairId := range subscription.clobPairIds { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if update, ok := updatesByClobPairId[clobPairId]; ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesBySubscriptionId[id] = streamUpdatesForSubscription | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sm.sendStreamUpdate(updatesBySubscriptionId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+188
to
+200
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optimize the loop for better performance. The nested loops in - for id, subscription := range sm.orderbookSubscriptions {
- streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
- for _, clobPairId := range subscription.clobPairIds {
- if update, ok := updatesByClobPairId[clobPairId]; ok {
- streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
- }
- }
- updatesBySubscriptionId[id] = streamUpdatesForSubscription
- }
+ precomputedUpdates := make(map[uint32][]clobtypes.StreamUpdate)
+ for clobPairId, update := range updatesByClobPairId {
+ precomputedUpdates[clobPairId] = update
+ }
+ for id, subscription := range sm.orderbookSubscriptions {
+ for _, clobPairId := range subscription.clobPairIds {
+ if updates, ok := precomputedUpdates[clobPairId]; ok {
+ updatesBySubscriptionId[id] = append(updatesBySubscriptionId[id], updates...)
+ }
+ }
+ } Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatesBySubscriptionId map[uint32][]clobtypes.StreamUpdate, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.GrpcEmitProtocolUpdateCount, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -192,24 +215,19 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Send updates to subscribers. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idsToRemove := make([]uint32, 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for id, subscription := range sm.orderbookSubscriptions { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, clobPairId := range subscription.clobPairIds { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if update, ok := updatesByClobPairId[clobPairId]; ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(streamUpdatesForSubscription) > 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.GrpcSendResponseToSubscriberCount, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := subscription.srv.Send( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&clobtypes.StreamOrderbookUpdatesResponse{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Updates: streamUpdatesForSubscription, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idsToRemove = append(idsToRemove, id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if streamUpdatesForSubscription, ok := updatesBySubscriptionId[id]; ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(streamUpdatesForSubscription) > 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metrics.GrpcSendResponseToSubscriberCount, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := subscription.srv.Send( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&clobtypes.StreamOrderbookUpdatesResponse{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Updates: streamUpdatesForSubscription, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idsToRemove = append(idsToRemove, id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor the error handling in
GetOffchainUpdatesV1
.Currently, the method
GetOffchainUpdatesV1
called within the loop can panic if an error occurs. This could lead to service instability. Consider handling the error more gracefully instead of usingpanic
.Committable suggestion