diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 8d0e5f9d98..fd876a1fb1 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -113,29 +113,42 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + updatesByClobPairId := make(map[uint32][]ocutypes.OffChainUpdateV1) for clobPairId, update := range updates { v1updates, err := GetOffchainUpdatesV1(update) if err != nil { panic(err) } - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: snapshot, + updatesByClobPairId[clobPairId] = v1updates + } + + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) + for id, subscription := range sm.orderbookSubscriptions { + // Consolidate orderbook updates into a single `StreamUpdate`. + v1updates := make([]ocutypes.OffChainUpdateV1, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := updatesByClobPairId[clobPairId]; ok { + v1updates = append(v1updates, update...) + } + } + + if len(v1updates) > 0 { + updatesBySubscriptionId[id] = []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: snapshot, + }, }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - }, + } } } - sm.sendStreamUpdate( - updatesByClobPairId, - ) + sm.sendStreamUpdate(updatesBySubscriptionId) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -172,14 +185,24 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } - sm.sendStreamUpdate( - updatesByClobPairId, - ) + updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) + for id, subscription := range sm.orderbookSubscriptions { + streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := updatesByClobPairId[clobPairId]; ok { + streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) + } + } + + updatesBySubscriptionId[id] = streamUpdatesForSubscription + } + + sm.sendStreamUpdate(updatesBySubscriptionId) } // sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( - updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + updatesBySubscriptionId map[uint32][]clobtypes.StreamUpdate, ) { metrics.IncrCounter( metrics.GrpcEmitProtocolUpdateCount, @@ -192,24 +215,19 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } - } - - if len(streamUpdatesForSubscription) > 0 { - metrics.IncrCounter( - metrics.GrpcSendResponseToSubscriberCount, - 1, - ) - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) + if streamUpdatesForSubscription, ok := updatesBySubscriptionId[id]; ok { + if len(streamUpdatesForSubscription) > 0 { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdatesForSubscription, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } } }