diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index b2f4d24aba..473b611b9c 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -350,7 +350,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -393,7 +393,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds) } // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. @@ -411,30 +411,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( } sm.AddUpdatesToCache( - map[uint32][]clobtypes.StreamUpdate{ - clobPairId: { - { - UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ - TakerOrder: &streamTakerOrder, - }, + []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ + TakerOrder: &streamTakerOrder, }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, }, - 1, + []uint32{clobPairId}, ) } +// AddUpdatesToCache adds a series of updates to the full node streaming cache. +// Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( updates []clobtypes.StreamUpdate, clobPairIds []uint32, - numUpdatesToAdd uint32, ) { sm.Lock() defer sm.Unlock() metrics.IncrCounter( metrics.GrpcAddUpdateToBufferCount, - float32(numUpdatesToAdd), + float32(len(updates)), ) sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)