From 5aa268ebefb73b50024632423760298f4423b366 Mon Sep 17 00:00:00 2001 From: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com> Date: Mon, 10 Jun 2024 15:17:09 -0400 Subject: [PATCH] GRPC Streaming Batching (#1633) --- protocol/app/app.go | 9 +- protocol/app/flags/flags.go | 53 +++- protocol/app/flags/flags_test.go | 69 ++++- protocol/lib/metrics/metric_keys.go | 3 + protocol/mocks/ClobKeeper.go | 6 +- protocol/mocks/MemClobKeeper.go | 6 +- .../streaming/grpc/grpc_streaming_manager.go | 240 ++++++++++++++---- .../streaming/grpc/noop_streaming_manager.go | 11 +- protocol/streaming/grpc/types/manager.go | 8 +- protocol/testutil/memclob/keeper.go | 1 - protocol/x/clob/keeper/keeper.go | 8 +- protocol/x/clob/keeper/order_state.go | 2 +- protocol/x/clob/keeper/process_operations.go | 2 +- protocol/x/clob/memclob/memclob.go | 6 +- protocol/x/clob/types/clob_keeper.go | 1 - protocol/x/clob/types/mem_clob_keeper.go | 1 - 16 files changed, 342 insertions(+), 84 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index d05be10f0d..6028530e97 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -464,6 +464,9 @@ func New( if app.SlinkyClient != nil { app.SlinkyClient.Stop() } + if app.GrpcStreamingManager != nil { + app.GrpcStreamingManager.Stop() + } return nil }, ) @@ -1952,7 +1955,11 @@ func getGrpcStreamingManagerFromOptions( ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager(logger) + return streaming.NewGrpcStreamingManager( + logger, + appFlags.GrpcStreamingFlushIntervalMs, + appFlags.GrpcStreamingMaxBufferSize, + ) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 22bf36243c..75708ef92b 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -21,8 +21,11 @@ type Flags struct { GrpcEnable bool // Grpc Streaming - GrpcStreamingEnabled bool - VEOracleEnabled bool // Slinky Vote Extensions + GrpcStreamingEnabled bool + GrpcStreamingFlushIntervalMs uint32 + GrpcStreamingMaxBufferSize uint32 + + VEOracleEnabled bool // Slinky Vote Extensions } // List of CLI flags. @@ -37,7 +40,9 @@ const ( GrpcEnable = "grpc.enable" // Grpc Streaming - GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" + GrpcStreamingMaxBufferSize = "grpc-streaming-max-buffer-size" // Slinky VEs enabled VEOracleEnabled = "slinky-vote-extension-oracle-enabled" @@ -50,8 +55,11 @@ const ( DefaultNonValidatingFullNode = false DefaultDdErrorTrackingFormat = false - DefaultGrpcStreamingEnabled = false - DefaultVEOracleEnabled = true + DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingFlushIntervalMs = 50 + DefaultGrpcStreamingMaxBufferSize = 10000 + + DefaultVEOracleEnabled = true ) // AddFlagsToCmd adds flags to app initialization. 
@@ -85,6 +93,16 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultGrpcStreamingEnabled, "Whether to enable grpc streaming for full nodes", ) + cmd.Flags().Uint32( + GrpcStreamingFlushIntervalMs, + DefaultGrpcStreamingFlushIntervalMs, + "Flush interval (in ms) for grpc streaming", + ) + cmd.Flags().Uint32( + GrpcStreamingMaxBufferSize, + DefaultGrpcStreamingMaxBufferSize, + "Maximum buffer size before grpc streaming cancels all connections", + ) cmd.Flags().Bool( VEOracleEnabled, DefaultVEOracleEnabled, @@ -104,6 +122,12 @@ func (f *Flags) Validate() error { if !f.GrpcEnable { return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server") } + if f.GrpcStreamingMaxBufferSize == 0 { + return fmt.Errorf("grpc streaming buffer size must be positive number") + } + if f.GrpcStreamingFlushIntervalMs == 0 { + return fmt.Errorf("grpc streaming flush interval must be positive number") + } } return nil } @@ -124,8 +148,11 @@ func GetFlagValuesFromOptions( GrpcAddress: config.DefaultGRPCAddress, GrpcEnable: true, - GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, - VEOracleEnabled: true, + GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, + GrpcStreamingMaxBufferSize: DefaultGrpcStreamingMaxBufferSize, + + VEOracleEnabled: true, } // Populate the flags if they exist. @@ -171,6 +198,18 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(GrpcStreamingFlushIntervalMs); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.GrpcStreamingFlushIntervalMs = v + } + } + + if option := appOpts.Get(GrpcStreamingMaxBufferSize); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.GrpcStreamingMaxBufferSize = v + } + } + if option := appOpts.Get(VEOracleEnabled); option != nil { if v, err := cast.ToBoolE(option); err == nil { result.VEOracleEnabled = v diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index dd6f12db85..c3b5a7bc59 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -32,6 +32,12 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): { flagName: flags.GrpcStreamingEnabled, }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingFlushIntervalMs): { + flagName: flags.GrpcStreamingFlushIntervalMs, + }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBufferSize): { + flagName: flags.GrpcStreamingMaxBufferSize, + }, } for name, tc := range tests { @@ -63,9 +69,11 @@ func TestValidate(t *testing.T) { }, "success - gRPC streaming enabled for validating nodes": { flags: flags.Flags{ - NonValidatingFullNode: false, - GrpcEnable: true, - GrpcStreamingEnabled: true, + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBufferSize: 10000, }, }, "failure - gRPC disabled": { @@ -82,6 +90,26 @@ func TestValidate(t *testing.T) { }, expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"), }, + "failure - gRPC streaming enabled with zero buffer size": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBufferSize: 0, + }, + expectedErr: fmt.Errorf("grpc streaming buffer size must be positive number"), + }, + "failure - gRPC streaming enabled with zero flush interval ms": { + flags: flags.Flags{ + NonValidatingFullNode: 
true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 0, + GrpcStreamingMaxBufferSize: 10000, + }, + expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"), + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -107,6 +135,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress string expectedGrpcEnable bool expectedGrpcStreamingEnable bool + expectedGrpcStreamingFlushMs uint32 + expectedGrpcStreamingBufferSize uint32 }{ "Sets to default if unset": { expectedNonValidatingFullNodeFlag: false, @@ -115,15 +145,19 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcAddress: "localhost:9090", expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, + expectedGrpcStreamingFlushMs: 50, + expectedGrpcStreamingBufferSize: 10000, }, "Sets values from options": { optsMap: map[string]any{ - flags.NonValidatingFullNodeFlag: true, - flags.DdAgentHost: "agentHostTest", - flags.DdTraceAgentPort: uint16(777), - flags.GrpcEnable: false, - flags.GrpcAddress: "localhost:9091", - flags.GrpcStreamingEnabled: "true", + flags.NonValidatingFullNodeFlag: true, + flags.DdAgentHost: "agentHostTest", + flags.DdTraceAgentPort: uint16(777), + flags.GrpcEnable: false, + flags.GrpcAddress: "localhost:9091", + flags.GrpcStreamingEnabled: "true", + flags.GrpcStreamingFlushIntervalMs: uint32(408), + flags.GrpcStreamingMaxBufferSize: uint32(650), }, expectedNonValidatingFullNodeFlag: true, expectedDdAgentHost: "agentHostTest", @@ -131,6 +165,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: false, expectedGrpcAddress: "localhost:9091", expectedGrpcStreamingEnable: true, + expectedGrpcStreamingFlushMs: 408, + expectedGrpcStreamingBufferSize: 650, }, } @@ -168,6 +204,21 @@ func TestGetFlagValuesFromOptions(t *testing.T) { tc.expectedGrpcAddress, flags.GrpcAddress, ) + require.Equal( + t, + tc.expectedGrpcStreamingEnable, + flags.GrpcStreamingEnabled, + ) + require.Equal( + t, + tc.expectedGrpcStreamingFlushMs, + flags.GrpcStreamingFlushIntervalMs, + ) + require.Equal( + t, + tc.expectedGrpcStreamingBufferSize, + flags.GrpcStreamingMaxBufferSize, + ) }) } } diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index a9d2e36b4d..f92b19d4a5 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -67,10 +67,13 @@ const ( // Full node grpc FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" + GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" + GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index b1a869afa3..b79121b052 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -1149,9 +1149,9 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } -// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot -func (_m *ClobKeeper) SendOrderbookUpdates(ctx types.Context, 
offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { - _m.Called(ctx, offchainUpdates, snapshot) +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates +func (_m *ClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates) { + _m.Called(ctx, offchainUpdates) } // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 973d0100e3..83bf530fbe 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -400,9 +400,9 @@ func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFi _m.Called(ctx, orderbookFills) } -// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot -func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { - _m.Called(ctx, offchainUpdates, snapshot) +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates +func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates) { + _m.Called(ctx, offchainUpdates) } // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index fd876a1fb1..9ad4dfa5f6 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -1,6 +1,7 @@ package grpc import ( + "fmt" "sync" "time" @@ -25,6 +26,15 @@ type GrpcStreamingManagerImpl struct { // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription nextSubscriptionId uint32 + + // grpc stream will batch and flush out messages at the configured flush interval. + ticker *time.Ticker + done chan bool + // map of clob pair id to stream updates. + streamUpdateCache map[uint32][]clobtypes.StreamUpdate + numUpdatesInCache uint32 + + maxUpdatesInCache uint32 } // OrderbookSubscription represents an active subscription to the orderbook updates stream.
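The new fields above implement a batch-and-flush pattern: producers append updates to streamUpdateCache under the manager's lock, a background goroutine drains the cache on every ticker tick at the configured flush interval, and if numUpdatesInCache ever exceeds maxUpdatesInCache the cache and every subscription are dropped. The following is a minimal, self-contained Go sketch of the same mechanics, not code from this patch; a plain []string stands in for []clobtypes.StreamUpdate.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher mirrors the new GrpcStreamingManagerImpl fields: a locked cache of
// updates keyed by clob pair id, a flush ticker, a done channel, and a cap.
type batcher struct {
	sync.Mutex
	cache      map[uint32][]string // stand-in for map[uint32][]clobtypes.StreamUpdate
	numUpdates uint32
	maxUpdates uint32
	ticker     *time.Ticker
	done       chan bool
}

func newBatcher(flushIntervalMs uint32, maxUpdates uint32) *batcher {
	b := &batcher{
		cache:      make(map[uint32][]string),
		maxUpdates: maxUpdates,
		ticker:     time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
		done:       make(chan bool),
	}
	// Background goroutine: flush on every tick until stop is called,
	// as NewGrpcStreamingManager does below.
	go func() {
		for {
			select {
			case <-b.ticker.C:
				b.flush()
			case <-b.done:
				return
			}
		}
	}()
	return b
}

// add buffers updates; on overflow it drops the whole buffer (the real manager
// also removes every subscription at this point).
func (b *batcher) add(clobPairId uint32, updates []string) {
	b.Lock()
	defer b.Unlock()
	b.cache[clobPairId] = append(b.cache[clobPairId], updates...)
	b.numUpdates += uint32(len(updates))
	if b.numUpdates > b.maxUpdates {
		clear(b.cache) // requires Go 1.21+, matching the patch's use of clear()
		b.numUpdates = 0
	}
}

// flush emits and clears everything buffered since the last tick.
func (b *batcher) flush() {
	b.Lock()
	defer b.Unlock()
	for clobPairId, updates := range b.cache {
		fmt.Printf("flushing %d updates for clob pair %d\n", len(updates), clobPairId)
	}
	clear(b.cache)
	b.numUpdates = 0
}

func (b *batcher) stop() {
	b.done <- true
	b.ticker.Stop()
}

func main() {
	b := newBatcher(50, 10000) // the defaults introduced by this patch
	b.add(0, []string{"place order", "update order"})
	time.Sleep(120 * time.Millisecond) // let at least one flush happen
	b.stop()
}

The hunks that follow wire this pattern into GrpcStreamingManagerImpl: NewGrpcStreamingManager starts the flush goroutine, AddUpdatesToCache buffers updates and enforces the cap, and FlushStreamUpdates drains the cache to each subscriber's stream.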
@@ -41,12 +51,36 @@ type OrderbookSubscription struct { func NewGrpcStreamingManager( logger log.Logger, + flushIntervalMs uint32, + maxUpdatesInCache uint32, ) *GrpcStreamingManagerImpl { logger = logger.With(log.ModuleKey, "grpc-streaming") - return &GrpcStreamingManagerImpl{ + grpcStreamingManager := &GrpcStreamingManagerImpl{ logger: logger, orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), + nextSubscriptionId: 0, + + ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), + done: make(chan bool), + streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate), + numUpdatesInCache: 0, + + maxUpdatesInCache: maxUpdatesInCache, } + + // Start the goroutine for pushing order updates through + go func() { + for { + select { + case <-grpcStreamingManager.ticker.C: + grpcStreamingManager.FlushStreamUpdates() + case <-grpcStreamingManager.done: + return + } + } + }() + + return grpcStreamingManager } func (sm *GrpcStreamingManagerImpl) Enabled() bool { @@ -54,6 +88,10 @@ func (sm *GrpcStreamingManagerImpl) Enabled() bool { } func (sm *GrpcStreamingManagerImpl) EmitMetrics() { + metrics.SetGauge( + metrics.GrpcStreamNumUpdatesBuffered, + float32(sm.numUpdatesInCache), + ) metrics.SetGauge( metrics.GrpcStreamSubscriberCount, float32(len(sm.orderbookSubscriptions)), @@ -88,17 +126,32 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( return nil } -// SendOrderbookUpdates groups updates by their clob pair ids and -// sends messages to the subscribers. -func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( +// removeSubscription removes a subscription from the grpc streaming manager. +// The streaming manager's lock should already be acquired from the thread running this. +func (sm *GrpcStreamingManagerImpl) removeSubscription( + subscriptionIdToRemove uint32, +) { + delete(sm.orderbookSubscriptions, subscriptionIdToRemove) + sm.logger.Info( + fmt.Sprintf("Removed grpc streaming subscription id %+v", subscriptionIdToRemove), + ) +} + +func (sm *GrpcStreamingManagerImpl) Stop() { + sm.done <- true +} + +// SendSnapshot groups updates by their clob pair ids and +// sends messages to the subscribers. It groups out updates differently +// and bypasses the buffer. +func (sm *GrpcStreamingManagerImpl) SendSnapshot( offchainUpdates *clobtypes.OffchainUpdates, - snapshot bool, blockHeight uint32, execMode sdk.ExecMode, ) { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, + metrics.GrpcSendOrderbookSnapshotLatency, time.Now(), ) @@ -122,7 +175,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( updatesByClobPairId[clobPairId] = v1updates } - updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) + sm.Lock() + defer sm.Unlock() + + idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { // Consolidate orderbook updates into a single `StreamUpdate`. 
v1updates := make([]ocutypes.OffChainUpdateV1, 0) @@ -133,22 +189,83 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } if len(v1updates) > 0 { - updatesBySubscriptionId[id] = []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: snapshot, + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: true, + }, + }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), }, + ); err != nil { + sm.logger.Error( + fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id), + "err", err, + ) + idsToRemove = append(idsToRemove, id) } } } - sm.sendStreamUpdate(updatesBySubscriptionId) + // Clean up subscriptions that have been closed. + // If a Send update has failed for any clob pair id, the whole subscription will be removed. + for _, id := range idsToRemove { + sm.removeSubscription(id) + } +} + +// SendOrderbookUpdates groups updates by their clob pair ids and +// sends messages to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( + offchainUpdates *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookUpdatesLatency, + time.Now(), + ) + + // Group updates by clob pair id. + updates := make(map[uint32]*clobtypes.OffchainUpdates) + for _, message := range offchainUpdates.Messages { + clobPairId := message.OrderId.ClobPairId + if _, ok := updates[clobPairId]; !ok { + updates[clobPairId] = clobtypes.NewOffchainUpdates() + } + updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) + } + + // Unmarshal each per-clob pair message to v1 updates. + updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + for clobPairId, update := range updates { + v1updates, err := GetOffchainUpdatesV1(update) + if err != nil { + panic(err) + } + updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, + }, + }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + }, + } + } + + sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates))) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -185,49 +302,74 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } - updatesBySubscriptionId := make(map[uint32][]clobtypes.StreamUpdate) - for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := updatesByClobPairId[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) 
- } - } + sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) +} - updatesBySubscriptionId[id] = streamUpdatesForSubscription - } +func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( + updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + numUpdatesToAdd uint32, +) { + sm.Lock() + defer sm.Unlock() - sm.sendStreamUpdate(updatesBySubscriptionId) + for clobPairId, streamUpdates := range updatesByClobPairId { + sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...) + } + sm.numUpdatesInCache += numUpdatesToAdd + + // Remove all subscriptions and wipe the buffer if buffer overflows. + if sm.numUpdatesInCache > sm.maxUpdatesInCache { + sm.logger.Error("GRPC Streaming buffer at full capacity. Dropping messages and all subscriptions. " + + "Disconnect all clients and increase buffer size via the grpc-streaming-max-buffer-size flag.") + for id := range sm.orderbookSubscriptions { + sm.removeSubscription(id) + } + clear(sm.streamUpdateCache) + sm.numUpdatesInCache = 0 + } } -// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. -func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( - updatesBySubscriptionId map[uint32][]clobtypes.StreamUpdate, -) { - metrics.IncrCounter( - metrics.GrpcEmitProtocolUpdateCount, - 1, +// FlushStreamUpdates emits all buffered stream updates to subscribers and clears the buffer. +func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcFlushUpdatesLatency, + time.Now(), ) sm.Lock() defer sm.Unlock() + metrics.IncrCounter( + metrics.GrpcEmitProtocolUpdateCount, + 1, + ) + // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { - if streamUpdatesForSubscription, ok := updatesBySubscriptionId[id]; ok { - if len(streamUpdatesForSubscription) > 0 { - metrics.IncrCounter( - metrics.GrpcSendResponseToSubscriberCount, - 1, + streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) + for _, clobPairId := range subscription.clobPairIds { + if update, ok := sm.streamUpdateCache[clobPairId]; ok { + streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) + } + } + + if len(streamUpdatesForSubscription) > 0 { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: streamUpdatesForSubscription, + }, + ); err != nil { + sm.logger.Error( + fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id), + "err", err, ) - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - } + idsToRemove = append(idsToRemove, id) } } } @@ -235,8 +377,12 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( // Clean up subscriptions that have been closed. // If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove { - delete(sm.orderbookSubscriptions, id) + sm.removeSubscription(id) } + + clear(sm.streamUpdateCache) + sm.numUpdatesInCache = 0 + sm.EmitMetrics() } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 424871b4c3..0875e89faa 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -27,9 +27,15 @@ func (sm *NoopGrpcStreamingManager) Subscribe( return clobtypes.ErrGrpcStreamingManagerNotEnabled } +func (sm *NoopGrpcStreamingManager) SendSnapshot( + updates *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, +) { +} + func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, - snapshot bool, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -46,3 +52,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } + +func (sm *NoopGrpcStreamingManager) Stop() { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 9b5af0c093..ec43821093 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -7,7 +7,7 @@ import ( type GrpcStreamingManager interface { Enabled() bool - + Stop() // L3+ Orderbook updates. Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, @@ -16,9 +16,13 @@ type GrpcStreamingManager interface { err error, ) GetUninitializedClobPairIds() []uint32 + SendSnapshot( + offchainUpdates *clobtypes.OffchainUpdates, + blockHeight uint32, + execMode sdk.ExecMode, + ) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - snapshot bool, blockHeight uint32, execMode sdk.ExecMode, ) diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 1284e98e2f..6410307b54 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -503,7 +503,6 @@ func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { func (f *FakeMemClobKeeper) SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *types.OffchainUpdates, - snapshot bool, ) { } diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 0cbf14e586..5a31a0fa19 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -271,14 +271,17 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - k.SendOrderbookUpdates(ctx, allUpdates, true) + streamingManager.SendSnapshot( + allUpdates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) } // SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. 
func (k Keeper) SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *types.OffchainUpdates, - snapshot bool, ) { if len(offchainUpdates.Messages) == 0 { return @@ -286,7 +289,6 @@ func (k Keeper) SendOrderbookUpdates( k.GetGrpcStreamingManager().SendOrderbookUpdates( offchainUpdates, - snapshot, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 51fa457a2a..df6909323d 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -267,7 +267,7 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) { ); success { allUpdates.AddUpdateMessage(orderId, message) } - k.SendOrderbookUpdates(ctx, allUpdates, false) + k.SendOrderbookUpdates(ctx, allUpdates) } } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 703895902e..887d89b938 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -74,7 +74,7 @@ func (k Keeper) ProcessProposerOperations( orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) allUpdates.Append(orderbookUpdate) } - k.SendOrderbookUpdates(ctx, allUpdates, false) + k.SendOrderbookUpdates(ctx, allUpdates) } log.DebugLog(ctx, "Processing operations queue", diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 2ad5310654..2b838a440a 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -877,7 +877,7 @@ func (m *MemClobPriceTimePriority) matchOrder( updates := m.GetOrderbookUpdatesForOrderUpdate(ctx, fill.MakerOrderId) allUpdates.Append(updates) } - m.clobKeeper.SendOrderbookUpdates(ctx, allUpdates, false) + m.clobKeeper.SendOrderbookUpdates(ctx, allUpdates) } return takerOrderStatus, offchainUpdates, makerOrdersToRemove, matchingErr @@ -1510,7 +1510,7 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( if m.generateOrderbookUpdates { // Send an orderbook update to grpc streams. orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) - m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate) } } @@ -1950,7 +1950,7 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( if m.generateOrderbookUpdates { // Send an orderbook update to grpc streams. orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) - m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate) } } diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 97d5dce483..c49a08fb0b 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -156,7 +156,6 @@ type ClobKeeper interface { SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *OffchainUpdates, - snapshot bool, ) MigratePruneableOrders(ctx sdk.Context) } diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index a894a89605..f50176303d 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -105,7 +105,6 @@ type MemClobKeeper interface { SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *OffchainUpdates, - snapshot bool, ) SendOrderbookFillUpdates( ctx sdk.Context,