Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 30, 2024
1 parent ee90970 commit 1fb2b95
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
47 changes: 29 additions & 18 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 {
return binary.BigEndian.Uint32(countsBytes)
}

// Stage a subaccount update event in transient store, during `FinalizeBlock`.
// Send a subaccount update event.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand All @@ -399,7 +399,7 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
Expand Down Expand Up @@ -489,6 +489,8 @@ func getStreamUpdatesFromOffchainUpdates(
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
// unique list of clob pair Ids to send updates for.
clobPairIds = make([]uint32, 0)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
Expand All @@ -506,14 +508,23 @@ func getStreamUpdatesFromOffchainUpdates(

if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
clobPairIds = append(clobPairIds, clobPairId)
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, v1updates := range clobPairIdToV1Updates {
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds))

for _, clobPairId := range clobPairIds {
v1updates, exists := clobPairIdToV1Updates[clobPairId]
if !exists {
panic(fmt.Sprintf(
"clob pair id %v not found in clobPairIdToV1Updates: %v",
clobPairId,
clobPairIdToV1Updates,
))
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Expand All @@ -525,7 +536,6 @@ func getStreamUpdatesFromOffchainUpdates(
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

return streamUpdates, clobPairIds
Expand All @@ -535,7 +545,6 @@ func getStreamUpdatesFromOffchainUpdates(
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
ctx sdk.Context,
) {
v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
Expand All @@ -548,12 +557,16 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(v1updates, blockHeight, ctx.ExecMode())
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(
v1updates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Expand Down Expand Up @@ -609,7 +622,6 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
Expand All @@ -623,15 +635,15 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
Expand All @@ -647,7 +659,6 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
streamTakerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
ctx sdk.Context,
) {
// In current design, we never send this during `DeliverTx` (`FinalizeBlock`).
Expand All @@ -667,7 +678,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
BlockHeight: blockHeight,
BlockHeight: lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ExecMode: uint32(ctx.ExecMode()),
},
},
Expand Down Expand Up @@ -925,15 +936,15 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
uint32(ctx.BlockHeight()),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
finalizedFills,
uint32(ctx.BlockHeight()),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)
Expand All @@ -943,7 +954,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
for _, finalizedUpdate := range finalizedOrderbookUpdates {
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(
finalizedUpdate.Updates,
uint32(ctx.BlockHeight()),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds)
Expand All @@ -952,7 +963,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
// Finally, cache updates for finalized subaccount updates
subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds)
Expand Down
3 changes: 0 additions & 3 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,19 @@ func (sm *NoopGrpcStreamingManager) Subscribe(

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
blockHeight uint32,
ctx sdk.Context,
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
}

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
ctx sdk.Context,
) {
}
Expand Down
3 changes: 0 additions & 3 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,15 @@ type FullNodeStreamingManager interface {
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
ctx sdk.Context,
)
SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
Expand Down
3 changes: 0 additions & 3 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ func (k Keeper) SendOrderbookUpdates(

k.GetFullNodeStreamingManager().SendOrderbookUpdates(
offchainUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx,
)
}
Expand All @@ -321,7 +320,6 @@ func (k Keeper) SendOrderbookFillUpdate(
) {
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
orderbookFill,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx,
k.PerpetualIdToClobPairId,
)
Expand All @@ -334,7 +332,6 @@ func (k Keeper) SendTakerOrderStatus(
) {
k.GetFullNodeStreamingManager().SendTakerOrderStatus(
takerOrder,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx,
)
}
2 changes: 0 additions & 2 deletions protocol/x/clob/keeper/process_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ func (k Keeper) PersistMatchOrdersToState(

k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
streamOrderbookFill,
uint32(ctx.BlockHeight()),
ctx,
k.PerpetualIdToClobPairId,
)
Expand Down Expand Up @@ -674,7 +673,6 @@ func (k Keeper) PersistMatchLiquidationToState(
)
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
streamOrderbookFill,
uint32(ctx.BlockHeight()),
ctx,
k.PerpetualIdToClobPairId,
)
Expand Down

0 comments on commit 1fb2b95

Please sign in to comment.