-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #702]🚀Support pull message result handle-2🚀 #706
Conversation
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Warning Rate limit exceeded@mxsm has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 52 minutes and 35 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe changes introduce functionalities to support enhanced message pulling and offset management in RocketMQ's broker processor. This includes adding fields and methods to various structs for managing broadcast offsets and consumer offsets, enabling methods for incrementing statistics, and modifying method signatures and logic for message encoding and response handling. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant BrokerRuntime
participant BroadcastOffsetManager
participant ConsumerManager
client ->> BrokerRuntime: Pull Message Request
BrokerRuntime ->> BroadcastOffsetManager: Update Offset
BrokerRuntime ->> ConsumerManager: Commit Offset
BrokerRuntime ->> Client: Return Message
Assessment against linked issues
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (10)
- rocketmq-broker/src/broker_runtime.rs (4 hunks)
- rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs (1 hunks)
- rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (5 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (3 hunks)
- rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
- rocketmq-remoting/src/protocol/request_source.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client_utils.rs (3 hunks)
- rocketmq-store/src/base/get_message_result.rs (1 hunks)
- rocketmq-store/src/stats/broker_stats_manager.rs (1 hunks)
Additional comments not posted (10)
rocketmq-remoting/src/protocol/request_source.rs (1)
54-54
: Good enhancement toRequestSource
usability.Making the
from_value
function public enhances the usability of theRequestSource
enum by allowing external modules to directly utilize this conversion functionality.rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1)
183-198
: Proper implementation ofcommit_pull_offset
.The new method
commit_pull_offset
correctly updates the pull offset table, aligning with the PR's objectives to enhance message handling.rocketmq-store/src/base/get_message_result.rs (1)
159-161
: Useful addition toGetMessageResult
.The method
message_mapped_list
provides a clean, read-only access to the internal message mappings, which enhances the usability of theGetMessageResult
class.rocketmq-common/src/common/broker/broker_config.rs (1)
157-157
: Addition oftransfer_msg_by_heap
field inBrokerConfig
The new boolean field
transfer_msg_by_heap
has been added to theBrokerConfig
struct and is set totrue
by default in thedefault
implementation. Ensure that this default value aligns with the intended use cases and performance expectations, especially considering that it affects how messages are transferred (potentially impacting memory usage and performance).Also applies to: 223-223
rocketmq-broker/src/processor/default_pull_message_result_handler.rs (1)
56-59
: Proper Initialization of New Fields inDefaultPullMessageResultHandler
The new fields
consumer_offset_manager
,consumer_manager
,broadcast_offset_manager
, andbroker_stats_manager
are properly initialized in the constructor ofDefaultPullMessageResultHandler
. This ensures that the handler has all necessary components to manage message pulling effectively.Also applies to: 76-79
rocketmq-broker/src/processor/pull_message_processor.rs (1)
798-810
: Addition ofis_broadcast
Utility FunctionThe
is_broadcast
function has been added to determine if a message should be treated as a broadcast based on the consumer group info. This utility function enhances the modularity and readability of the code. Make sure it is used consistently wherever applicable.rocketmq-broker/src/broker_runtime.rs (4)
128-128
: Review cloning ofbroadcast_offset_manager
.Adding
broadcast_offset_manager
to theClone
implementation ofBrokerRuntime
is essential to ensure that each clone of the runtime has its own reference to the shared manager. This is correctly implemented usingArc::clone()
.
201-201
: Initialization ofbroadcast_offset_manager
inBrokerRuntime
constructor.The initialization of
broadcast_offset_manager
usingArc::new(Default::default())
in the constructor is appropriate. This ensures that the manager is ready for use immediately afterBrokerRuntime
instantiation.
367-370
: Correct integration ofbroadcast_offset_manager
inDefaultPullMessageResultHandler
.The integration of
broadcast_offset_manager
inDefaultPullMessageResultHandler
within theinit_processor
method ensures that it is available for message processing. This aligns with the PR's objective to enhance message handling capabilities.
97-97
: Ensure proper handling ofbroadcast_offset_manager
initialization.The addition of
broadcast_offset_manager
toBrokerRuntime
is a significant change. It is crucial to ensure that this manager is properly initialized to avoid runtime errors or uninitialized access.Verification successful
Initialization of
broadcast_offset_manager
is properly handled.The
broadcast_offset_manager
is initialized usingArc::new(Default::default())
, ensuring it is properly set up in theBrokerRuntime
struct.
rocketmq-broker/src/broker_runtime.rs
:
broadcast_offset_manager: Arc::new(Default::default())
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for the initialization of `broadcast_offset_manager` in the constructor to ensure it is properly handled. # Test: Search for the initialization of `broadcast_offset_manager`. Expect: Proper initialization in the constructor. rg --type rust $'broadcast_offset_manager: Arc::new(Default::default())' rocketmq-broker/src/broker_runtime.rsLength of output: 111
Script:
#!/bin/bash # Description: Search for the initialization of `broadcast_offset_manager` in the BrokerRuntime struct to ensure it is properly handled. # Test: Search for the initialization of `broadcast_offset_manager` in the BrokerRuntime struct. rg --type rust 'broadcast_offset_manager' rocketmq-broker/src/broker_runtime.rs # Test: Search for the constructor of the BrokerRuntime struct to verify initialization. rg --type rust 'fn new' rocketmq-broker/src/broker_runtime.rsLength of output: 499
pub fn update_offset( | ||
&self, | ||
topic: &str, | ||
group: &str, | ||
queue_id: i32, | ||
offset: i64, | ||
client_id: &str, | ||
from_proxy: bool, | ||
) { | ||
unimplemented!() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation needed for update_offset
method.
The method update_offset
is currently marked as unimplemented!()
. This suggests that the functionality is not yet complete.
Do you have a timeline for implementing this? It might be beneficial to add a TODO
comment or create a tracking issue to ensure this does not get overlooked.
self.update_broadcast_pulled_offset( | ||
request_header.topic.as_str(), | ||
request_header.consumer_group.as_str(), | ||
request_header.queue_id.unwrap(), | ||
&request_header, | ||
&channel, | ||
Some(&response), | ||
get_message_result.next_begin_offset(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Complex Logic in update_broadcast_pulled_offset
and try_commit_offset
Methods
The methods update_broadcast_pulled_offset
and try_commit_offset
contain complex logic for managing offsets which might be hard to maintain. Consider simplifying or refactoring these methods to improve readability and maintainability.
- Existing complex methods
+ Proposed simplified versions (details depend on further context)
Also applies to: 143-147
pub fn inc_topic_put_nums(&self, topic: &str, num: i32, times: i32) {} | ||
|
||
pub fn inc_topic_put_size(&self, topic: &str, size: i32) {} | ||
|
||
pub fn inc_group_get_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
pub fn inc_group_get_size(&self, group: &str, topic: &str, inc_value: i32) {} | ||
|
||
pub fn inc_group_ck_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
|
||
pub fn inc_group_ack_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
pub fn inc_broker_get_nums(&self, group: &str, inc_value: i32) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip
Codebase Verification
Ensure Usage of New Statistics Increment Methods
The methods inc_topic_put_nums
, inc_topic_put_size
, inc_group_ck_nums
, and inc_group_ack_nums
are defined but not used anywhere in the codebase. Ensure these methods are invoked where necessary to maintain accurate statistics.
- File:
rocketmq-store/src/stats/broker_stats_manager.rs
- Methods:
inc_topic_put_nums
inc_topic_put_size
inc_group_ck_nums
inc_group_ack_nums
- Methods:
Analysis chain
Implementation of Statistics Increment Methods
New methods for incrementing various statistics (inc_topic_put_nums
, inc_topic_put_size
, inc_group_get_nums
, inc_group_get_size
, inc_group_ck_nums
, inc_group_ack_nums
, inc_broker_get_nums
) have been added. Ensure that these methods are called appropriately within the system to reflect accurate statistics.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if the new statistics methods are used appropriately throughout the codebase.
# Test: Search for the usage of the new statistics methods. Expect: Usages in relevant parts of the codebase.
rg --type rust $'inc_'
Length of output: 2995
Script:
#!/bin/bash
# Description: Verify if the remaining new statistics methods are used appropriately throughout the codebase.
# Test: Search for the usage of the specific new statistics methods that were not found in the initial search.
rg --type rust -e 'inc_topic_put_nums' -e 'inc_topic_put_size' -e 'inc_group_ck_nums' -e 'inc_group_ack_nums'
Length of output: 601
min_offset = Some(next_item.start_offset); | ||
max_offset = min_offset; | ||
response_code = ResponseCode::PullRetryImmediately; | ||
} else if request_offset >= max_offset? { | ||
if let Some(next_item) = | ||
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
{ | ||
next_begin_offset = Some(next_item.start_offset); | ||
min_offset = Some(next_item.start_offset); | ||
max_offset = min_offset; | ||
response_code = ResponseCode::PullRetryImmediately; | ||
} else { | ||
response_code = ResponseCode::PullNotFound; | ||
} | ||
} else { | ||
response_code = code; | ||
response_code = ResponseCode::PullNotFound; | ||
} | ||
} else { | ||
response_code = code; | ||
} | ||
} | ||
|
||
if current_item.check_if_end_offset_decided() | ||
&& next_begin_offset.unwrap() >= current_item.end_offset | ||
if !is_revised | ||
&& leader_item.gen != current_item.gen | ||
&& earlist_item.gen != current_item.gen | ||
{ | ||
next_begin_offset = Some(current_item.end_offset); | ||
if request_offset < min_offset? { | ||
next_begin_offset = min_offset; | ||
response_code = ResponseCode::PullRetryImmediately; | ||
} else if request_offset >= max_offset? { | ||
if let Some(next_item) = | ||
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
{ | ||
next_begin_offset = Some(next_item.start_offset); | ||
min_offset = Some(next_item.start_offset); | ||
max_offset = min_offset; | ||
response_code = ResponseCode::PullRetryImmediately; | ||
} else { | ||
response_code = ResponseCode::PullNotFound; | ||
} | ||
} else { | ||
response_code = code; | ||
} | ||
} | ||
} | ||
|
||
response_header.next_begin_offset = | ||
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap())); | ||
response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly( | ||
if current_item.check_if_end_offset_decided() | ||
&& next_begin_offset.unwrap() >= current_item.end_offset | ||
{ | ||
next_begin_offset = Some(current_item.end_offset); | ||
} | ||
|
||
response_header.next_begin_offset = | ||
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap())); | ||
response_header.min_offset = | ||
Some(current_item.compute_static_queue_offset_strictly( | ||
min_offset.unwrap().max(current_item.start_offset), | ||
)); | ||
response_header.max_offset = Some( | ||
current_item | ||
.compute_static_queue_offset_strictly(max_offset.unwrap()) | ||
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping( | ||
mapping_detail, | ||
mapping_context.global_id, | ||
)), | ||
); | ||
response_header.offset_delta = Some(current_item.compute_offset_delta()); | ||
|
||
if code != ResponseCode::Success { | ||
Some( | ||
RemotingCommand::create_response_command_with_header(response_header.clone()) | ||
.set_code(response_code), | ||
) | ||
} else { | ||
None | ||
} | ||
response_header.max_offset = Some( | ||
current_item | ||
.compute_static_queue_offset_strictly(max_offset.unwrap()) | ||
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping( | ||
mapping_detail, | ||
mapping_context.global_id, | ||
)), | ||
); | ||
response_header.offset_delta = Some(current_item.compute_offset_delta()); | ||
|
||
if code != ResponseCode::Success { | ||
Some( | ||
RemotingCommand::create_response_command_with_header(response_header.clone()) | ||
.set_code(response_code), | ||
) | ||
} else { | ||
None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip
Codebase Verification
Lack of Direct Tests for rewrite_response_for_static_topic
Function
The rewrite_response_for_static_topic
function is not directly covered by unit tests in the relevant files. This lack of direct testing poses a risk for potential regressions or unintended behavior changes. Please add comprehensive unit tests to ensure the function handles all edge cases and error conditions correctly.
- Files to review:
rocketmq-broker/src/processor/consumer_manage_processor.rs
rocketmq-broker/src/processor/default_pull_message_result_handler.rs
rocketmq-broker/src/processor/pull_message_processor.rs
Analysis chain
Refactor of rewrite_response_for_static_topic
Function
The rewrite_response_for_static_topic
function has been significantly refactored to handle various response codes and offsets. Ensure that this refactoring does not introduce any regressions or unintended behavior changes, especially in how it handles edge cases and error conditions.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the correct behavior of the refactored function through unit tests.
# Test: Search for unit tests covering the `rewrite_response_for_static_topic` function. Expect: Comprehensive tests covering all branches.
rg --type rust -A 3 'fn test_rewrite_response_for_static_topic'
Length of output: 65
Script:
#!/bin/bash
# Description: Verify if `rewrite_response_for_static_topic` is referenced in any test files.
# Search for references to `rewrite_response_for_static_topic` in the codebase
rg 'rewrite_response_for_static_topic' --type rust
# Find all test files in the repository
fd --type f --extension rs --exec rg 'rewrite_response_for_static_topic' {}
Length of output: 1437
Script:
#!/bin/bash
# Description: Verify the presence of test functions in files where `rewrite_response_for_static_topic` is referenced.
# List of files where `rewrite_response_for_static_topic` is referenced
files=(
"rocketmq-broker/src/processor/consumer_manage_processor.rs"
"rocketmq-broker/src/processor/default_pull_message_result_handler.rs"
"rocketmq-broker/src/processor/pull_message_processor.rs"
)
# Search for test functions in the listed files
for file in "${files[@]}"; do
echo "Searching for test functions in $file"
rg --type rust 'fn test_' "$file"
done
Length of output: 1138
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #702
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Enhancements
Configuration
transfer_msg_by_heap
inBrokerConfig
.API Changes
BroadcastOffsetManager
,ConsumerOffsetManager
,GetMessageResult
, andBrokerStatsManager
.from_value
function inRequestSource
public for better accessibility.