Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Full node streaming] emit taker order status at end of matching loop #2022

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,34 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
streamTakerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
clobPairId := uint32(0)
if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil {
clobPairId = liqOrder.ClobPairId
}
if takerOrder := streamTakerOrder.GetOrder(); takerOrder != nil {
clobPairId = takerOrder.OrderId.ClobPairId
}

sm.AddUpdatesToCache(
map[uint32][]clobtypes.StreamUpdate{
clobPairId: {
{
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
Comment on lines +383 to +385
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding - do we need to send any fill amounts for taker orders?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should have the fill amounts in a following full node stream update if you are subscribed to the correct clob pair id!

},
},
},
1,
)
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
numUpdatesToAdd uint32,
Expand Down
7 changes: 7 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
blockHeight uint32,
Expand Down
5 changes: 5 additions & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type FullNodeStreamingManager interface {
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
)
}

type OutgoingMessageSender interface {
Expand Down
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
) {
}

func (f *FakeMemClobKeeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
}
Comment on lines +515 to +519
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide a mock implementation or a TODO comment.

The new method SendTakerOrderStatus is added to the FakeMemClobKeeper struct but its implementation is empty. Consider providing a mock implementation or adding a TODO comment to indicate future work.

+  // TODO: Implement the mock logic for SendTakerOrderStatus
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (f *FakeMemClobKeeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
}
func (f *FakeMemClobKeeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
// TODO: Implement the mock logic for SendTakerOrderStatus
}


// Placeholder to satisfy interface implementation of types.MemClobKeeper
func (f *FakeMemClobKeeper) AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
Expand Down
16 changes: 14 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
// SendOrderbookUpdates sends the offchain updates to the Full Node streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
Expand All @@ -283,7 +283,7 @@ func (k Keeper) SendOrderbookUpdates(
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
Expand All @@ -298,3 +298,15 @@ func (k Keeper) SendOrderbookFillUpdates(
k.PerpetualIdToClobPairId,
)
}

// SendTakerOrderStatus sends the taker order with its status to the Full Node streaming manager.
func (k Keeper) SendTakerOrderStatus(
ctx sdk.Context,
takerOrder types.StreamTakerOrder,
) {
k.GetFullNodeStreamingManager().SendTakerOrderStatus(
takerOrder,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
12 changes: 12 additions & 0 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,18 @@ func (m *MemClobPriceTimePriority) matchOrder(
order,
)

// If full node streaming is on, emit the taker order and its resulting status.
if m.generateOrderbookUpdates {
streamTakerOrder := m.GenerateStreamTakerOrder(
order,
takerOrderStatus,
)
m.clobKeeper.SendTakerOrderStatus(
ctx,
streamTakerOrder,
)
}

// If this is a replacement order, then ensure we remove the existing order from the orderbook.
if !order.IsLiquidation() {
orderId := order.MustGetOrder().OrderId
Expand Down
25 changes: 25 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,28 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
}
return offchainUpdates
}

// GenerateStreamTakerOrder returns a `StreamTakerOrder` object used in full node
// streaming from a matchableOrder and a taker order status.
func (m *MemClobPriceTimePriority) GenerateStreamTakerOrder(
takerOrder types.MatchableOrder,
takerOrderStatus types.TakerOrderStatus,
) types.StreamTakerOrder {
if takerOrder.IsLiquidation() {
liquidationOrder := takerOrder.MustGetLiquidationOrder()
streamLiquidationOrder := liquidationOrder.ToStreamLiquidationOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_LiquidationOrder{
LiquidationOrder: streamLiquidationOrder,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
order := takerOrder.MustGetOrder()
return types.StreamTakerOrder{
TakerOrder: &types.StreamTakerOrder_Order{
Order: &order,
},
TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
}
}
Comment on lines +161 to +184
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor suggestion: Simplify the nested if-else.

The nested if-else can be simplified for better readability.

func (m *MemClobPriceTimePriority) GenerateStreamTakerOrder(
	takerOrder types.MatchableOrder,
	takerOrderStatus types.TakerOrderStatus,
) types.StreamTakerOrder {
	if takerOrder.IsLiquidation() {
		liquidationOrder := takerOrder.MustGetLiquidationOrder()
		streamLiquidationOrder := liquidationOrder.ToStreamLiquidationOrder()
		return types.StreamTakerOrder{
			TakerOrder: &types.StreamTakerOrder_LiquidationOrder{
				LiquidationOrder: streamLiquidationOrder,
			},
			TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
		}
	}
	
	order := takerOrder.MustGetOrder()
	return types.StreamTakerOrder{
		TakerOrder: &types.StreamTakerOrder_Order{
			Order: &order,
		},
		TakerOrderStatus: takerOrderStatus.ToStreamingTakerOrderStatus(),
	}
}

Committable suggestion was skipped due to low confidence.

6 changes: 6 additions & 0 deletions protocol/x/clob/types/liquidation_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ func (lo *LiquidationOrder) MustGetOrder() Order {
panic("MustGetOrder: No underlying order on a LiquidationOrder type.")
}

// MustGetLiquidationOrder returns the underlying `LiquidationOrder` type.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidationOrder() LiquidationOrder {
return *lo
}

// MustGetLiquidatedPerpetualId returns the perpetual ID that this perpetual order is liquidating.
// This function is necessary for the `LiquidationOrder` type to implement the `MatchableOrder` interface.
func (lo *LiquidationOrder) MustGetLiquidatedPerpetualId() uint32 {
Expand Down
4 changes: 4 additions & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type MemClobKeeper interface {
ctx sdk.Context,
orderbookFills []StreamOrderbookFill,
)
SendTakerOrderStatus(
ctx sdk.Context,
takerOrder StreamTakerOrder,
)
AddOrderToOrderbookSubaccountUpdatesCheck(
ctx sdk.Context,
subaccountId satypes.SubaccountId,
Expand Down
6 changes: 6 additions & 0 deletions protocol/x/clob/types/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (o *Order) MustGetOrder() Order {
return *o
}

// MustGetLiquidationOrder always panics since Order is not a Liquidation Order.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidationOrder() LiquidationOrder {
panic("MustGetLiquidationOrder: Order is not a liquidation order")
}

// MustGetLiquidatedPerpetualId always panics since there is no underlying perpetual ID for a `Order`.
// This function is necessary for the `Order` type to implement the `MatchableOrder` interface.
func (o *Order) MustGetLiquidatedPerpetualId() uint32 {
Expand Down
3 changes: 3 additions & 0 deletions protocol/x/clob/types/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ type MatchableOrder interface {
// MustGetOrder returns the underlying order if this is not a liquidation order. Panics if called
// for a liquidation order.
MustGetOrder() Order
// MustGetLiquidationOrder returns the underlying liquidation order if this is not a regular order.
// Panics if called for a regular order.
MustGetLiquidationOrder() LiquidationOrder
// MustGetLiquidatedPerpetualId returns the perpetual ID if this is a liquidation order. Panics
// if called for a non-liquidation order.
MustGetLiquidatedPerpetualId() uint32
Expand Down
Loading