
GRPC Streaming Batching #1633

Merged
merged 8 commits into from
Jun 10, 2024

Conversation

jonfung-dydx
Contributor

@jonfung-dydx jonfung-dydx commented Jun 6, 2024

Two flags added: maximum buffer size and flush interval (ms).
Batch updates + goroutine to emit them.

TODO: exit all subscriptions upon loop overlap

Summary by CodeRabbit

  • New Features

    • Introduced a new gRPC streaming manager with enhanced buffering and periodic flushing capabilities.
    • Added methods for sending snapshots and stopping the streaming manager.
  • Bug Fixes

    • Removed unnecessary parameters from various functions to streamline orderbook updates.
  • Refactor

    • Renamed and restructured fields in orderbook update responses for consistency.
    • Updated gRPC streaming manager to handle updates via a cache and periodic flushes.
  • Documentation

    • Improved comments and documentation for better clarity on handling updates and snapshots.
  • Tests

    • Added tests for new gRPC streaming flags and validation logic.

Contributor

coderabbitai bot commented Jun 6, 2024

Walkthrough

The changes primarily focus on refactoring and enhancing the gRPC streaming functionality within the dYdX protocol. Key modifications include restructuring the streaming manager to handle updates more efficiently, removing redundant fields, and adding new parameters for better configuration. The updates also involve renaming methods and adjusting function signatures to streamline the process of managing and sending orderbook updates.

Changes

  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts, proto/dydxprotocol/clob/query.proto — Renamed and restructured fields within the StreamOrderbookUpdatesResponse and StreamUpdate interfaces; removed blockHeight and execMode from StreamOrderbookUpdatesResponse and added these fields to StreamUpdate.
  • protocol/app/app.go, protocol/app/flags/flags.go, protocol/app/flags/flags_test.go — Modified getGrpcStreamingManagerFromOptions to accept additional parameters; added new fields and constants for gRPC streaming configuration in the Flags struct; updated validation logic and tests.
  • protocol/mocks/ClobKeeper.go, protocol/mocks/MemClobKeeper.go, protocol/testutil/memclob/keeper.go — Removed the snapshot parameter from the SendOrderbookUpdates function signature.
  • protocol/streaming/grpc/grpc_streaming_manager.go, protocol/streaming/grpc/noop_streaming_manager.go, protocol/streaming/grpc/types/manager.go — Added fields and methods to GrpcStreamingManagerImpl for managing updates in a cache with periodic flushing; renamed SendOrderbookUpdates to SendSnapshot and added new logic for handling updates; added a Stop method; updated the GrpcStreamingManager interface to reflect these changes.
  • protocol/x/clob/keeper/keeper.go, protocol/x/clob/keeper/order_state.go, protocol/x/clob/keeper/process_operations.go, protocol/x/clob/types/clob_keeper.go, protocol/x/clob/types/mem_clob_keeper.go — Updated calls to SendOrderbookUpdates to remove the snapshot parameter; modified InitializeNewGrpcStreams to call SendSnapshot instead.
  • protocol/lib/metrics/metric_keys.go — Added constants for new metrics related to gRPC streaming.
  • protocol/x/clob/memclob/memclob.go — Updated SendOrderbookUpdates call sites in matchOrder, mustAddOrderToOrderbook, and mustRemoveOrder to remove the boolean parameter false.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant GrpcStreamingManager
    participant Cache
    participant Subscribers

    Client->>GrpcStreamingManager: SendOrderbookUpdates(updates)
    GrpcStreamingManager->>Cache: AddUpdatesToCache(updates)
    loop Every flushIntervalMs
        GrpcStreamingManager->>Cache: FlushStreamUpdates()
        Cache->>Subscribers: Send updates
    end
    Client->>GrpcStreamingManager: Stop()
    GrpcStreamingManager->>Cache: Stop flushing

Poem

In the code where streams do flow,
Updates come and swiftly go.
Cache and flush, a dance so neat,
Making sure the data's sweet.
With every tick, the buffers clear,
Bringing order, far and near.
Cheers to changes, big and small,
Our protocol stands proud and tall! 🚀🐇



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Outside diff range and nitpick comments (2)
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (2)

Line range hint 325-346: Clarify the behavior when snapshot is true in StreamOrderbookUpdate.

The comment about the snapshot field could be clearer. Consider rephrasing to:

-  * All updates should be ignored until snapshot is recieved.
-  * If the snapshot is true, then all previous entries should be
-  * discarded and the orderbook should be resynced.
+  * When `snapshot` is true, ignore all previous updates and reset the orderbook to this snapshot's state.

This rephrasing avoids ambiguity and clearly states the expected behavior when a snapshot is received.


Tip

Codebase Verification

Explicit null or undefined checks for clobMatch are missing in the usages. Ensure that all usages of clobMatch properly handle the case where it might be undefined to prevent potential runtime errors.

  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Analysis chain

Line range hint 360-381: Review the use of optional fields in StreamOrderbookFill.

Ensure that all usages of StreamOrderbookFill properly handle the case where clobMatch might be undefined.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that optional fields in `StreamOrderbookFill` are handled correctly in all usages.

# Test: Search for usages of `StreamOrderbookFill` and check for null checks.
rg --type ts $'StreamOrderbookFill'

Length of output: 2183



Script:

#!/bin/bash
# Description: Verify that optional fields in `StreamOrderbookFill` are handled correctly in all usages.

# Test: Search for null or undefined checks for `clobMatch` in the identified file.
rg --type ts 'clobMatch' indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts

Length of output: 474

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 0e67d7a and 0b89169.

Files ignored due to path filters (1)
  • protocol/x/clob/types/query.pb.go is excluded by !**/*.pb.go
Files selected for processing (6)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (15 hunks)
  • proto/dydxprotocol/clob/query.proto (4 hunks)
  • protocol/app/app.go (1 hunks)
  • protocol/app/flags/flags.go (7 hunks)
  • protocol/app/flags/flags_test.go (6 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (7 hunks)
Additional comments not posted (20)
protocol/app/flags/flags.go (6)

24-27: Ensure proper initialization and usage of new GRPC streaming flags.


43-45: LGTM! New constants for GRPC streaming flags are well-defined.


58-61: Default values for new GRPC streaming flags are set appropriately.


96-105: Properly added new GRPC streaming flags to the command line interface.


125-130: Validation logic for new GRPC streaming flags ensures positive values, which is crucial for system stability.


Line range hint 151-211: Correct retrieval of new GRPC streaming flag values from options.

protocol/app/flags/flags_test.go (3)

35-40: New GRPC streaming flags are correctly included in the command flag tests.


Line range hint 72-112: Comprehensive tests for new GRPC streaming flags cover both success and failure scenarios.


Line range hint 138-221: Tests for retrieving new GRPC streaming flag values from options are thorough and cover various scenarios.

proto/dydxprotocol/clob/query.proto (2)

173-173: StreamOrderbookUpdatesResponse message correctly updated to reflect the new structure.


186-191: StreamUpdate message enhancements correctly include block height and execution mode.

protocol/streaming/grpc/grpc_streaming_manager.go (6)

28-36: GrpcStreamingManagerImpl struct enhancements support efficient management of GRPC streams.


53-83: NewGrpcStreamingManager function correctly initializes the GRPC streaming manager with appropriate configurations.


125-127: Addition of the Stop method allows for clean termination of GRPC streaming.


168-169: SendOrderbookUpdates function correctly handles block height and execution mode in updates.


205-206: SendOrderbookFillUpdates function correctly handles block height and execution mode in fill updates.


214-236: AddUpdatesToCache function efficiently manages updates in the cache, including handling overflow scenarios.

indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (1)

Line range hint 290-310: Ensure the new fields blockHeight and execMode are properly documented.

The addition of blockHeight and execMode to StreamUpdate aligns with the PR's objectives to enhance the granularity of streaming data. Good job on including detailed comments for these new fields.

protocol/app/app.go (2)

1934-1938: Ensure proper validation for GRPC streaming parameters.

Verification successful

The Validate function in protocol/app/flags/flags.go does include checks to ensure that GrpcStreamingMaxBufferSize and GrpcStreamingFlushIntervalMs are positive values. This confirms that the GRPC streaming parameters are properly validated.

  • GrpcStreamingMaxBufferSize must be a positive number.
  • GrpcStreamingFlushIntervalMs must be a positive number.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the GRPC streaming parameters are validated for positive values.

# Test: Search for validation logic in the flags.go file. Expect: Validation logic for positive values.
rg --type go $'Validate' 'protocol/app/flags/flags.go'

Length of output: 131



Script:

#!/bin/bash
# Description: Extract the implementation of the Validate function from flags.go to check for validation logic.

# Extract the Validate function implementation
ast-grep --lang go --pattern $'func (f *Flags) Validate() error {
  $$$
}' protocol/app/flags/flags.go

Length of output: 1402


1934-1938: Verify correct retrieval and usage of GRPC streaming flags in the New function.

Verification successful

Verify the retrieval of GrpcStreamingFlushIntervalMs and GrpcStreamingMaxBufferSize from appFlags.


The GRPC streaming flags GrpcStreamingFlushIntervalMs and GrpcStreamingMaxBufferSize are correctly retrieved from appFlags and used in the configuration of the GRPC streaming manager.

  • GrpcStreamingFlushIntervalMs is used at line 1935.
  • GrpcStreamingMaxBufferSize is used at line 1936.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that GRPC streaming flags are correctly retrieved and used.

# Test: Search for the usage of GRPC streaming flags in the New function. Expect: Correct retrieval and usage.
rg --type go $'New' 'protocol/app/app.go'

Length of output: 9409



Script:

#!/bin/bash
# Description: Verify the retrieval of GRPC streaming flags from appFlags.

# Test: Search for the definitions and usages of GrpcStreamingFlushIntervalMs and GrpcStreamingMaxBufferSize.
rg --type go 'GrpcStreamingFlushIntervalMs' 'protocol/app/app.go'
rg --type go 'GrpcStreamingMaxBufferSize' 'protocol/app/app.go'

Length of output: 207

DefaultVEOracleEnabled = true
DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBufferSize = 10000
Contributor

is 10k reasonable here? should we go higher in case of spikes in traffic?

Contributor

i guess if we are doing 10ms, 10k should be enough

Contributor Author

i'll use these for now and then tune later based on metrics

ticker *time.Ticker
done chan bool
// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate
Contributor

not related to this PR, but in memclob we should probably only generate updates for clob pair ids that are relevant, e.g. those with at least one subscriber

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e673b9d and e23f3cb.

Files selected for processing (14)
  • protocol/app/app.go (2 hunks)
  • protocol/app/flags/flags.go (7 hunks)
  • protocol/app/flags/flags_test.go (6 hunks)
  • protocol/mocks/ClobKeeper.go (1 hunks)
  • protocol/mocks/MemClobKeeper.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (7 hunks)
  • protocol/streaming/grpc/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/grpc/types/manager.go (2 hunks)
  • protocol/testutil/memclob/keeper.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (1 hunks)
  • protocol/x/clob/keeper/order_state.go (1 hunks)
  • protocol/x/clob/keeper/process_operations.go (1 hunks)
  • protocol/x/clob/types/clob_keeper.go (1 hunks)
  • protocol/x/clob/types/mem_clob_keeper.go (1 hunks)
Files skipped from review due to trivial changes (1)
  • protocol/x/clob/types/mem_clob_keeper.go
Additional comments not posted (17)
protocol/streaming/grpc/types/manager.go (2)

10-10: The addition of the Stop method is appropriate for managing the lifecycle of streaming services.


19-23: The SendSnapshot method is well-defined and aligns with the existing methods in the interface, enhancing the functionality of the streaming manager.

protocol/streaming/grpc/noop_streaming_manager.go (2)

30-35: The SendSnapshot method in NoopGrpcStreamingManager correctly implements a no-operation version, which is suitable for its purpose.


56-57: The Stop method is correctly implemented as a no-operation, aligning with the class's intended functionality.

protocol/x/clob/types/clob_keeper.go (1)

Line range hint 1-1: The simplification of the SendOrderbookUpdates method signature is a positive change, making the interface cleaner and easier to use.

protocol/app/flags/flags.go (3)

24-27: The addition of gRPC streaming-related flags in the Flags struct is necessary for the new functionality and is well-integrated into the existing structure.


58-61: The defined default values for the new gRPC streaming flags are sensible and provide a good starting point for configuration.


96-105: The updates to the AddFlagsToCmd function to include the new gRPC streaming flags are correctly implemented, enhancing the configurability of the application.

protocol/app/flags/flags_test.go (3)

35-40: Ensure the new flags for gRPC streaming are correctly added to the command.


72-76: Validate the new gRPC streaming flags for correct behavior in various scenarios.

Also applies to: 93-111


Line range hint 138-221: Check the retrieval of flag values from options, ensuring new gRPC streaming flags are correctly handled.

protocol/x/clob/keeper/keeper.go (1)

275-279: Ensure the new method for sending snapshots via gRPC streams is correctly implemented.

protocol/x/clob/keeper/order_state.go (1)

290-290: Ensure the updated method for sending orderbook updates is correctly implemented.

protocol/streaming/grpc/grpc_streaming_manager.go (2)

309-330: Ensure the method for adding updates to the cache handles buffer overflow correctly.


Line range hint 333-377: Check the method for flushing stream updates to ensure it correctly handles sending updates and cleaning up subscriptions.

protocol/testutil/memclob/keeper.go (1)

Line range hint 1-1: The file keeper.go provides necessary test utilities for the MemClobKeeper interface and aligns with the expected functionality.

protocol/x/clob/keeper/process_operations.go (1)

77-77: Ensure the updated SendOrderbookUpdates function is called correctly.

The call to SendOrderbookUpdates correctly omits the previously required snapshot parameter, aligning with the changes described in the PR. This simplifies the function's usage and is consistent with the new data handling approach.

Comment on lines +1957 to +1961
return streaming.NewGrpcStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBufferSize,
)
Contributor

Consider adding error handling for GRPC streaming manager initialization.

- return streaming.NewGrpcStreamingManager(
-   logger,
-   appFlags.GrpcStreamingFlushIntervalMs,
-   appFlags.GrpcStreamingMaxBufferSize,
- )
+ manager, err := streaming.NewGrpcStreamingManager(
+   logger,
+   appFlags.GrpcStreamingFlushIntervalMs,
+   appFlags.GrpcStreamingMaxBufferSize,
+ )
+ if err != nil {
+   logger.Error("Failed to initialize GRPC Streaming Manager", "error", err)
+   return nil
+ }
+ return manager
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.


Comment on lines +1152 to +1154
// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates
func (_m *ClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates) {
_m.Called(ctx, offchainUpdates)
Contributor

Tip

Codebase Verification

Some instances of SendOrderbookUpdates still use the old signature with the snapshot parameter. These need to be updated to match the new signature.

  • protocol/x/clob/memclob/memclob_remove_order_test.go: Line(s) with SendOrderbookUpdates using three parameters.
  • protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go: Line(s) with SendOrderbookUpdates using three parameters.
  • protocol/x/clob/memclob/memclob.go: Line(s) with SendOrderbookUpdates using three parameters.
Analysis chain

Simplified function signature by removing the snapshot parameter.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all function calls to `SendOrderbookUpdates` match the new signature.

# Test: Search for the function usage. Expect: Only occurrences of the new signature.
rg --type go $'SendOrderbookUpdates'

Length of output: 3436

Comment on lines +403 to +405
// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates
func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates) {
_m.Called(ctx, offchainUpdates)
Contributor

Tip

Codebase Verification

Some instances of SendOrderbookUpdates still use the old signature with three parameters and need to be updated to match the new signature.

  • protocol/x/clob/memclob/memclob_remove_order_test.go
  • protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go
Analysis chain

The updated function signature for SendOrderbookUpdates aligns with the new data handling approach.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all function calls to `SendOrderbookUpdates` match the new signature.

# Test: Search for the function usage. Expect: Only occurrences of the new signature.
rg --type go $'SendOrderbookUpdates'

Length of output: 3436

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Outside diff range and nitpick comments (2)
protocol/streaming/grpc/grpc_streaming_manager.go (2)

Line range hint 132-206: The SendSnapshot method should handle errors more gracefully instead of using panic. Consider logging the error and continuing with the next update or returning the error to the caller.

- panic(err)
+ sm.logger.Error("Failed to get V1 updates", "error", err)
+ continue // or return err

Line range hint 316-366: The FlushStreamUpdates method is well-implemented. Consider adding a strategy for retrying or handling failed sends more explicitly in future iterations.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e23f3cb and 536dffe.

Files selected for processing (2)
  • protocol/lib/metrics/metric_keys.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (7 hunks)
Additional comments not posted (4)
protocol/lib/metrics/metric_keys.go (3)

70-70: The metric key GrpcSendOrderbookSnapshotLatency adheres to the naming guidelines and is appropriately named.


75-75: The metric key GrpcStreamNumUpdatesBuffered is well-named and follows the established metric naming conventions.


76-76: The metric key GrpcFlushUpdatesLatency correctly follows the naming guidelines.

protocol/streaming/grpc/grpc_streaming_manager.go (1)

128-130: The Stop method is correctly implemented for signaling the termination of the streaming manager.

Comment on lines +53 to +82
flushIntervalMs uint32,
maxUpdatesInCache uint32,
) *GrpcStreamingManagerImpl {
logger = logger.With(log.ModuleKey, "grpc-streaming")
- return &GrpcStreamingManagerImpl{
+ grpcStreamingManager := &GrpcStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
numUpdatesInCache: 0,

maxUpdatesInCache: maxUpdatesInCache,
}

// Start the goroutine for pushing order updates through
go func() {
for {
select {
case <-grpcStreamingManager.ticker.C:
grpcStreamingManager.FlushStreamUpdates()
case <-grpcStreamingManager.done:
return
}
}
}()

return grpcStreamingManager
Contributor

The constructor NewGrpcStreamingManager is well-implemented. Consider adding error handling or recovery inside the goroutine to manage any potential panics or errors gracefully.

Comment on lines +208 to +252
// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
for clobPairId, update := range updates {
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
}
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
Contributor

Similar to SendSnapshot, replace the panic in SendOrderbookUpdates with proper error handling to enhance the robustness of the method.

- panic(err)
+ sm.logger.Error("Failed to get V1 updates", "error", err)
+ continue // or return err
Suggested change (full updated snippet, with the panic replaced by logging):

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
	offchainUpdates *clobtypes.OffchainUpdates,
	blockHeight uint32,
	execMode sdk.ExecMode,
) {
	defer metrics.ModuleMeasureSince(
		metrics.FullNodeGrpc,
		metrics.GrpcSendOrderbookUpdatesLatency,
		time.Now(),
	)

	// Group updates by clob pair id.
	updates := make(map[uint32]*clobtypes.OffchainUpdates)
	for _, message := range offchainUpdates.Messages {
		clobPairId := message.OrderId.ClobPairId
		if _, ok := updates[clobPairId]; !ok {
			updates[clobPairId] = clobtypes.NewOffchainUpdates()
		}
		updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
	}

	// Unmarshal each per-clob pair message to v1 updates.
	updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
	for clobPairId, update := range updates {
		v1updates, err := GetOffchainUpdatesV1(update)
		if err != nil {
			sm.logger.Error("Failed to get V1 updates", "error", err)
			continue // or return err
		}
		updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
			{
				UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
					OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
						Updates:  v1updates,
						Snapshot: false,
					},
				},
				BlockHeight: blockHeight,
				ExecMode:    uint32(execMode),
			},
		}
	}

	sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
}

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 536dffe and 7d40694.

Files selected for processing (1)
  • protocol/x/clob/memclob/memclob.go (3 hunks)
Files skipped from review due to trivial changes (1)
  • protocol/x/clob/memclob/memclob.go

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Outside diff range and nitpick comments (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Line range hint 144-222: The SendSnapshot method has been significantly refactored to bypass the buffer and send updates directly. This is a critical change as it affects how updates are processed and sent to subscribers. The method now groups updates by clob pair ids and sends them directly, which could improve performance by reducing latency. However, the use of panic for error handling when fetching V1 updates (line 252) is risky and could lead to crashes. Replace this with proper error handling to enhance the robustness of the method.

- panic(err)
+ sm.logger.Error("Failed to get V1 updates", "error", err)
+ return err // or handle the error appropriately
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 7d40694 and 434e4d3.

Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (7 hunks)
Additional comments not posted (2)
protocol/streaming/grpc/grpc_streaming_manager.go (2)

54-83: The implementation of the NewGrpcStreamingManager constructor is robust, incorporating the new configuration parameters effectively. The goroutine for flushing stream updates is well encapsulated within the constructor, ensuring that the streaming manager starts its operation immediately upon instantiation. However, consider adding error handling or recovery inside the goroutine to manage any potential panics or errors gracefully.


224-268: The SendOrderbookUpdates method has been updated to group updates and send them to subscribers. This method now aligns with the new streaming strategy by managing updates in a batched manner. The method's implementation is clear and follows the new architectural changes. However, similar to the SendSnapshot method, replace the panic in case of errors with proper error handling.

Comment on lines +308 to +329
```go
func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
	updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
	numUpdatesToAdd uint32,
) {
	sm.Lock()
	defer sm.Unlock()

	for clobPairId, streamUpdates := range updatesByClobPairId {
		sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
	}
	sm.numUpdatesInCache += numUpdatesToAdd

	// Remove all subscriptions and wipe the buffer if buffer overflows.
	if sm.numUpdatesInCache > sm.maxUpdatesInCache {
		sm.logger.Error("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. " +
			"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
		for id := range sm.orderbookSubscriptions {
			sm.removeSubscription(id)
		}
		clear(sm.streamUpdateCache)
		sm.numUpdatesInCache = 0
	}
}
```

The AddUpdatesToCache method is a new addition that manages stream updates in the cache. This method is crucial for the buffering strategy of the streaming manager. It correctly locks the manager during updates to ensure thread safety. However, the method could be improved by adding more detailed logging, especially when the buffer overflows and all subscriptions are dropped. This would aid in debugging and operational monitoring.

+ sm.logger.Info("Adding updates to cache", "numUpdatesToAdd", numUpdatesToAdd)
+ if sm.numUpdatesInCache > sm.maxUpdatesInCache {
+     sm.logger.Warn("Buffer overflow", "maxUpdatesInCache", sm.maxUpdatesInCache)
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
 func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
 	updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
 	numUpdatesToAdd uint32,
 ) {
 	sm.Lock()
 	defer sm.Unlock()
+	sm.logger.Info("Adding updates to cache", "numUpdatesToAdd", numUpdatesToAdd)
 	for clobPairId, streamUpdates := range updatesByClobPairId {
 		sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
 	}
 	sm.numUpdatesInCache += numUpdatesToAdd
 	// Remove all subscriptions and wipe the buffer if buffer overflows.
 	if sm.numUpdatesInCache > sm.maxUpdatesInCache {
 		sm.logger.Error("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. " +
 			"Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.")
+		sm.logger.Warn("Buffer overflow", "maxUpdatesInCache", sm.maxUpdatesInCache)
 		for id := range sm.orderbookSubscriptions {
 			sm.removeSubscription(id)
 		}
 		clear(sm.streamUpdateCache)
 		sm.numUpdatesInCache = 0
 	}
 }
```

Comment on lines +332 to +385
```go
// FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {
	defer metrics.ModuleMeasureSince(
		metrics.FullNodeGrpc,
		metrics.GrpcFlushUpdatesLatency,
		time.Now(),
	)

	sm.Lock()
	defer sm.Unlock()

	metrics.IncrCounter(
		metrics.GrpcEmitProtocolUpdateCount,
		1,
	)

	// Send updates to subscribers.
	idsToRemove := make([]uint32, 0)
	for id, subscription := range sm.orderbookSubscriptions {
		streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
		for _, clobPairId := range subscription.clobPairIds {
			if update, ok := sm.streamUpdateCache[clobPairId]; ok {
				streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
			}
		}

		if len(streamUpdatesForSubscription) > 0 {
			metrics.IncrCounter(
				metrics.GrpcSendResponseToSubscriberCount,
				1,
			)
			if err := subscription.srv.Send(
				&clobtypes.StreamOrderbookUpdatesResponse{
					Updates: streamUpdatesForSubscription,
				},
			); err != nil {
				idsToRemove = append(idsToRemove, id)
			}
		}
	}

	// Clean up subscriptions that have been closed.
	// If a Send update has failed for any clob pair id, the whole subscription will be removed.
	for _, id := range idsToRemove {
		sm.removeSubscription(id)
	}

	clear(sm.streamUpdateCache)
	sm.numUpdatesInCache = 0
}
```


The FlushStreamUpdates method effectively manages the periodic flushing of updates from the cache to subscribers. The implementation ensures that updates are sent only if they exist for a subscriber's clob pair ids, which is efficient. However, consider enhancing the error handling by adding retry logic or more sophisticated error recovery mechanisms when sending updates fails.

+ if err != nil {
+     sm.logger.Error("Failed to send update, retrying...", "error", err)
+     // Implement retry logic here
+ }

Committable suggestion was skipped due to low confidence.

@jonfung-dydx jonfung-dydx merged commit 5aa268e into main Jun 10, 2024
17 checks passed
@jonfung-dydx jonfung-dydx deleted the jonfung/batching branch June 10, 2024 19:17
@jonfung-dydx
Contributor Author

jonfung-dydx commented Jun 10, 2024

https://github.com/Mergifyio backport release/protocol/v5.x

Contributor

mergify bot commented Jun 10, 2024

backport v5.x

❌ No backport have been created

  • Backport to branch v5.x failed

GitHub error: Branch not found

Contributor

mergify bot commented Jun 10, 2024

backport protocol/v5.x

❌ No backport have been created

  • Backport to branch protocol/v5.x failed

GitHub error: Branch not found

Contributor

mergify bot commented Jun 10, 2024

release /protocol/v5.x

❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚.

Contributor

mergify bot commented Jun 10, 2024

backport release/protocol/v5.x

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Jun 10, 2024
(cherry picked from commit 5aa268e)

# Conflicts:
#	protocol/streaming/grpc/grpc_streaming_manager.go
jonfung-dydx added a commit that referenced this pull request Jun 10, 2024
Co-authored-by: Jonathan Fung <[email protected]>
Co-authored-by: Jonathan Fung <[email protected]>
roy-dydx added a commit that referenced this pull request Jun 11, 2024
jonfung-dydx added a commit that referenced this pull request Jun 13, 2024
Co-authored-by: Jonathan Fung <[email protected]>
Co-authored-by: Jonathan Fung <[email protected]>
jonfung-dydx added a commit that referenced this pull request Jun 18, 2024
Co-authored-by: Jonathan Fung <[email protected]>
Co-authored-by: Jonathan Fung <[email protected]>
jonfung-dydx added a commit that referenced this pull request Jun 18, 2024
Co-authored-by: Jonathan Fung <[email protected]>
Co-authored-by: Jonathan Fung <[email protected]>