diff --git a/proto/dydxprotocol/clob/streaming.proto b/proto/dydxprotocol/clob/streaming.proto index 06c74ffbe16..ae3811134ee 100644 --- a/proto/dydxprotocol/clob/streaming.proto +++ b/proto/dydxprotocol/clob/streaming.proto @@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent { oneof event { StreamOrderbookFill order_fill = 1; dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; + StreamOrderbookUpdate orderbook_update = 3; } } diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 12a2f8cff32..5d8d68d42f1 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -415,9 +415,9 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } -// SendOrderbookFillUpdates provides a mock function with given fields: ctx, orderbookFills -func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFills []clobtypes.StreamOrderbookFill) { - _m.Called(ctx, orderbookFills) +// SendOrderbookFillUpdate provides a mock function with given fields: ctx, orderbookFills +func (_m *MemClobKeeper) SendOrderbookFillUpdate(ctx types.Context, orderbookFill clobtypes.StreamOrderbookFill) { + _m.Called(ctx, orderbookFill) } // SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index c43a254e787..f3249e0cc57 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -391,38 +391,21 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 { } // Stage a subaccount update event in transient store, during `FinalizeBlock`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( +func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { - lib.AssertDeliverTxMode(ctx) - stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ - SubaccountUpdate: &subaccountUpdate, - }, + // If not `DeliverTx`, return since we don't stream optimistic subaccount updates. + if !lib.IsDeliverTxMode(ctx) { + return } - sm.stageFinalizeBlockEvent( - ctx, - sm.cdc.MustMarshal(&stagedEvent), - ) -} -// Stage a fill event in transient store, during `FinalizeBlock`. -// Since `FinalizeBlock` code block can be called more than once with optimistic -// execution (once optimistically and optionally once on the canonical block), -// we need to stage the events in transient store and later emit them -// during `Precommit`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { - lib.AssertDeliverTxMode(ctx) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ - OrderFill: &fill, + Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, }, } - sm.stageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), @@ -545,7 +528,7 @@ func getStreamUpdatesFromOffchainUpdates( func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, @@ -553,9 +536,30 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( time.Now(), ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode) + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, ctx.ExecMode()) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates) + if err != nil { + panic(err) + } + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, + }, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( @@ -595,28 +599,44 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( return streamUpdates, clobPairIds } -// SendOrderbookFillUpdates groups fills by their clob pair ids and +// SendOrderbookFillUpdate groups fills by their clob pair ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, +func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { + // TODO defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, metrics.GrpcSendOrderbookFillsLatency, time.Now(), ) - streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( - orderbookFills, - blockHeight, - execMode, - perpetualIdToClobPairId, - ) + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( + []clobtypes.StreamOrderbookFill{orderbookFill}, + blockHeight, + ctx.ExecMode(), + perpetualIdToClobPairId, + ) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } + + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &orderbookFill, + }, + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 4df60bc4271..9174aeb6120 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -32,14 +32,14 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { } -func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, +func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { } @@ -79,19 +79,13 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams( func (sm *NoopGrpcStreamingManager) Stop() { } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { -} - func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent { return nil } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate( +func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 0f097d3e755..4bb54c51110 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -32,12 +32,12 @@ type FullNodeStreamingManager interface { SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) - SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, + SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) SendTakerOrderStatus( @@ -50,11 +50,7 @@ type FullNodeStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) - StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, - ) - StageFinalizeBlockSubaccountUpdate( + SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 22ba76acdd0..376f6fb30af 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -508,9 +508,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates( ) { } -func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( +func (f *FakeMemClobKeeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { } diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 8c39342ca64..65503feeadf 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -310,22 +310,19 @@ func (k Keeper) SendOrderbookUpdates( k.GetFullNodeStreamingManager().SendOrderbookUpdates( offchainUpdates, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, ) } -// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager. -func (k Keeper) SendOrderbookFillUpdates( +// SendOrderbookFillUpdate sends the orderbook fills to the Full Node streaming manager. +func (k Keeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { - if len(orderbookFills) == 0 { - return - } - k.GetFullNodeStreamingManager().SendOrderbookFillUpdates( - orderbookFills, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( + orderbookFill, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, k.PerpetualIdToClobPairId, ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 026d9d316cf..ad1c117eb7a 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -560,9 +560,11 @@ func (k Keeper) PersistMatchOrdersToState( makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + uint32(ctx.BlockHeight()), + ctx, + k.PerpetualIdToClobPairId, ) } @@ -670,9 +672,11 @@ func (k Keeper) PersistMatchLiquidationToState( takerOrder, makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + uint32(ctx.BlockHeight()), + ctx, + k.PerpetualIdToClobPairId, ) } return nil @@ -843,11 +847,9 @@ func (k Keeper) PersistMatchDeleveragingToState( }, }, } - k.SendOrderbookFillUpdates( + k.SendOrderbookFillUpdate( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index d6d1e087746..db541650c35 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -402,7 +402,7 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches( ) clobMatch := internalOperation.GetMatch() orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders) - m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill}) + m.clobKeeper.SendOrderbookFillUpdate(ctx, orderbookMatchFill) } // Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism. diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index d5553677187..6e25cadf35d 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -102,9 +102,9 @@ type MemClobKeeper interface { ctx sdk.Context, offchainUpdates *OffchainUpdates, ) - SendOrderbookFillUpdates( + SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []StreamOrderbookFill, + orderbookFill StreamOrderbookFill, ) SendTakerOrderStatus( ctx sdk.Context, diff --git a/protocol/x/clob/types/streaming.pb.go b/protocol/x/clob/types/streaming.pb.go index 83b8db719a7..1f6f552fb33 100644 --- a/protocol/x/clob/types/streaming.pb.go +++ b/protocol/x/clob/types/streaming.pb.go @@ -30,6 +30,7 @@ type StagedFinalizeBlockEvent struct { // Types that are valid to be assigned to Event: // *StagedFinalizeBlockEvent_OrderFill // *StagedFinalizeBlockEvent_SubaccountUpdate + // *StagedFinalizeBlockEvent_OrderbookUpdate Event isStagedFinalizeBlockEvent_Event `protobuf_oneof:"event"` } @@ -78,9 +79,13 @@ type StagedFinalizeBlockEvent_OrderFill struct { type StagedFinalizeBlockEvent_SubaccountUpdate struct { SubaccountUpdate *types.StreamSubaccountUpdate `protobuf:"bytes,2,opt,name=subaccount_update,json=subaccountUpdate,proto3,oneof" json:"subaccount_update,omitempty"` } +type StagedFinalizeBlockEvent_OrderbookUpdate struct { + OrderbookUpdate *StreamOrderbookUpdate `protobuf:"bytes,3,opt,name=orderbook_update,json=orderbookUpdate,proto3,oneof" json:"orderbook_update,omitempty"` +} func (*StagedFinalizeBlockEvent_OrderFill) isStagedFinalizeBlockEvent_Event() {} func (*StagedFinalizeBlockEvent_SubaccountUpdate) isStagedFinalizeBlockEvent_Event() {} +func (*StagedFinalizeBlockEvent_OrderbookUpdate) isStagedFinalizeBlockEvent_Event() {} func (m *StagedFinalizeBlockEvent) GetEvent() isStagedFinalizeBlockEvent_Event { if m != nil { @@ -103,11 +108,19 @@ func (m *StagedFinalizeBlockEvent) GetSubaccountUpdate() *types.StreamSubaccount return nil } +func (m *StagedFinalizeBlockEvent) GetOrderbookUpdate() *StreamOrderbookUpdate { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_OrderbookUpdate); ok { + return x.OrderbookUpdate + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*StagedFinalizeBlockEvent) XXX_OneofWrappers() []interface{} { return []interface{}{ (*StagedFinalizeBlockEvent_OrderFill)(nil), (*StagedFinalizeBlockEvent_SubaccountUpdate)(nil), + (*StagedFinalizeBlockEvent_OrderbookUpdate)(nil), } } @@ -118,25 +131,26 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/streaming.proto", fileDescriptor_cecf6ffcf2554dee) } var fileDescriptor_cecf6ffcf2554dee = []byte{ - // 281 bytes of a gzipped FileDescriptorProto + // 303 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x4c, 0xa9, 0x4c, 0xa9, 0x28, 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x4f, 0xce, 0xc9, 0x4f, 0xd2, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xcd, 0xcc, 0x4b, 0xd7, 0x03, 0x8b, 0x0b, 0x09, 0x22, 0x2b, 0xd1, 0x03, 0x29, 0x91, 0xd2, 0x40, 0xd1, 0x55, 0x5c, 0x9a, 0x94, 0x98, 0x9c, 0x9c, 0x5f, 0x9a, 0x57, 0x52, - 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x59, - 0x46, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, + 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x12, + 0x26, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, 0x54, 0xa7, 0x9c, 0xfc, 0xe4, 0x6c, 0xd7, 0xb2, 0xd4, 0xbc, 0x12, 0x21, 0x77, 0x2e, 0xae, 0xfc, 0xa2, 0x94, 0xd4, 0xa2, 0xf8, 0xb4, 0xcc, 0x9c, 0x1c, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x6e, 0x23, 0x35, 0x3d, 0x0c, 0xd7, 0xe8, 0x05, 0x83, 0xed, 0xf4, 0x07, 0x29, 0x4d, 0xca, 0xcf, 0xcf, 0x76, 0xcb, 0xcc, 0xc9, 0xf1, 0x60, 0x08, 0xe2, 0x04, 0xeb, 0x05, 0x71, 0x84, 0xe2, 0xb9, 0x04, 0x11, 0x6e, 0x8c, 0x2f, 0x2d, 0x48, 0x49, 0x2c, 0x49, 0x95, 0x60, 0x02, 0x9b, 0x67, 0x80, 0x6a, 0x1e, 0x92, 0x57, 0xa0, 0xc6, 0x06, 0xc3, 0x45, 0x42, 0xc1, 0xfa, 0x3c, 0x18, 0x82, 0x04, 0x8a, 0xd1, - 0xc4, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x4e, 0x76, 0x0a, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, - 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, - 0xc6, 0x63, 0x39, 0x86, 0x28, 0xb3, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, - 0x7d, 0x94, 0x30, 0x29, 0x33, 0xd1, 0x4d, 0xce, 0x48, 0xcc, 0xcc, 0xd3, 0x87, 0x8b, 0x54, 0x40, - 0xc2, 0xa9, 0xa4, 0xb2, 0x20, 0xb5, 0x38, 0x89, 0x0d, 0x2c, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, - 0xff, 0x65, 0x71, 0xd8, 0xa8, 0xa9, 0x01, 0x00, 0x00, + 0xc4, 0x84, 0x42, 0xb9, 0x04, 0xf2, 0x61, 0xd6, 0xc3, 0xcc, 0x67, 0x06, 0x9b, 0xaf, 0x41, 0xd8, + 0xbd, 0x70, 0x73, 0xf9, 0xf3, 0x51, 0x85, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x21, 0xe1, 0x14, + 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, + 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0x66, 0xe9, 0x99, 0x25, 0x19, + 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0x28, 0x41, 0x5d, 0x66, 0xa2, 0x9b, 0x9c, 0x91, 0x98, + 0x99, 0xa7, 0x0f, 0x17, 0xa9, 0x80, 0x04, 0x7f, 0x49, 0x65, 0x41, 0x6a, 0x71, 0x12, 0x1b, 0x58, + 0xd8, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xc0, 0xbc, 0x6c, 0x00, 0x02, 0x00, 0x00, } func (m *StagedFinalizeBlockEvent) Marshal() (dAtA []byte, err error) { @@ -213,6 +227,27 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalToSizedBuffer(dAtA [] } return len(dAtA) - i, nil } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderbookUpdate != nil { + { + size, err := m.OrderbookUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStreaming(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + return len(dAtA) - i, nil +} func encodeVarintStreaming(dAtA []byte, offset int, v uint64) int { offset -= sovStreaming(v) base := offset @@ -260,6 +295,18 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) Size() (n int) { } return n } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderbookUpdate != nil { + l = m.OrderbookUpdate.Size() + n += 1 + l + sovStreaming(uint64(l)) + } + return n +} func sovStreaming(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -366,6 +413,41 @@ func (m *StagedFinalizeBlockEvent) Unmarshal(dAtA []byte) error { } m.Event = &StagedFinalizeBlockEvent_SubaccountUpdate{v} iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderbookUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStreaming + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStreaming + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_OrderbookUpdate{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStreaming(dAtA[iNdEx:]) diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index 5eb650cce09..e72ccd60e7f 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -445,7 +445,7 @@ func (k Keeper) UpdateSubaccounts( if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.GetFullNodeStreamingManager().StageFinalizeBlockSubaccountUpdate( + k.GetFullNodeStreamingManager().SendSubaccountUpdate( ctx, subaccountUpdate, )