-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #710]🚀Support pull message result handle-3🎨 #721
Conversation
(cherry picked from commit 7d2662a0e6a4fa8265f3fca762d96f63ff05a3a7)
WalkthroughThe changes enhance the Changes
Sequence DiagramsequenceDiagram
participant Client
participant Broker
participant PullRequestHoldService
participant ManyPullRequest
Client->>Broker: Send Pull Request
Broker->>PullRequestHoldService: Suspend Pull Request
PullRequestHoldService->>ManyPullRequest: Add Pull Request to List
ManyPullRequest->>PullRequestHoldService: Confirm Addition
Note over Client, Broker: Client waits for message
Broker->>PullRequestHoldService: Notify Message Arrival
PullRequestHoldService->>ManyPullRequest: Check and Retrieve Requests
ManyPullRequest->>PullRequestHoldService: Return Pull Requests
PullRequestHoldService->>Broker: Notify Message to Client
Broker->>Client: Deliver Message
Assessment against linked issues
Poem
Tip Early access features: enabledWe are currently testing the following features in early access:
Note:
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Outside diff range and nitpick comments (1)
rocketmq-common/src/common/broker/broker_config.rs (1)
158-159
: Add documentation for new fields inBrokerConfig
.The newly added fields
short_polling_time_mills
andlong_polling_enable
should have documentation comments to explain their purpose and usage./// The time in milliseconds for short polling. pub short_polling_time_mills: u64, /// Flag to enable long polling. pub long_polling_enable: bool,
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
Files selected for processing (12)
- rocketmq-broker/Cargo.toml (1 hunks)
- rocketmq-broker/src/lib.rs (1 hunks)
- rocketmq-broker/src/long_polling.rs (1 hunks)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/long_polling/many_pull_request.rs (1 hunks)
- rocketmq-broker/src/long_polling/pull_request.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (2 hunks)
- rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
- rocketmq-remoting/src/protocol.rs (3 hunks)
- rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
Files skipped from review due to trivial changes (3)
- rocketmq-broker/Cargo.toml
- rocketmq-broker/src/long_polling.rs
- rocketmq-remoting/src/protocol.rs
Additional comments not posted (31)
rocketmq-broker/src/lib.rs (1)
36-36
: LGTM!The module renaming from
longpolling
tolong_polling
improves consistency.However, ensure the renamed module is correctly referenced throughout the codebase.
Verification successful
LGTM!
The module renaming from
longpolling
tolong_polling
has been consistently applied across the codebase.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the renamed module `long_polling`. # Test: Search for the usage of the module `long_polling`. Expect: Only occurances of the new module name. rg --type rust $'long_polling'Length of output: 853
Script:
#!/bin/bash # Description: Verify the absence of the old module name `longpolling`. # Test: Search for the usage of the old module name `longpolling`. Expect: No occurrences of the old module name. rg --type rust $'longpolling'Length of output: 28
rocketmq-broker/src/long_polling/many_pull_request.rs (6)
27-32
: LGTM!The
new
function correctly initializes theManyPullRequest
struct.
34-37
: LGTM!The
add_pull_request
function correctly adds a singlePullRequest
to the list.
39-42
: LGTM!The
add_pull_requests
function correctly adds multiplePullRequest
objects to the list.
44-53
: LGTM!The
clone_list_and_clear
function correctly clones the list and clears it.
60-63
: LGTM!The
is_empty
function correctly checks if the list is empty.
55-58
: Commented-out code.The
get_pull_request_list
function is commented out.rocketmq-broker/src/long_polling/pull_request.rs (9)
36-54
: LGTM!The
new
function correctly initializes thePullRequest
struct.
56-58
: LGTM!The
request_command
function correctly returns a reference to therequest_command
field.
60-62
: LGTM!The
request_command_mut
function correctly returns a mutable reference to therequest_command
field.
64-66
: LGTM!The
client_channel
function correctly returns a reference to theclient_channel
field.
68-70
: LGTM!The
pull_from_this_offset
function correctly returns thepull_from_this_offset
field.
72-74
: LGTM!The
subscription_data
function correctly returns a reference to thesubscription_data
field.
76-78
: LGTM!The
message_filter
function correctly returns a clonedArc
of themessage_filter
field.
80-82
: LGTM!The
timeout_millis
function correctly returns thetimeout_millis
field.
83-85
: LGTM!The
suspend_timestamp
function correctly returns thesuspend_timestamp
field.rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (7)
42-51
: LGTM!The
new
function correctly initializes thePullRequestHoldService
struct.
59-65
: LGTM!The
suspend_pull_request
function correctly suspends a pull request and adds it to the list.
67-77
: LGTM!The
check_hold_request
function correctly checks the hold requests.
79-82
: LGTM!The
notify_message_arriving
function correctly notifies the arrival of a message.
84-158
: LGTM!The
notify_message_arriving_ext
function correctly notifies the arrival of a message with additional parameters.
160-178
: LGTM!The
notify_master_online
function correctly notifies that the master is online.
181-183
: LGTM!The
build_key
function correctly constructs a key from the topic and queue ID.rocketmq-common/src/common/broker/broker_config.rs (1)
226-227
: LGTM!rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (1)
40-40
: Update the type ofsuspend_timeout_millis
tou64
.The type change from
i64
tou64
is appropriate for representing timeout values, which are non-negative.rocketmq-remoting/src/protocol/remoting_command.rs (3)
19-19
: Importfmt
forDisplay
implementation.The import of
fmt
is necessary for theDisplay
trait implementation.
119-138
: ImplementDisplay
forRemotingCommand
.The implementation provides a detailed string representation of the
RemotingCommand
struct, which is useful for debugging and logging.
344-347
: Add methodset_suspended_ref
toRemotingCommand
.The new method provides a way to set the
suspended
field, enhancing the flexibility of theRemotingCommand
struct.rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2)
87-87
: Allow unused assignments.The attribute
#[allow(unused_assignments)]
is added to suppress warnings about unused assignments, which may be necessary for certain logic flows.
179-197
: HandleResponseCode::PullNotFound
inDefaultPullMessageResultHandler
.The added logic handles the scenario where no messages are found for a pull request. It checks for suspend flags and adjusts polling times based on configuration.
rocketmq-broker/src/processor/pull_message_processor.rs (1)
756-756
: Verify the usage ofis_broadcast
function.Ensure that the
is_broadcast
function is correctly implemented and used within this function.Verification successful
The
is_broadcast
function is correctly implemented and used.The function checks if the
proxy_pull_broadcast
flag is set or if theconsumer_group_info
indicates a broadcasting message model and passive consume type. The usage of this function in the code is appropriate and aligns with its intended purpose.
is_broadcast
function implementation is located inrocketmq-broker/src/processor/pull_message_processor.rs
at line 760.- The function is used at line 756 in the same file.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation and usage of `is_broadcast` function. # Test: Search for the function implementation. Expect: Only one occurrence. rg --type rust $'fn is_broadcast'Length of output: 118
Script:
#!/bin/bash # Description: Extract the definition and all usages of the `is_broadcast` function within the file `rocketmq-broker/src/processor/pull_message_processor.rs`. # Extract the definition of `is_broadcast`. rg --type rust --context 5 'fn is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rs # Extract all usages of `is_broadcast` within the file. rg 'is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rsLength of output: 582
Script:
#!/bin/bash # Description: Extract the complete implementation of the `is_broadcast` function from the file `rocketmq-broker/src/processor/pull_message_processor.rs`. # Extract the complete implementation of `is_broadcast`. rg --type rust --context 30 'fn is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rsLength of output: 1539
None => proxy_pull_broadcast, | ||
} | ||
} | ||
pub async fn execute_request_when_wakeup(&self, channel: Channel, request: RemotingCommand) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty function detected.
The execute_request_when_wakeup
function is currently empty. Consider implementing the function or adding a TODO comment.
Do you want me to help implement this function or open a GitHub issue to track this task?
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (4)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
Files skipped from review as they are similar to previous changes (4)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs
- rocketmq-broker/src/processor/pull_message_processor.rs
- rocketmq-remoting/src/protocol/remoting_command.rs
(cherry picked from commit 7d2662a0e6a4fa8265f3fca762d96f63ff05a3a7)
Which Issue(s) This PR Fixes(Closes)
Fixes #710
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
PullRequestHoldService
to manage suspended pull requests for message processing.ManyPullRequest
to handle multiple pull requests with synchronization.Enhancements
BrokerConfig
with new fields for short polling time and long polling enablement.fmt::Display
for various enums andRemotingCommand
for better logging and debugging.Bug Fixes
PullNotFound
inDefaultPullMessageResultHandler
.Refactor
API Changes
PullMessageRequestHeader
to useu64
forsuspend_timeout_millis
.