Restructure FNS global cache to be list #2036
Changes from all commits
@@ -29,9 +29,15 @@ type FullNodeStreamingManagerImpl struct {
 	ticker *time.Ticker
 	done chan bool

-	// map of clob pair id to stream updates.
-	streamUpdateCache map[uint32][]clobtypes.StreamUpdate
-	numUpdatesInCache uint32
+	// TODO: Consolidate the streamUpdateCache and streamUpdateSubscriptionCache into a single
+	// struct to avoid the need to maintain two separate slices for the same data.
+	// list of stream updates.
+	streamUpdateCache []clobtypes.StreamUpdate
+	// list of subscription ids for each stream update.
+	streamUpdateSubscriptionCache [][]uint32
+	// map from clob pair id to subscription ids.
+	clobPairIdToSubscriptionIdMapping map[uint32][]uint32

 	maxUpdatesInCache uint32
 	maxSubscriptionChannelSize uint32

Review comment: comment for these 2 new fields?
Reply: done
@@ -66,10 +72,11 @@ func NewFullNodeStreamingManager(
 		orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
 		nextSubscriptionId: 0,

-		ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
-		done: make(chan bool),
-		streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
-		numUpdatesInCache: 0,
+		ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
+		done: make(chan bool),
+		streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
+		streamUpdateSubscriptionCache: make([][]uint32, 0),
+		clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),

 		maxUpdatesInCache: maxUpdatesInCache,
 		maxSubscriptionChannelSize: maxSubscriptionChannelSize,
@@ -101,7 +108,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
 func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
 	metrics.SetGauge(
 		metrics.GrpcStreamNumUpdatesBuffered,
-		float32(sm.numUpdatesInCache),
+		float32(len(sm.streamUpdateCache)),
 	)
 	metrics.SetGauge(
 		metrics.GrpcStreamSubscriberCount,
@@ -134,6 +141,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
 		messageSender: messageSender,
 		updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
 	}
+	for _, clobPairId := range clobPairIds {
+		// if clobPairId exists in the map, append the subscription id to the slice
+		// otherwise, create a new slice with the subscription id
+		if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok {
+			sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{}
+		}
+		sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
+			sm.clobPairIdToSubscriptionIdMapping[clobPairId],
+			sm.nextSubscriptionId,
+		)
+	}

 	sm.logger.Info(
 		fmt.Sprintf(

Review comment: do we evict stuff from this map when we prune order subscriptions from the fns manager?
Reply: done, thanks for the catch!
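The registration loop above is the usual Go map-of-slices pattern. Below is a minimal standalone sketch of the same idea; the names and values are illustrative, not the module's API. Note that appending through a missing key already works in Go, since indexing an absent key yields a nil slice and append allocates a fresh backing array, so the explicit existence check is optional.

package main

import "fmt"

func main() {
    // Hypothetical mapping from clob pair id to subscriber ids.
    clobPairIdToSubscriptionIds := map[uint32][]uint32{}

    // Register subscription 7 for clob pairs 0 and 1, then subscription 8 for clob pair 1.
    registrations := []struct {
        subscriptionId uint32
        clobPairIds    []uint32
    }{
        {7, []uint32{0, 1}},
        {8, []uint32{1}},
    }
    for _, reg := range registrations {
        for _, clobPairId := range reg.clobPairIds {
            // Appending through a missing key is safe: the zero value is a nil
            // slice and append allocates a new backing array for it.
            clobPairIdToSubscriptionIds[clobPairId] = append(
                clobPairIdToSubscriptionIds[clobPairId],
                reg.subscriptionId,
            )
        }
    }
    fmt.Println(clobPairIdToSubscriptionIds) // map[0:[7] 1:[7 8]]
}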
@@ -194,6 +212,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
 	}
 	close(subscription.updatesChannel)
 	delete(sm.orderbookSubscriptions, subscriptionIdToRemove)
+
+	// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
+	for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
+		for i, id := range subscriptionIds {
+			if id == subscriptionIdToRemove {
+				// Remove the subscription ID from the slice
+				sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...)
+				break
+			}
+		}
+		// If the list is empty after removal, delete the key from the map
+		if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 {
+			delete(sm.clobPairIdToSubscriptionIdMapping, pairId)
+		}
+	}
+
 	sm.logger.Info(
 		fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove),
 	)
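The cleanup above uses the common append(s[:i], s[i+1:]...) idiom to delete one element from a slice in place. A self-contained sketch of the same pattern, with toy data and an illustrative function name:

package main

import "fmt"

// removeSubscriptionId deletes id from every per-clob-pair slice and drops
// clob pair keys whose subscriber list becomes empty, mirroring the loop above.
func removeSubscriptionId(m map[uint32][]uint32, id uint32) {
    for pairId, ids := range m {
        for i, candidate := range ids {
            if candidate == id {
                // append(ids[:i], ids[i+1:]...) shifts later elements left in place.
                m[pairId] = append(ids[:i], ids[i+1:]...)
                break
            }
        }
        if len(m[pairId]) == 0 {
            delete(m, pairId)
        }
    }
}

func main() {
    m := map[uint32][]uint32{0: {7}, 1: {7, 8}}
    removeSubscriptionId(m, 7)
    fmt.Println(m) // map[1:[8]]
}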
@@ -295,27 +329,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
 	}

 	// Unmarshal each per-clob pair message to v1 updates.
-	updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
+	streamUpdates := make([]clobtypes.StreamUpdate, 0)
+	clobPairIds := make([]uint32, 0)
 	for clobPairId, update := range updates {
 		v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
 		if err != nil {
 			panic(err)
 		}
-		updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
-			{
-				UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
-					OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
-						Updates: v1updates,
-						Snapshot: false,
-					},
-				},
-				BlockHeight: blockHeight,
-				ExecMode: uint32(execMode),
-			},
-		}
+		streamUpdate := clobtypes.StreamUpdate{
+			UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
+				OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
+					Updates: v1updates,
+					Snapshot: false,
+				},
+			},
+			BlockHeight: blockHeight,
+			ExecMode: uint32(execMode),
+		}
+		streamUpdates = append(streamUpdates, streamUpdate)
+		clobPairIds = append(clobPairIds, clobPairId)
 	}

-	sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
+	sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates)))
 }

 // SendOrderbookFillUpdates groups fills by their clob pair ids and
@@ -333,7 +368,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
 	)

 	// Group fills by clob pair id.
-	updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
+	streamUpdates := make([]clobtypes.StreamUpdate, 0)
+	clobPairIds := make([]uint32, 0)
 	for _, orderbookFill := range orderbookFills {
 		// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
 		// perpetual id.
@@ -346,24 +382,23 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
 		} else {
 			clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId
 		}
-		if _, ok := updatesByClobPairId[clobPairId]; !ok {
-			updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
-		}
 		streamUpdate := clobtypes.StreamUpdate{
 			UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
 				OrderFill: &orderbookFill,
 			},
 			BlockHeight: blockHeight,
 			ExecMode: uint32(execMode),
 		}
-		updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
+		streamUpdates = append(streamUpdates, streamUpdate)
+		clobPairIds = append(clobPairIds, clobPairId)
 	}

-	sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
+	sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
 }

 func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
-	updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
+	updates []clobtypes.StreamUpdate,
+	clobPairIds []uint32,
 	numUpdatesToAdd uint32,
 ) {
 	sm.Lock()
@@ -374,20 +409,22 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
 		float32(numUpdatesToAdd),
 	)

-	for clobPairId, streamUpdates := range updatesByClobPairId {
-		sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
+	sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
+	for _, clobPairId := range clobPairIds {
+		sm.streamUpdateSubscriptionCache = append(
+			sm.streamUpdateSubscriptionCache,
+			sm.clobPairIdToSubscriptionIdMapping[clobPairId],
+		)
 	}
-	sm.numUpdatesInCache += numUpdatesToAdd

 	// Remove all subscriptions and wipe the buffer if buffer overflows.
-	if sm.numUpdatesInCache > sm.maxUpdatesInCache {
+	if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) {
 		sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " +
 			"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
 		for id := range sm.orderbookSubscriptions {
 			sm.removeSubscription(id)
 		}
 		clear(sm.streamUpdateCache)
-		sm.numUpdatesInCache = 0
 	}
 	sm.EmitMetrics()
 }
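After this change the buffer is a pair of parallel slices: index i of streamUpdateCache lines up with index i of streamUpdateSubscriptionCache. A rough sketch of that invariant with simplified stand-in types (string in place of clobtypes.StreamUpdate, and subscriber lists passed in directly instead of being looked up from the clob pair mapping):

package main

import "fmt"

// toyCache mirrors the shape of the two parallel caches; string stands in for
// clobtypes.StreamUpdate.
type toyCache struct {
    updates         []string
    subscriptionIds [][]uint32
    maxUpdates      int
}

// add appends updates and their subscriber lists in lockstep, so
// len(updates) == len(subscriptionIds) holds after every call.
func (c *toyCache) add(updates []string, subscribers [][]uint32) {
    c.updates = append(c.updates, updates...)
    c.subscriptionIds = append(c.subscriptionIds, subscribers...)
    if len(c.updates) > c.maxUpdates {
        // Overflow: wipe the buffer (the real code also drops every subscription).
        c.updates = c.updates[:0]
        c.subscriptionIds = c.subscriptionIds[:0]
    }
}

func main() {
    c := &toyCache{maxUpdates: 10}
    c.add([]string{"fill update", "orderbook update"}, [][]uint32{{7}, {7, 8}})
    fmt.Println(len(c.updates) == len(c.subscriptionIds)) // true
}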
@@ -398,8 +435,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() {
 	sm.FlushStreamUpdatesWithLock()
 }

-// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
-// Note this method requires the lock and assumes that the lock has already been
+// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs,
+// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been
 // acquired by the caller.
 func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
 	defer metrics.ModuleMeasureSince(
@@ -408,32 +445,36 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
 		time.Now(),
 	)

-	// Non-blocking send updates through subscriber's buffered channel.
-	// If the buffer is full, drop the subscription.
+	// Map to collect updates for each subscription.
+	subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
 	idsToRemove := make([]uint32, 0)
-	for id, subscription := range sm.orderbookSubscriptions {
-		streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
-		for _, clobPairId := range subscription.clobPairIds {
-			if update, ok := sm.streamUpdateCache[clobPairId]; ok {
-				streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
-			}
-		}
-
-		if len(streamUpdatesForSubscription) > 0 {
-			metrics.IncrCounter(
-				metrics.GrpcAddToSubscriptionChannelCount,
-				1,
-			)
-			select {
-			case subscription.updatesChannel <- streamUpdatesForSubscription:
-			default:
-				idsToRemove = append(idsToRemove, id)
-			}
-		}
-	}
-
-	clear(sm.streamUpdateCache)
-	sm.numUpdatesInCache = 0
+
+	// Collect updates for each subscription.
+	for i, update := range sm.streamUpdateCache {
+		subscriptionIds := sm.streamUpdateSubscriptionCache[i]
+		for _, id := range subscriptionIds {
+			subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
+		}
+	}
+
+	// Non-blocking send updates through subscriber's buffered channel.
+	// If the buffer is full, drop the subscription.
+	for id, updates := range subscriptionUpdates {
+		if subscription, ok := sm.orderbookSubscriptions[id]; ok {
+			metrics.IncrCounter(
+				metrics.GrpcAddToSubscriptionChannelCount,
+				1,
+			)
+			select {
+			case subscription.updatesChannel <- updates:
+			default:
+				idsToRemove = append(idsToRemove, id)
+			}
+		}
+	}
+
+	clear(sm.streamUpdateCache)
+	clear(sm.streamUpdateSubscriptionCache)

 	for _, id := range idsToRemove {
 		sm.logger.Error(
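The flush path above first groups buffered updates per subscription id and then pushes them with a non-blocking send, so a slow subscriber is dropped rather than stalling the manager. A minimal sketch of the select/default idiom, with hypothetical channel sizes and subscription ids:

package main

import "fmt"

func main() {
    // Hypothetical subscriber channels keyed by subscription id; capacity 1
    // stands in for maxSubscriptionChannelSize.
    subscribers := map[uint32]chan []string{
        1: make(chan []string, 1),
        2: make(chan []string, 1),
    }
    // Pretend subscriber 2's buffer is already full.
    subscribers[2] <- []string{"stale update"}

    idsToRemove := []uint32{}
    for id, ch := range subscribers {
        select {
        case ch <- []string{"new update"}:
            // Sent without blocking.
        default:
            // Buffer full: mark the subscription for removal instead of blocking.
            idsToRemove = append(idsToRemove, id)
        }
    }
    fmt.Println(idsToRemove) // [2]
}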
Review comment: nit: cleaner to combine these two since they are used together and assumed to have same length etc
Reply: added a TODO for now, will address in followup PR
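For illustration only, the consolidation suggested above (and in the TODO) could look roughly like the following; the type and function names are hypothetical and not code from this PR:

package streaming

// StreamUpdate stands in for clobtypes.StreamUpdate in this sketch.
type StreamUpdate struct{}

// cachedStreamUpdate is a hypothetical consolidated cache entry that would
// replace the two parallel slices (streamUpdateCache and
// streamUpdateSubscriptionCache) with a single slice.
type cachedStreamUpdate struct {
    update          StreamUpdate
    subscriptionIds []uint32
}

// fanOut shows how the flush loop could range over one slice instead of
// indexing two slices that must be kept the same length.
func fanOut(cache []cachedStreamUpdate) map[uint32][]StreamUpdate {
    subscriptionUpdates := make(map[uint32][]StreamUpdate)
    for _, entry := range cache {
        for _, id := range entry.subscriptionIds {
            subscriptionUpdates[id] = append(subscriptionUpdates[id], entry.update)
        }
    }
    return subscriptionUpdates
}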