Skip to content

Commit

Permalink
Restructure FNS global cache to be list (#2036)
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill authored Aug 7, 2024
1 parent 2afb412 commit 1b0f052
Showing 1 changed file with 85 additions and 44 deletions.
129 changes: 85 additions & 44 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ type FullNodeStreamingManagerImpl struct {
ticker *time.Ticker
done chan bool

// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate
numUpdatesInCache uint32
// TODO: Consolidate the streamUpdateCache and streamUpdateSubscriptionCache into a single
// struct to avoid the need to maintain two separate slices for the same data.

// list of stream updates.
streamUpdateCache []clobtypes.StreamUpdate
// list of subscription ids for each stream update.
streamUpdateSubscriptionCache [][]uint32
// map from clob pair id to subscription ids.
clobPairIdToSubscriptionIdMapping map[uint32][]uint32

maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32
Expand Down Expand Up @@ -66,10 +72,11 @@ func NewFullNodeStreamingManager(
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
numUpdatesInCache: 0,
ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
streamUpdateSubscriptionCache: make([][]uint32, 0),
clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
Expand Down Expand Up @@ -101,7 +108,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.GrpcStreamNumUpdatesBuffered,
float32(sm.numUpdatesInCache),
float32(len(sm.streamUpdateCache)),
)
metrics.SetGauge(
metrics.GrpcStreamSubscriberCount,
Expand Down Expand Up @@ -134,6 +141,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok {
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{}
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
)
}

sm.logger.Info(
fmt.Sprintf(
Expand Down Expand Up @@ -194,6 +212,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
for i, id := range subscriptionIds {
if id == subscriptionIdToRemove {
// Remove the subscription ID from the slice
sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...)
break
}
}
// If the list is empty after removal, delete the key from the map
if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 {
delete(sm.clobPairIdToSubscriptionIdMapping, pairId)
}
}

sm.logger.Info(
fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove),
)
Expand Down Expand Up @@ -295,27 +329,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates)))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand All @@ -333,7 +368,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
)

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -346,24 +382,23 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
} else {
clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId
}
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
numUpdatesToAdd uint32,
) {
sm.Lock()
Expand All @@ -374,20 +409,22 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
float32(numUpdatesToAdd),
)

for clobPairId, streamUpdates := range updatesByClobPairId {
sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
for _, clobPairId := range clobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}
sm.numUpdatesInCache += numUpdatesToAdd

// Remove all subscriptions and wipe the buffer if buffer overflows.
if sm.numUpdatesInCache > sm.maxUpdatesInCache {
if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) {
sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " +
"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
for id := range sm.orderbookSubscriptions {
sm.removeSubscription(id)
}
clear(sm.streamUpdateCache)
sm.numUpdatesInCache = 0
}
sm.EmitMetrics()
}
Expand All @@ -398,8 +435,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() {
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs,
// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
Expand All @@ -408,32 +445,36 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
time.Now(),
)

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
// Map to collect updates for each subscription.
subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := sm.streamUpdateCache[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}

// Collect updates for each subscription.
for i, update := range sm.streamUpdateCache {
subscriptionIds := sm.streamUpdateSubscriptionCache[i]
for _, id := range subscriptionIds {
subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
}
}

if len(streamUpdatesForSubscription) > 0 {
// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdatesForSubscription:
case subscription.updatesChannel <- updates:
default:
idsToRemove = append(idsToRemove, id)
}
}
}

clear(sm.streamUpdateCache)
sm.numUpdatesInCache = 0
clear(sm.streamUpdateSubscriptionCache)

for _, id := range idsToRemove {
sm.logger.Error(
Expand Down

0 comments on commit 1b0f052

Please sign in to comment.