Skip to content

Commit

Permalink
address cmts
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill committed Aug 6, 2024
1 parent e6fcbf5 commit 8d333b8
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type FullNodeStreamingManagerImpl struct {
// list of stream updates.
streamUpdateCache []clobtypes.StreamUpdate
// list of subscription ids for each stream update.
streamUpdateSubscriptionCache [][]uint32
streamUpdateSubscriptionCache [][]uint32
// map from clob pair id to subscription ids.
clobPairIdToSubscriptionIdMapping map[uint32][]uint32
numUpdatesInCache uint32

maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32
Expand Down Expand Up @@ -74,7 +74,6 @@ func NewFullNodeStreamingManager(
streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
streamUpdateSubscriptionCache: make([][]uint32, 0),
clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),
numUpdatesInCache: 0,

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
Expand Down Expand Up @@ -106,7 +105,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.GrpcStreamNumUpdatesBuffered,
float32(sm.numUpdatesInCache),
float32(len(sm.streamUpdateCache)),
)
metrics.SetGauge(
metrics.GrpcStreamSubscriberCount,
Expand Down Expand Up @@ -210,6 +209,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
for i, id := range subscriptionIds {
if id == subscriptionIdToRemove {
// Remove the subscription ID from the slice
sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...)
break
}
}
// If the list is empty after removal, delete the key from the map
if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 {
delete(sm.clobPairIdToSubscriptionIdMapping, pairId)
}
}

sm.logger.Info(
fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove),
)
Expand Down Expand Up @@ -311,7 +326,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
Expand All @@ -328,11 +343,11 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(updates)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates)))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand All @@ -350,7 +365,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
)

// Group fills by clob pair id.
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
Expand All @@ -371,11 +386,11 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
Expand All @@ -398,17 +413,15 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}
sm.numUpdatesInCache += numUpdatesToAdd

// Remove all subscriptions and wipe the buffer if buffer overflows.
if sm.numUpdatesInCache > sm.maxUpdatesInCache {
if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) {
sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " +
"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
for id := range sm.orderbookSubscriptions {
sm.removeSubscription(id)
}
clear(sm.streamUpdateCache)
sm.numUpdatesInCache = 0
}
sm.EmitMetrics()
}
Expand Down Expand Up @@ -459,7 +472,6 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {

clear(sm.streamUpdateCache)
clear(sm.streamUpdateSubscriptionCache)
sm.numUpdatesInCache = 0

for _, id := range idsToRemove {
sm.logger.Error(
Expand Down

0 comments on commit 8d333b8

Please sign in to comment.