Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure FNS global cache to be list #2036

Merged
merged 10 commits into from
Aug 7, 2024
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 82 additions & 44 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ type FullNodeStreamingManagerImpl struct {
ticker *time.Ticker
done chan bool

// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate
numUpdatesInCache uint32
// list of stream updates.
streamUpdateCache []clobtypes.StreamUpdate
// list of subscription ids for each stream update.
streamUpdateSubscriptionCache [][]uint32
Comment on lines +36 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: cleaner to combine these two since they are used together and assumed to have same length etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a TODO for now, will address in followup PR

// map from clob pair id to subscription ids.
clobPairIdToSubscriptionIdMapping map[uint32][]uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment for these 2 new fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32
Expand Down Expand Up @@ -66,10 +69,11 @@ func NewFullNodeStreamingManager(
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
numUpdatesInCache: 0,
ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
streamUpdateSubscriptionCache: make([][]uint32, 0),
clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
Expand Down Expand Up @@ -101,7 +105,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.GrpcStreamNumUpdatesBuffered,
float32(sm.numUpdatesInCache),
float32(len(sm.streamUpdateCache)),
)
metrics.SetGauge(
metrics.GrpcStreamSubscriberCount,
Expand Down Expand Up @@ -134,6 +138,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok {
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{}
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we evict stuff from this map when we prune order subscriptions from the fns manager?

Copy link
Contributor Author

@dydxwill dydxwill Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks for the catch!

)
}

sm.logger.Info(
fmt.Sprintf(
Expand Down Expand Up @@ -194,6 +209,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
for i, id := range subscriptionIds {
if id == subscriptionIdToRemove {
// Remove the subscription ID from the slice
sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...)
break
}
}
// If the list is empty after removal, delete the key from the map
if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 {
delete(sm.clobPairIdToSubscriptionIdMapping, pairId)
}
}

sm.logger.Info(
fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove),
)
Expand Down Expand Up @@ -295,27 +326,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates)))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand All @@ -333,7 +365,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
)

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -346,24 +379,23 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
} else {
clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId
}
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
numUpdatesToAdd uint32,
) {
sm.Lock()
Expand All @@ -374,20 +406,22 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
float32(numUpdatesToAdd),
)

for clobPairId, streamUpdates := range updatesByClobPairId {
sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
for _, clobPairId := range clobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}
sm.numUpdatesInCache += numUpdatesToAdd

// Remove all subscriptions and wipe the buffer if buffer overflows.
if sm.numUpdatesInCache > sm.maxUpdatesInCache {
if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) {
sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " +
"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
for id := range sm.orderbookSubscriptions {
sm.removeSubscription(id)
}
clear(sm.streamUpdateCache)
sm.numUpdatesInCache = 0
}
sm.EmitMetrics()
}
Expand All @@ -398,8 +432,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() {
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs,
// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
Expand All @@ -408,32 +442,36 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
time.Now(),
)

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
// Map to collect updates for each subscription.
subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := sm.streamUpdateCache[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}

// Collect updates for each subscription.
for i, update := range sm.streamUpdateCache {
subscriptionIds := sm.streamUpdateSubscriptionCache[i]
for _, id := range subscriptionIds {
subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
}
}

if len(streamUpdatesForSubscription) > 0 {
// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdatesForSubscription:
case subscription.updatesChannel <- updates:
default:
idsToRemove = append(idsToRemove, id)
}
}
}

clear(sm.streamUpdateCache)
sm.numUpdatesInCache = 0
clear(sm.streamUpdateSubscriptionCache)

for _, id := range idsToRemove {
sm.logger.Error(
Expand Down
Loading