diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 0b8e5d2504..8eacc26019 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -416,6 +416,11 @@ func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates _m.Called(ctx, offchainUpdates) } +// SendTakerOrderStatus provides a mock function with given fields: ctx, takerOrder +func (_m *MemClobKeeper) SendTakerOrderStatus(ctx types.Context, takerOrder clobtypes.StreamTakerOrder) { + _m.Called(ctx, takerOrder) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *MemClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 1d035ed6b3..2ddfd001be 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -362,6 +362,34 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) } +// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. +func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( + streamTakerOrder clobtypes.StreamTakerOrder, + blockHeight uint32, + execMode sdk.ExecMode, +) { + clobPairId := uint32(0) + if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil { + clobPairId = liqOrder.ClobPairId + } + if takerOrder := streamTakerOrder.GetOrder(); takerOrder != nil { + clobPairId = takerOrder.OrderId.ClobPairId + } + + sm.AddUpdatesToCache( + map[uint32][]clobtypes.StreamUpdate{ + clobPairId: { + { + UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ + TakerOrder: &streamTakerOrder, + }, + }, + }, + }, + 1, + ) +} + func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, numUpdatesToAdd uint32, diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 2334142223..749bcf2b67 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -42,6 +42,13 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( ) { } +func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( + takerOrder clobtypes.StreamTakerOrder, + blockHeight uint32, + execMode sdk.ExecMode, +) { +} + func (sm *NoopGrpcStreamingManager) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, blockHeight uint32, diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index d357dddc39..66fcf808e5 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -34,6 +34,11 @@ type FullNodeStreamingManager interface { execMode sdk.ExecMode, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) + SendTakerOrderStatus( + takerOrder clobtypes.StreamTakerOrder, + blockHeight uint32, + execMode sdk.ExecMode, + ) } type OutgoingMessageSender interface { diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index ea515ff62d..ce15a95e29 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -512,6 +512,12 @@ func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( ) { } +func (f *FakeMemClobKeeper) SendTakerOrderStatus( + ctx sdk.Context, + takerOrder types.StreamTakerOrder, +) { +} + // Placeholder to satisfy interface implementation of types.MemClobKeeper func (f *FakeMemClobKeeper) AddOrderToOrderbookSubaccountUpdatesCheck( ctx sdk.Context, diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 92921b9afb..0fbf4535b4 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -267,7 +267,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) { ) } -// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. +// SendOrderbookUpdates sends the offchain updates to the Full Node streaming manager. func (k Keeper) SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *types.OffchainUpdates, @@ -283,7 +283,7 @@ func (k Keeper) SendOrderbookUpdates( ) } -// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager. +// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager. func (k Keeper) SendOrderbookFillUpdates( ctx sdk.Context, orderbookFills []types.StreamOrderbookFill, @@ -298,3 +298,15 @@ func (k Keeper) SendOrderbookFillUpdates( k.PerpetualIdToClobPairId, ) } + +// SendTakerOrderStatus sends the taker order with its status to the Full Node streaming manager. +func (k Keeper) SendTakerOrderStatus( + ctx sdk.Context, + takerOrder types.StreamTakerOrder, +) { + k.GetFullNodeStreamingManager().SendTakerOrderStatus( + takerOrder, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) +} diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 6c9779571f..8054a41e78 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -767,6 +767,18 @@ func (m *MemClobPriceTimePriority) matchOrder( order, ) + // If full node streaming is on, emit the taker order and its resulting status. + if m.generateOrderbookUpdates { + streamTakerOrder := m.GenerateStreamTakerOrder( + order, + takerOrderStatus, + ) + m.clobKeeper.SendTakerOrderStatus( + ctx, + streamTakerOrder, + ) + } + // If this is a replacement order, then ensure we remove the existing order from the orderbook. if !order.IsLiquidation() { orderId := order.MustGetOrder().OrderId diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index d33988b078..ba2f48b236 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -157,3 +157,28 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( } return offchainUpdates } + +// GenerateStreamTakerOrder returns a `StreamTakerOrder` object used in full node +// streaming from a matchableOrder and a taker order status. +func (m *MemClobPriceTimePriority) GenerateStreamTakerOrder( + takerOrder types.MatchableOrder, + takerOrderStatus types.TakerOrderStatus, +) types.StreamTakerOrder { + if takerOrder.IsLiquidation() { + liquidationOrder := takerOrder.MustGetLiquidationOrder() + streamLiquidationOrder := liquidationOrder.ToStreamLiquidationOrder() + return types.StreamTakerOrder{ + TakerOrder: &types.StreamTakerOrder_LiquidationOrder{ + LiquidationOrder: streamLiquidationOrder, + }, + TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(), + } + } + order := takerOrder.MustGetOrder() + return types.StreamTakerOrder{ + TakerOrder: &types.StreamTakerOrder_Order{ + Order: &order, + }, + TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(), + } +} diff --git a/protocol/x/clob/types/liquidation_order.go b/protocol/x/clob/types/liquidation_order.go index 20aa56fe3d..5804d0f9f0 100644 --- a/protocol/x/clob/types/liquidation_order.go +++ b/protocol/x/clob/types/liquidation_order.go @@ -115,6 +115,12 @@ func (lo *LiquidationOrder) MustGetOrder() Order { panic("MustGetOrder: No underlying order on a LiquidationOrder type.") } +// MustGetLiquidationOrder returns the underlying `LiquidationOrder` type. +// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface. +func (lo *LiquidationOrder) MustGetLiquidationOrder() LiquidationOrder { + return *lo +} + // MustGetLiquidatedPerpetualId returns the perpetual ID that this perpetual order is liquidating. // This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface. func (lo *LiquidationOrder) MustGetLiquidatedPerpetualId() uint32 { diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index c198c739cf..18898e79e2 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -104,6 +104,10 @@ type MemClobKeeper interface { ctx sdk.Context, orderbookFills []StreamOrderbookFill, ) + SendTakerOrderStatus( + ctx sdk.Context, + takerOrder StreamTakerOrder, + ) AddOrderToOrderbookSubaccountUpdatesCheck( ctx sdk.Context, subaccountId satypes.SubaccountId, diff --git a/protocol/x/clob/types/order.go b/protocol/x/clob/types/order.go index 9a4e559391..deac6716e1 100644 --- a/protocol/x/clob/types/order.go +++ b/protocol/x/clob/types/order.go @@ -132,6 +132,12 @@ func (o *Order) MustGetOrder() Order { return *o } +// MustGetLiquidationOrder always panics since Order is not a Liquidation Order. +// This function is necessary for the `Order` type to implement the `MatchableOrder` interface. +func (o *Order) MustGetLiquidationOrder() LiquidationOrder { + panic("MustGetLiquidationOrder: Order is not a liquidation order") +} + // MustGetLiquidatedPerpetualId always panics since there is no underlying perpetual ID for a `Order`. // This function is necessary for the `Order` type to implement the `MatchableOrder` interface. func (o *Order) MustGetLiquidatedPerpetualId() uint32 { diff --git a/protocol/x/clob/types/orderbook.go b/protocol/x/clob/types/orderbook.go index ef4cf65e93..ba6d1f31bc 100644 --- a/protocol/x/clob/types/orderbook.go +++ b/protocol/x/clob/types/orderbook.go @@ -206,6 +206,9 @@ type MatchableOrder interface { // MustGetOrder returns the underlying order if this is not a liquidation order. Panics if called // for a liquidation order. MustGetOrder() Order + // MustGetLiquidationOrder returns the underlying liquidation order if this is not a regular order. + // Panics if called for a regular order. + MustGetLiquidationOrder() LiquidationOrder // MustGetLiquidatedPerpetualId returns the perpetual ID if this is a liquidation order. Panics // if called for a non-liquidation order. MustGetLiquidatedPerpetualId() uint32