From 1fb2b957dbab0ad0510ae0f82faae986fc1c1618 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Mon, 30 Sep 2024 13:15:17 -0400 Subject: [PATCH] Address comments --- .../streaming/full_node_streaming_manager.go | 47 ++++++++++++------- protocol/streaming/noop_streaming_manager.go | 3 -- protocol/streaming/types/interface.go | 3 -- protocol/x/clob/keeper/keeper.go | 3 -- protocol/x/clob/keeper/process_operations.go | 2 - 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index ba1d550c3d..759b955c55 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -389,7 +389,7 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 { return binary.BigEndian.Uint32(countsBytes) } -// Stage a subaccount update event in transient store, during `FinalizeBlock`. +// Send a subaccount update event. func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, @@ -399,7 +399,7 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ SubaccountUpdate: &subaccountUpdate, @@ -489,6 +489,8 @@ func getStreamUpdatesFromOffchainUpdates( ) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { // Group updates by clob pair id. clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + // unique list of clob pair Ids to send updates for. + clobPairIds = make([]uint32, 0) for _, v1update := range v1updates { var clobPairId uint32 switch u := v1update.UpdateMessage.(type) { @@ -506,14 +508,23 @@ func getStreamUpdatesFromOffchainUpdates( if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} + clobPairIds = append(clobPairIds, clobPairId) } clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) } // Unmarshal each per-clob pair message to v1 updates. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - clobPairIds = make([]uint32, 0) - for clobPairId, v1updates := range clobPairIdToV1Updates { + streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) + + for _, clobPairId := range clobPairIds { + v1updates, exists := clobPairIdToV1Updates[clobPairId] + if !exists { + panic(fmt.Sprintf( + "clob pair id %v not found in clobPairIdToV1Updates: %v", + clobPairId, + clobPairIdToV1Updates, + )) + } streamUpdate := clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -525,7 +536,6 @@ func getStreamUpdatesFromOffchainUpdates( ExecMode: uint32(execMode), } streamUpdates = append(streamUpdates, streamUpdate) - clobPairIds = append(clobPairIds, clobPairId) } return streamUpdates, clobPairIds @@ -535,7 +545,6 @@ func getStreamUpdatesFromOffchainUpdates( // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) { v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) @@ -548,12 +557,16 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( time.Now(), ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(v1updates, blockHeight, ctx.ExecMode()) + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( + v1updates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -609,7 +622,6 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { @@ -623,7 +635,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( []clobtypes.StreamOrderbookFill{orderbookFill}, - blockHeight, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) @@ -631,7 +643,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( return } - // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ OrderFill: &orderbookFill, @@ -647,7 +659,6 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( streamTakerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) { // In current design, we never send this during `DeliverTx` (`FinalizeBlock`). @@ -667,7 +678,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ TakerOrder: &streamTakerOrder, }, - BlockHeight: blockHeight, + BlockHeight: lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ExecMode: uint32(ctx.ExecMode()), }, }, @@ -925,7 +936,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Cache updates to sync local ops queue sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) @@ -933,7 +944,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Cache updates for finalized fills. fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( finalizedFills, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) @@ -943,7 +954,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( for _, finalizedUpdate := range finalizedOrderbookUpdates { streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( finalizedUpdate.Updates, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds) @@ -952,7 +963,7 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Finally, cache updates for finalized subaccount updates subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( finalizedSubaccountUpdates, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 5358b9b098..9dc7bf6de9 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -31,14 +31,12 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) { } func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { @@ -46,7 +44,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) { } diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index cddaada7d7..5b42864016 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -31,18 +31,15 @@ type FullNodeStreamingManager interface { ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, ctx sdk.Context, ) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, - blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, ctx sdk.Context, ) SendFinalizedSubaccountUpdates( diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 5ceaad43a8..f49eb61271 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -309,7 +309,6 @@ func (k Keeper) SendOrderbookUpdates( k.GetFullNodeStreamingManager().SendOrderbookUpdates( offchainUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, ) } @@ -321,7 +320,6 @@ func (k Keeper) SendOrderbookFillUpdate( ) { k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( orderbookFill, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, ) @@ -334,7 +332,6 @@ func (k Keeper) SendTakerOrderStatus( ) { k.GetFullNodeStreamingManager().SendTakerOrderStatus( takerOrder, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx, ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index ad1c117eb7..86808afb5d 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -562,7 +562,6 @@ func (k Keeper) PersistMatchOrdersToState( k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, - uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, ) @@ -674,7 +673,6 @@ func (k Keeper) PersistMatchLiquidationToState( ) k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, - uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, )