Skip to content

Commit

Permalink
[OTE-456] FNS x OE: stage FinalizeBlock events and emit in `Precomm…
Browse files Browse the repository at this point in the history
…it` (#2253)
  • Loading branch information
teddyding authored Sep 16, 2024
1 parent 03454e4 commit 025cc85
Show file tree
Hide file tree
Showing 17 changed files with 1,132 additions and 304 deletions.
494 changes: 248 additions & 246 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query";
import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEvent {
orderFill?: StreamOrderbookFill;
subaccountUpdate?: StreamSubaccountUpdate;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEventSDKType {
order_fill?: StreamOrderbookFillSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}

function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
return {
orderFill: undefined,
subaccountUpdate: undefined
};
}

export const StagedFinalizeBlockEvent = {
encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim();
}

if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStagedFinalizeBlockEvent();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 2:
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StagedFinalizeBlockEvent>): StagedFinalizeBlockEvent {
const message = createBaseStagedFinalizeBlockEvent();
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
return message;
}

};
4 changes: 2 additions & 2 deletions indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import * as _127 from "./gogo";
export const gogoproto = { ..._127
import * as _128 from "./gogo";
export const gogoproto = { ..._128
};
22 changes: 11 additions & 11 deletions indexer/packages/v4-protos/src/codegen/google/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import * as _128 from "./api/annotations";
import * as _129 from "./api/http";
import * as _130 from "./protobuf/descriptor";
import * as _131 from "./protobuf/duration";
import * as _132 from "./protobuf/timestamp";
import * as _133 from "./protobuf/any";
import * as _129 from "./api/annotations";
import * as _130 from "./api/http";
import * as _131 from "./protobuf/descriptor";
import * as _132 from "./protobuf/duration";
import * as _133 from "./protobuf/timestamp";
import * as _134 from "./protobuf/any";
export namespace google {
export const api = { ..._128,
..._129
export const api = { ..._129,
..._130
};
export const protobuf = { ..._130,
..._131,
export const protobuf = { ..._131,
..._132,
..._133
..._133,
..._134
};
}
16 changes: 16 additions & 0 deletions proto/dydxprotocol/clob/streaming.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/subaccounts/streaming.proto";
import "dydxprotocol/clob/query.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}
4 changes: 4 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ func New(
statsmoduletypes.TransientStoreKey,
rewardsmoduletypes.TransientStoreKey,
indexer_manager.TransientStoreKey,
streaming.StreamingManagerTransientStoreKey,
perpetualsmoduletypes.TransientStoreKey,
)
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
Expand Down Expand Up @@ -764,6 +765,7 @@ func New(
appFlags,
appCodec,
logger,
tkeys[streaming.StreamingManagerTransientStoreKey],
)

timeProvider := &timelib.TimeProviderImpl{}
Expand Down Expand Up @@ -2059,6 +2061,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
streamingManagerTransientStoreKey storetypes.StoreKey,
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
Expand All @@ -2072,6 +2075,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
)

// Start websocket server.
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ const (
UpdateType = "update_type"
ValidateMatches = "validate_matches"
ValidateOrder = "validate_order"
StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block"

// MemCLOB.
AddedToOrderBook = "added_to_orderbook"
Expand Down
29 changes: 16 additions & 13 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
13 changes: 13 additions & 0 deletions protocol/streaming/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package streaming

// Constants for FullNodeStreamingManager.
const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"

// Key for storing the count of staged events.
StagedEventsCountKey = "EvtCnt"

// Key prefix for staged events.
StagedEventsKeyPrefix = "Evt:"
)
Loading

0 comments on commit 025cc85

Please sign in to comment.