Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure FNS global cache to be list #2036

Merged
merged 10 commits into from
Aug 7, 2024
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 65 additions & 39 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ type FullNodeStreamingManagerImpl struct {
ticker *time.Ticker
done chan bool

// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate
numUpdatesInCache uint32
// list of stream updates.
streamUpdateCache []clobtypes.StreamUpdate
// list of subscription ids for each stream update.
streamUpdateSubscriptionCache [][]uint32
clobPairIdToSubscriptionIdMapping map[uint32][]uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment for these 2 new fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

numUpdatesInCache uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prob no need for this field now that we have a central list, can use len(list)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32
Expand Down Expand Up @@ -66,10 +69,12 @@ func NewFullNodeStreamingManager(
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
numUpdatesInCache: 0,
ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
streamUpdateSubscriptionCache: make([][]uint32, 0),
clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),
numUpdatesInCache: 0,

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
Expand Down Expand Up @@ -134,6 +139,17 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok {
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{}
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we evict stuff from this map when we prune order subscriptions from the fns manager?

Copy link
Contributor Author

@dydxwill dydxwill Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks for the catch!

)
}

sm.logger.Info(
fmt.Sprintf(
Expand Down Expand Up @@ -295,27 +311,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename var since this is not a map anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(updates)))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand All @@ -333,7 +350,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
)

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -346,24 +364,23 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
} else {
clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId
}
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(orderbookFills)))
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
numUpdatesToAdd uint32,
) {
sm.Lock()
Expand All @@ -374,8 +391,12 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
float32(numUpdatesToAdd),
)

for clobPairId, streamUpdates := range updatesByClobPairId {
sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
for _, clobPairId := range clobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}
sm.numUpdatesInCache += numUpdatesToAdd

Expand All @@ -398,8 +419,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() {
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs,
// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
Expand All @@ -408,31 +429,36 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
time.Now(),
)

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
// Map to collect updates for each subscription.
subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := sm.streamUpdateCache[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}

// Collect updates for each subscription.
for i, update := range sm.streamUpdateCache {
subscriptionIds := sm.streamUpdateSubscriptionCache[i]
for _, id := range subscriptionIds {
subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
}
}

if len(streamUpdatesForSubscription) > 0 {
// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdatesForSubscription:
case subscription.updatesChannel <- updates:
default:
idsToRemove = append(idsToRemove, id)
}
}
}

clear(sm.streamUpdateCache)
clear(sm.streamUpdateSubscriptionCache)
sm.numUpdatesInCache = 0

for _, id := range idsToRemove {
Expand Down
Loading