Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1190] Emit FinalizeBlock updates in single batch. #2260

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 141 additions & 44 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,11 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the 3 functions SendOrderbookUpdates, SendOrderbookFillUpdates, SendFinalizedSubaccountUpdates, we are just taking out a few lines into a helper function. No logic change

func getStreamUpdatesFromOffchainUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
Expand All @@ -514,8 +506,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
Expand All @@ -535,26 +527,39 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) (
streamUpdates []clobtypes.StreamUpdate,
clobPairIds []uint32,
) {
// Group fills by clob pair id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -577,6 +582,29 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}
Expand Down Expand Up @@ -609,6 +637,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
)
}

func getStreamUpdatesForSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
subaccountIds = make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Expand All @@ -626,20 +679,11 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

// Group subaccount updates by subaccount id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
subaccountIds := make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}
Expand Down Expand Up @@ -796,6 +840,47 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

// addBatchUpdatesToCacheWithLock adds batched updates to the cache.
// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill
// and subaccount updates in a single stream.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates []clobtypes.StreamUpdate,
orderbookClobPairIds []uint32,
fillStreamUpdates []clobtypes.StreamUpdate,
fillClobPairIds []uint32,
subaccountStreamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Add orderbook updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...)
for _, clobPairId := range orderbookClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add fill updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...)
for _, clobPairId := range fillClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add subaccount updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...)
for _, subaccountId := range subaccountIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.subaccountIdToSubscriptionIdMapping[*subaccountId],
)
}
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -804,33 +889,45 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdates()

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

// TODO(CT-1190): Stream below in a single batch.
// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

sm.Lock()
defer sm.Unlock()

// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()

sm.addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates,
orderbookClobPairIds,
fillStreamUpdates,
fillClobPairIds,
subaccountStreamUpdates,
subaccountIds,
)

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
Expand Down
Loading