diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 1d035ed6b3..4c526a200b 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -29,9 +29,15 @@ type FullNodeStreamingManagerImpl struct { ticker *time.Ticker done chan bool - // map of clob pair id to stream updates. - streamUpdateCache map[uint32][]clobtypes.StreamUpdate - numUpdatesInCache uint32 + // TODO: Consolidate the streamUpdateCache and streamUpdateSubscriptionCache into a single + // struct to avoid the need to maintain two separate slices for the same data. + + // list of stream updates. + streamUpdateCache []clobtypes.StreamUpdate + // list of subscription ids for each stream update. + streamUpdateSubscriptionCache [][]uint32 + // map from clob pair id to subscription ids. + clobPairIdToSubscriptionIdMapping map[uint32][]uint32 maxUpdatesInCache uint32 maxSubscriptionChannelSize uint32 @@ -66,10 +72,11 @@ func NewFullNodeStreamingManager( orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), nextSubscriptionId: 0, - ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), - done: make(chan bool), - streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate), - numUpdatesInCache: 0, + ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), + done: make(chan bool), + streamUpdateCache: make([]clobtypes.StreamUpdate, 0), + streamUpdateSubscriptionCache: make([][]uint32, 0), + clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32), maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, @@ -101,7 +108,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { metrics.SetGauge( metrics.GrpcStreamNumUpdatesBuffered, - float32(sm.numUpdatesInCache), + float32(len(sm.streamUpdateCache)), ) metrics.SetGauge( metrics.GrpcStreamSubscriberCount, @@ -134,6 +141,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( messageSender: messageSender, updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), } + for _, clobPairId := range clobPairIds { + // if clobPairId exists in the map, append the subscription id to the slice + // otherwise, create a new slice with the subscription id + if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok { + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{} + } + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + sm.nextSubscriptionId, + ) + } sm.logger.Info( fmt.Sprintf( @@ -194,6 +212,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription( } close(subscription.updatesChannel) delete(sm.orderbookSubscriptions, subscriptionIdToRemove) + + // Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove + for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping { + for i, id := range subscriptionIds { + if id == subscriptionIdToRemove { + // Remove the subscription ID from the slice + sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...) + break + } + } + // If the list is empty after removal, delete the key from the map + if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 { + delete(sm.clobPairIdToSubscriptionIdMapping, pairId) + } + } + sm.logger.Info( fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove), ) @@ -295,27 +329,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + streamUpdates := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for clobPairId, update := range updates { v1updates, err := streaming_util.GetOffchainUpdatesV1(update) if err != nil { panic(err) } - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: false, - }, + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), } + streamUpdates = append(streamUpdates, streamUpdate) + clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates))) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -333,7 +368,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( ) // Group fills by clob pair id. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + streamUpdates := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for _, orderbookFill := range orderbookFills { // If this is a deleveraging fill, fetch the clob pair id from the deleveraged // perpetual id. @@ -346,9 +382,6 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( } else { clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId } - if _, ok := updatesByClobPairId[clobPairId]; !ok { - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{} - } streamUpdate := clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ OrderFill: &orderbookFill, @@ -356,14 +389,16 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) + streamUpdates = append(streamUpdates, streamUpdate) + clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills))) } func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( - updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + updates []clobtypes.StreamUpdate, + clobPairIds []uint32, numUpdatesToAdd uint32, ) { sm.Lock() @@ -374,20 +409,22 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( float32(numUpdatesToAdd), ) - for clobPairId, streamUpdates := range updatesByClobPairId { - sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...) + sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) + for _, clobPairId := range clobPairIds { + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + ) } - sm.numUpdatesInCache += numUpdatesToAdd // Remove all subscriptions and wipe the buffer if buffer overflows. - if sm.numUpdatesInCache > sm.maxUpdatesInCache { + if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) { sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " + "Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.") for id := range sm.orderbookSubscriptions { sm.removeSubscription(id) } clear(sm.streamUpdateCache) - sm.numUpdatesInCache = 0 } sm.EmitMetrics() } @@ -398,8 +435,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() { sm.FlushStreamUpdatesWithLock() } -// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers. -// Note this method requires the lock and assumes that the lock has already been +// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs, +// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been // acquired by the caller. func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { defer metrics.ModuleMeasureSince( @@ -408,24 +445,28 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { time.Now(), ) - // Non-blocking send updates through subscriber's buffered channel. - // If the buffer is full, drop the subscription. + // Map to collect updates for each subscription. + subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate) idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := sm.streamUpdateCache[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } + + // Collect updates for each subscription. + for i, update := range sm.streamUpdateCache { + subscriptionIds := sm.streamUpdateSubscriptionCache[i] + for _, id := range subscriptionIds { + subscriptionUpdates[id] = append(subscriptionUpdates[id], update) } + } - if len(streamUpdatesForSubscription) > 0 { + // Non-blocking send updates through subscriber's buffered channel. + // If the buffer is full, drop the subscription. + for id, updates := range subscriptionUpdates { + if subscription, ok := sm.orderbookSubscriptions[id]; ok { metrics.IncrCounter( metrics.GrpcAddToSubscriptionChannelCount, 1, ) select { - case subscription.updatesChannel <- streamUpdatesForSubscription: + case subscription.updatesChannel <- updates: default: idsToRemove = append(idsToRemove, id) } @@ -433,7 +474,7 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { } clear(sm.streamUpdateCache) - sm.numUpdatesInCache = 0 + clear(sm.streamUpdateSubscriptionCache) for _, id := range idsToRemove { sm.logger.Error(