Skip to content

Commit

Permalink
Internalize logic to stage FinalizeBlock events
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 27, 2024
1 parent 7193a10 commit 26723d6
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query";
import { StreamOrderbookFill, StreamOrderbookFillSDKType, StreamOrderbookUpdate, StreamOrderbookUpdateSDKType } from "./query";
import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
Expand All @@ -7,18 +7,21 @@ import { DeepPartial } from "../../helpers";
export interface StagedFinalizeBlockEvent {
orderFill?: StreamOrderbookFill;
subaccountUpdate?: StreamSubaccountUpdate;
orderbookUpdate?: StreamOrderbookUpdate;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEventSDKType {
order_fill?: StreamOrderbookFillSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
orderbook_update?: StreamOrderbookUpdateSDKType;
}

function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
return {
orderFill: undefined,
subaccountUpdate: undefined
subaccountUpdate: undefined,
orderbookUpdate: undefined
};
}

Expand All @@ -32,6 +35,10 @@ export const StagedFinalizeBlockEvent = {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
}

if (message.orderbookUpdate !== undefined) {
StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(26).fork()).ldelim();
}

return writer;
},

Expand All @@ -52,6 +59,10 @@ export const StagedFinalizeBlockEvent = {
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

case 3:
message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -65,6 +76,7 @@ export const StagedFinalizeBlockEvent = {
const message = createBaseStagedFinalizeBlockEvent();
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined;
return message;
}

Expand Down
1 change: 1 addition & 0 deletions proto/dydxprotocol/clob/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent {
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
StreamOrderbookUpdate orderbook_update = 3;
}
}
6 changes: 3 additions & 3 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 72 additions & 50 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,38 +391,21 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 {
}

// Stage a subaccount update event in transient store, during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
// If not `DeliverTx`, return since we don't stream optimistic subaccount updates.
if !lib.IsDeliverTxMode(ctx) {
return
}
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

// Stage a fill event in transient store, during `FinalizeBlock`.
// Since `FinalizeBlock` code block can be called more than once with optimistic
// execution (once optimistically and optionally once on the canonical block),
// we need to stage the events in transient store and later emit them
// during `Precommit`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
lib.AssertDeliverTxMode(ctx)
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}

sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
Expand Down Expand Up @@ -545,17 +528,38 @@ func getStreamUpdatesFromOffchainUpdates(
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, ctx.ExecMode())
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
if err != nil {
panic(err)
}
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
},
}
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
Expand Down Expand Up @@ -595,36 +599,54 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// SendOrderbookFillUpdate groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
},
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
streamTakerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
// In current design, we never send this during `DeliverTx` (`FinalizeBlock`).
lib.AssertCheckTxMode(ctx)

clobPairId := uint32(0)
if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil {
clobPairId = liqOrder.ClobPairId
Expand All @@ -640,7 +662,7 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
TakerOrder: &streamTakerOrder,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
ExecMode: uint32(ctx.ExecMode()),
},
},
[]uint32{clobPairId},
Expand Down
18 changes: 6 additions & 12 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
}

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
}

Expand Down Expand Up @@ -79,19 +79,13 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
}

func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
return nil
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate(
func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
Expand Down
16 changes: 6 additions & 10 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,25 @@ type FullNodeStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
)
SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
takerOrder clobtypes.StreamTakerOrder,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
)
StageFinalizeBlockSubaccountUpdate(
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
)
Expand Down
4 changes: 2 additions & 2 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates(
) {
}

func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
func (f *FakeMemClobKeeper) SendOrderbookFillUpdate(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
orderbookFill types.StreamOrderbookFill,
) {
}

Expand Down
19 changes: 8 additions & 11 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,22 +310,19 @@ func (k Keeper) SendOrderbookUpdates(
k.GetFullNodeStreamingManager().SendOrderbookUpdates(
offchainUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
ctx,
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
// SendOrderbookFillUpdate sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdate(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
orderbookFill types.StreamOrderbookFill,
) {
if len(orderbookFills) == 0 {
return
}
k.GetFullNodeStreamingManager().SendOrderbookFillUpdates(
orderbookFills,
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
orderbookFill,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
ctx,
k.PerpetualIdToClobPairId,
)
}
Expand All @@ -338,6 +335,6 @@ func (k Keeper) SendTakerOrderStatus(
k.GetFullNodeStreamingManager().SendTakerOrderStatus(
takerOrder,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
ctx,
)
}
Loading

0 comments on commit 26723d6

Please sign in to comment.