Skip to content

Commit

Permalink
Consolidate orderbook updates into a single stream update (#1634)
Browse files Browse the repository at this point in the history
(cherry picked from commit cd974a2)

# Conflicts:
#	protocol/streaming/grpc/grpc_streaming_manager.go
  • Loading branch information
jayy04 authored and mergify[bot] committed Jun 6, 2024
1 parent 473a079 commit b964e71
Showing 1 changed file with 68 additions and 7 deletions.
75 changes: 68 additions & 7 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,38 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: snapshot,
updatesByClobPairId[clobPairId] = v1updates
}

updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate)
for id, subscription := range sm.orderbookSubscriptions {
// Consolidate orderbook updates into a single `StreamUpdate`.
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
v1updates = append(v1updates, update...)
}
}

if len(v1updates) > 0 {
updatesBySubscriptionId[id] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: snapshot,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
<<<<<<< HEAD
},
}
}
Expand All @@ -136,6 +154,13 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
blockHeight,
execMode,
)
=======
}
}
}

sm.sendStreamUpdate(updatesBySubscriptionId)
>>>>>>> cd974a2f (Consolidate orderbook updates into a single stream update (#1634))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand Down Expand Up @@ -170,18 +195,38 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

<<<<<<< HEAD
sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
=======
updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
}

updatesBySubscriptionId[id] = streamUpdatesForSubscription
}

sm.sendStreamUpdate(updatesBySubscriptionId)
>>>>>>> cd974a2f (Consolidate orderbook updates into a single stream update (#1634))
}

// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
<<<<<<< HEAD
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
=======
updatesBySubscriptionId map[uint32][]clobtypes.StreamUpdate,
>>>>>>> cd974a2f (Consolidate orderbook updates into a single stream update (#1634))
) {
metrics.IncrCounter(
metrics.GrpcEmitProtocolUpdateCount,
Expand All @@ -194,6 +239,7 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
<<<<<<< HEAD
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
Expand All @@ -214,6 +260,21 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
},
); err != nil {
idsToRemove = append(idsToRemove, id)
=======
if streamUpdatesForSubscription, ok := updatesBySubscriptionId[id]; ok {
if len(streamUpdatesForSubscription) > 0 {
metrics.IncrCounter(
metrics.GrpcSendResponseToSubscriberCount,
1,
)
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
>>>>>>> cd974a2f (Consolidate orderbook updates into a single stream update (#1634))
}
}
}
Expand Down

0 comments on commit b964e71

Please sign in to comment.