Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #699]🚀Support pull message result handle #701

Merged
merged 2 commits into from
Jun 27, 2024

Conversation

TeslaRustor
Copy link
Collaborator

@TeslaRustor TeslaRustor commented Jun 27, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #699

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced broadcast offset management to the broker for enhanced handling of message offsets.
    • Added static topic mapping logic to improve the flexibility and scalability of topic management.
    • Enhanced error handling with a new RpcException struct for better RPC exception management.
  • Improvements

    • Updated BrokerConfig to include a new enable_broadcast_offset_store field.
    • Improved PullMessageResponseHeader with the Clone trait for better object handling.
    • Enhanced RemotingCommand with a new method to set custom headers.
  • Bug Fixes

    • Fixed initialization of mapping_item_list in TopicQueueMappingManager.
  • Documentation

    • Added detailed documentation for new methods and structures in various modules to aid developers.
  • Refactors

    • Reorganized RPC-related modules for better code maintainability and clarity.

Copy link
Contributor

coderabbitai bot commented Jun 27, 2024

Walkthrough

The RocketMQ project has undergone several significant changes. Key additions include the integration of BroadcastOffsetManager within BrokerRuntime, new methods in BroadcastOffsetManager, and enhancements in PullMessageProcessor and DefaultPullMessageResultHandler to support static topic mapping and pull message result handling. Moreover, updates to the BrokerConfig struct, and new utility modules for RPC and static topic management, augment the system's flexibility and efficiency.

Changes

File Path/Group Change Summary
rocketmq-broker/.../broker_runtime.rs Added BroadcastOffsetManager instance creation in BrokerRuntime.
rocketmq-broker/.../broadcast_offset_manager.rs Introduced query_init_offset method in BroadcastOffsetManager.
rocketmq-broker/.../default_pull_message_result_handler.rs Altered DefaultPullMessageResultHandler for better pull message handling; uncommented execute_consume_message_hook_before method.
rocketmq-broker/.../pull_message_processor.rs Enhanced by adding BroadcastOffsetManager, new methods for static topic mapping, and updated imports.
rocketmq-broker/.../topic_queue_mapping_manager.rs Explicitly cloned mapping_item_list during struct field initialization.
rocketmq-common/.../broker_config.rs Added enable_broadcast_offset_store boolean field to BrokerConfig.
rocketmq-remoting/.../error.rs Introduced RpcException struct for enhanced RPC error handling.
rocketmq-remoting/.../pull_message_response_header.rs Added Clone trait to PullMessageResponseHeader.
rocketmq-remoting/.../remoting_command.rs Added set_command_custom_header_origin method for RemotingCommand.
rocketmq-remoting/.../static_topic.rs Introduced topic_queue_mapping_utils module.
rocketmq-remoting/.../logic_queue_mapping_item.rs Added methods for computing offsets and managing logic queue mapping items.
rocketmq-remoting/.../topic_queue_mapping_detail.rs Modified get_mapping_info to return a reference; added compute_max_offset_from_mapping method.
rocketmq-remoting/.../topic_queue_mapping_utils.rs Introduced TopicQueueMappingUtils struct with utility methods.
rocketmq-remoting/.../rpc.rs Reorganized module structure; made certain modules public.
rocketmq-remoting/.../rpc_client_utils.rs Added RpcClientUtils struct with methods for RPC command handling.
rocketmq-remoting/.../rpc_request.rs Changed RpcRequest.body to Option<Box<dyn Any>>; added new method.
rocketmq-remoting/.../rpc_response.rs Introduced RpcResponse struct with methods for creating and accessing response fields.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant BrokerRuntime
    participant BroadcastOffsetManager
    participant PullMessageProcessor

    Client->>BrokerRuntime: Initiate BrokerRuntime
    BrokerRuntime->>BroadcastOffsetManager: Create Instance
    Client->>PullMessageProcessor: Request Pull Message
    PullMessageProcessor->>BroadcastOffsetManager: Query Init Offset
    BroadcastOffsetManager-->>PullMessageProcessor: Return Initial Offset
    PullMessageProcessor-->>Client: Return Pull Message Result
Loading

Assessment against linked issues

Objective Addressed Explanation
Support for pull message result handling (#699)

Poem

In the realm of RocketMQ, we revamped the queue,
With Broadcast Offsets sparkling anew.✨
Pull requests polished, offsets aligned,
Static topics' secrets, now easily mined.🚀
Errors with a smile, and RPCs in style,
A seamless flow, mile after mile.🌟


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot requested review from mxsm and SpaceXCN June 27, 2024 15:21
@mxsm
Copy link
Owner

mxsm commented Jun 27, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Jun 27, 2024

Codecov Report

Attention: Patch coverage is 0% with 259 lines in your changes missing coverage. Please review.

Project coverage is 26.98%. Comparing base (f60b39c) to head (4dfc094).

Files Patch % Lines
...tmq-broker/src/processor/pull_message_processor.rs 0.00% 147 Missing ⚠️
...protocol/static_topic/topic_queue_mapping_utils.rs 0.00% 29 Missing ⚠️
rocketmq-remoting/src/rpc/rpc_response.rs 0.00% 21 Missing ⚠️
rocketmq-remoting/src/rpc/rpc_client_utils.rs 0.00% 17 Missing ⚠️
.../protocol/static_topic/logic_queue_mapping_item.rs 0.00% 14 Missing ⚠️
...rotocol/static_topic/topic_queue_mapping_detail.rs 0.00% 11 Missing ⚠️
rocketmq-remoting/src/protocol/remoting_command.rs 0.00% 5 Missing ⚠️
rocketmq-remoting/src/rpc/rpc_request.rs 0.00% 5 Missing ⚠️
rocketmq-remoting/src/error.rs 0.00% 3 Missing ⚠️
...ker/src/offset/manager/broadcast_offset_manager.rs 0.00% 2 Missing ⚠️
... and 5 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #701      +/-   ##
==========================================
- Coverage   27.30%   26.98%   -0.33%     
==========================================
  Files         256      260       +4     
  Lines       20113    20364     +251     
==========================================
+ Hits         5492     5495       +3     
- Misses      14621    14869     +248     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mxsm mxsm added approved PR has approved and removed ready to review labels Jun 27, 2024
@mxsm mxsm merged commit 5f6a7f5 into mxsm:main Jun 27, 2024
15 of 18 checks passed
@TeslaRustor TeslaRustor deleted the feature-699 branch June 27, 2024 15:29
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between f60b39c and 4dfc094.

Files selected for processing (17)
  • rocketmq-broker/src/broker_runtime.rs (2 hunks)
  • rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs (1 hunks)
  • rocketmq-broker/src/processor/default_pull_message_result_handler.rs (1 hunks)
  • rocketmq-broker/src/processor/pull_message_processor.rs (9 hunks)
  • rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs (1 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-remoting/src/error.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
  • rocketmq-remoting/src/protocol/static_topic.rs (1 hunks)
  • rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs (1 hunks)
  • rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs (2 hunks)
  • rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs (1 hunks)
  • rocketmq-remoting/src/rpc.rs (1 hunks)
  • rocketmq-remoting/src/rpc/rpc_client_utils.rs (1 hunks)
  • rocketmq-remoting/src/rpc/rpc_request.rs (1 hunks)
  • rocketmq-remoting/src/rpc/rpc_response.rs (1 hunks)
Files skipped from review due to trivial changes (2)
  • rocketmq-broker/src/processor/default_pull_message_result_handler.rs
  • rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs
Additional comments not posted (15)
rocketmq-remoting/src/rpc.rs (1)

17-21: Review module visibility changes.

The visibility of the modules rpc_client_utils, rpc_request, rpc_request_header, rpc_response, and topic_request_header has been modified to public. This change can significantly affect the use of these modules across the project. Please ensure that the increased visibility is necessary and that there are no unintended exposure of internal functionalities.

Verification successful

Visibility changes are justified.

The modules rpc_client_utils, rpc_request, rpc_request_header, rpc_response, and topic_request_header are used across multiple files in the codebase. Making these modules public is necessary to allow access to their functionalities in different parts of the project.

  • rocketmq-remoting/src/rpc.rs
  • rocketmq-remoting/src/rpc/rpc_request.rs
  • rocketmq-remoting/src/rpc/rpc_client_utils.rs
  • rocketmq-remoting/src/rpc/topic_request_header.rs
  • rocketmq-remoting/src/protocol/header/pull_message_request_header.rs
  • rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs
  • rocketmq-remoting/src/protocol/header/query_consumer_offset_request_header.rs
  • rocketmq-remoting/src/protocol/header/get_consumer_listby_group_request_header.rs
  • rocketmq-remoting/src/protocol/header/namesrv/topic_operation_header.rs
  • rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs
  • rocketmq-broker/src/processor/pull_message_processor.rs
  • rocketmq-broker/src/processor/consumer_manage_processor.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify if the newly public modules are being used externally as intended.
rg --type rust --files-with-matches 'rpc_client_utils|rpc_request|rpc_request_header|rpc_response|topic_request_header'

Length of output: 848



Script:

#!/bin/bash
# Extract lines where the modules are used in the identified files for further analysis.
files=(
  "rocketmq-remoting/src/rpc.rs"
  "rocketmq-remoting/src/rpc/rpc_request.rs"
  "rocketmq-remoting/src/rpc/rpc_client_utils.rs"
  "rocketmq-remoting/src/rpc/topic_request_header.rs"
  "rocketmq-remoting/src/protocol/header/pull_message_request_header.rs"
  "rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs"
  "rocketmq-remoting/src/protocol/header/query_consumer_offset_request_header.rs"
  "rocketmq-remoting/src/protocol/header/get_consumer_listby_group_request_header.rs"
  "rocketmq-remoting/src/protocol/header/namesrv/topic_operation_header.rs"
  "rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs"
  "rocketmq-broker/src/processor/pull_message_processor.rs"
  "rocketmq-broker/src/processor/consumer_manage_processor.rs"
)

for file in "${files[@]}"; do
  echo "File: $file"
  rg 'rpc_client_utils|rpc_request|rpc_request_header|rpc_response|topic_request_header' "$file"
  echo "--------------------"
done

Length of output: 9120

rocketmq-remoting/src/protocol/static_topic.rs (1)

22-22: New module addition: topic_queue_mapping_utils.

The addition of the topic_queue_mapping_utils module suggests an extension or enhancement in handling topic queue mappings. Please confirm that the new functionalities provided by this module are integrated and utilized properly across the project, particularly in areas dealing with topic management.

Verification successful

New module addition: topic_queue_mapping_utils.

The module topic_queue_mapping_utils is declared in rocketmq-remoting/src/protocol/static_topic.rs and is utilized in rocketmq-broker/src/processor/pull_message_processor.rs. The functionalities provided by TopicQueueMappingUtils are being used effectively within pull_message_processor.rs.

  • rocketmq-remoting/src/protocol/static_topic.rs: Module declaration.
  • rocketmq-broker/src/processor/pull_message_processor.rs: Utilizes TopicQueueMappingUtils for handling logic queue mapping items and finding the next mapping.

The new functionalities provided by the topic_queue_mapping_utils module are integrated and utilized properly across the project.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Check for the usage of the new module across the project.
rg --type rust --files-with-matches 'topic_queue_mapping_utils'

Length of output: 166



Script:

#!/bin/bash
# Extract and display the usage of topic_queue_mapping_utils in the identified files.

# Extract usage in rocketmq-remoting/src/protocol/static_topic.rs
echo "Usage in rocketmq-remoting/src/protocol/static_topic.rs:"
rg --type rust 'topic_queue_mapping_utils' rocketmq-remoting/src/protocol/static_topic.rs

# Extract usage in rocketmq-broker/src/processor/pull_message_processor.rs
echo "Usage in rocketmq-broker/src/processor/pull_message_processor.rs:"
rg --type rust 'topic_queue_mapping_utils' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 585



Script:

#!/bin/bash
# Extract and display the usage of TopicQueueMappingUtils in rocketmq-broker/src/processor/pull_message_processor.rs

echo "Usage of TopicQueueMappingUtils in rocketmq-broker/src/processor/pull_message_processor.rs:"
rg --type rust 'TopicQueueMappingUtils' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 766

rocketmq-remoting/src/rpc/rpc_request.rs (1)

17-17: Review new import: std::any::Any.

The import of std::any::Any allows for type-agnostic handling in the RpcRequest struct. This is a powerful feature but should be used carefully to avoid runtime type errors.

rocketmq-remoting/src/protocol/static_topic/logic_queue_mapping_item.rs (1)

46-64: Enhance LogicQueueMappingItem with new methods.

The addition of methods like compute_physical_queue_offset, compute_offset_delta, check_if_end_offset_decided, and compute_max_static_queue_offset significantly enhances the functionality of LogicQueueMappingItem. Ensure that these methods are properly tested and integrated with the rest of the queue management logic.

rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_detail.rs (1)

31-36: Optimization of get_mapping_info method.

Refactoring the method to return a reference instead of a cloned vector is a good practice for performance, reducing unnecessary data copying.

rocketmq-remoting/src/rpc/rpc_response.rs (1)

24-30: Updated RpcResponse structure.

The addition of exception handling and the flexible header system enhance the RPC response's robustness. However, ensure that the exception handling aligns with the overall error strategy of the application.

rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs (1)

19-80: Review of logic in find_logic_queue_mapping_item and find_next.

The methods implement search logic for mapping items. The comment about using binary search for performance improvement is valid and should be considered for future optimization. Additionally, the handling of ignore_negative in both methods is consistent and appropriate.

rocketmq-remoting/src/error.rs (1)

34-37: Addition of RpcException.

The introduction of RpcException is a good addition for specific error handling in RPC operations. Ensure that it is integrated properly within the RPC error handling flow.

rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1)

28-30: Addition of Clone Trait to PullMessageResponseHeader

The addition of the Clone trait to PullMessageResponseHeader is appropriate as it allows for easier management of response header instances, particularly useful in asynchronous operations where headers might need to be duplicated. This change enhances the flexibility and usability of the PullMessageResponseHeader struct without altering its intended functionality.

rocketmq-common/src/common/broker/broker_config.rs (1)

156-156: New Configuration Field: enable_broadcast_offset_store

Adding the enable_broadcast_offset_store field to BrokerConfig allows for conditional activation of the broadcast offset store feature. This is a significant addition as it directly influences the broker's operational behavior. The default setting of true in the Default implementation suggests an opt-out strategy, which is generally safe but should be clearly documented to avoid unexpected behavior in systems that do not require this feature.

Also applies to: 221-221

rocketmq-remoting/src/protocol/remoting_command.rs (1)

220-228: Addition of set_command_custom_header_origin Method

The introduction of the set_command_custom_header_origin method in the RemotingCommand struct provides a flexible way to manage command headers, especially in a multi-threaded environment where command headers might need to be modified or reset. The use of Option<Arc<SyncUnsafeCell>> for the command custom header ensures thread safety and mutable access, which is essential for the correct operation of remoting commands in RocketMQ.
[APROVED]

rocketmq-broker/src/processor/pull_message_processor.rs (4)

33-41: Review of new imports for static topics and RPC handling.

The newly added imports are specific to the functionalities being introduced in this PR, such as handling static topics and RPC responses. Ensure that these imports are used effectively throughout the code to avoid any unused import warnings.


73-73: Integration of broadcast_offset_manager in PullMessageProcessor.

The addition of broadcast_offset_manager is crucial for managing broadcast offsets. This change is well integrated into the constructor and the struct itself. Ensure that this field is utilized appropriately in methods that deal with message offsets.

Also applies to: 87-99


Line range hint 732-781: Review of query_broadcast_pull_init_offset method.

This method is well-designed to query the initial offset for broadcast pulls. It properly integrates with the broadcast_offset_manager and handles different scenarios based on the client and group information.

  • Error Handling: The method returns -1 in several conditions to indicate errors or special cases. Ensure that these cases are handled appropriately in the calling code to avoid misinterpretations of the error code.

783-794: Review of is_broadcast helper method.

The is_broadcast method efficiently determines the broadcast status based on the consumer group info and the request source. The logic is clear and concise.

  • Suggestion: Add more detailed comments explaining the conditions under which an operation is considered a broadcast. This will improve the readability and maintainability of the code.

Comment on lines +104 to +193
pub fn rewrite_request_for_static_topic(
request_header: &mut PullMessageRequestHeader,
mapping_context: &mut TopicQueueMappingContext,
) -> Option<RemotingCommand> {
mapping_context.mapping_detail.as_ref()?;
let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap();
let topic = mapping_context.topic.as_str();
let global_id = mapping_context.global_id;
if !mapping_context.is_leader() {
return Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::NotLeaderForQueue,
format!(
"{}-{} cannot find mapping item in request process of current broker {}",
topic,
global_id.unwrap_or_default(),
mapping_detail
.topic_queue_mapping_info
.bname
.clone()
.unwrap_or_default()
),
));
}

let global_offset = request_header.queue_offset;
let mapping_item = TopicQueueMappingUtils::find_logic_queue_mapping_item(
&mapping_context.mapping_item_list,
global_offset,
true,
)?;
mapping_context.current_item = Some(mapping_item.clone());

if global_offset < mapping_item.logic_offset {
// Handle offset moved...
}

let bname = &mapping_item.bname;
let phy_queue_id = mapping_item.queue_id;
let phy_queue_offset = mapping_item.compute_physical_queue_offset(global_offset);
request_header.queue_id = Some(phy_queue_id);
request_header.queue_offset = phy_queue_offset;
if mapping_item.check_if_end_offset_decided()
/* && request_header.max_msg_nums.is_some() */
{
request_header.max_msg_nums = std::cmp::min(
(mapping_item.end_offset - mapping_item.start_offset) as i32,
request_header.max_msg_nums,
);
}

if &mapping_detail.topic_queue_mapping_info.bname == bname {
return None;
}

let mut sys_flag = request_header.sys_flag;
let topic_request = request_header.topic_request.as_mut().unwrap();
topic_request.lo = Some(false);
topic_request
.rpc
.as_mut()
.unwrap()
.broker_name
.clone_from(bname);
sys_flag = PullSysFlag::clear_suspend_flag(sys_flag as u32) as i32;
sys_flag = PullSysFlag::clear_commit_offset_flag(sys_flag as u32) as i32;
request_header.sys_flag = sys_flag;
/* let rpc_request = RpcRequest::new(RequestCode::PullMessage, request_header.clone(), None);
let rpc_response = broker_controller
.broker_outer_api
.rpc_client
.invoke(rpc_request, broker_controller.broker_config.forward_timeout)?;
if rpc_response.exception.is_some() {
return Err(rpc_response.exception.unwrap());
}*/

let rpc_response = RpcResponse::default();
let response_header = rpc_response.get_header_mut::<PullMessageResponseHeader>();
let rewrite_result = Self::rewrite_response_for_static_topic(
request_header,
response_header.unwrap(),
mapping_context,
ResponseCode::from(rpc_response.code),
);
if rewrite_result.is_some() {
return rewrite_result;
}
Some(RpcClientUtils::create_command_for_rpc_response(
rpc_response,
))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of rewrite_request_for_static_topic method.

This method is central to handling requests for static topics. The logic for rewriting the request based on the static topic mapping is comprehensive. However, ensure that all edge cases are handled, particularly in scenarios where optional fields might be None. Consider adding more robust error handling and logging for critical steps.

  • Potential Issue: The method assumes global_id and other optional fields are always present (unwrap() usage). This might lead to panics if the fields are None.
  • Suggestion: Use safe unwrapping or default values to handle None values gracefully.
- let global_id = mapping_context.global_id;
+ let global_id = mapping_context.global_id.unwrap_or_default();

Committable suggestion was skipped due to low confidence.

Comment on lines +195 to +316
if code != ResponseCode::Success {
let mut is_revised = false;
if leader_item.gen == current_item.gen {
if request_offset > max_offset.unwrap() {
if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = max_offset;
} else {
response_code = code;
}
} else if request_offset < min_offset.unwrap() {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = code;
}
}

if earlist_item.gen == current_item.gen {
if request_offset < min_offset.unwrap() {
/*if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
}*/
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else if request_offset >= max_offset.unwrap() {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
is_revised = true;
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}

if !is_revised
&& leader_item.gen != current_item.gen
&& earlist_item.gen != current_item.gen
{
if request_offset < min_offset? {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else if request_offset >= max_offset? {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}
}

if current_item.check_if_end_offset_decided()
&& next_begin_offset.unwrap() >= current_item.end_offset
{
next_begin_offset = Some(current_item.end_offset);
}

response_header.next_begin_offset =
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap()));
response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly(
min_offset.unwrap().max(current_item.start_offset),
));
response_header.max_offset = Some(
current_item
.compute_static_queue_offset_strictly(max_offset.unwrap())
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping(
mapping_detail,
mapping_context.global_id,
)),
);
response_header.offset_delta = Some(current_item.compute_offset_delta());

if code != ResponseCode::Success {
Some(
RemotingCommand::create_response_command_with_header(response_header.clone())
.set_code(response_code),
)
} else {
None
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of rewrite_response_for_static_topic method.

This method effectively handles the rewriting of responses for static topics. It adjusts the response based on the mapping details and the current state of the message queue.

  • Performance Consideration: The method contains multiple conditional checks and potentially expensive operations within loops. Consider optimizing these parts to improve performance, especially in high-load scenarios.
  • Error Handling: The method uses unwrap() extensively, which could lead to runtime panics. Implement more robust error handling to prevent potential crashes.
- let leader_item = mapping_context.leader_item.as_ref().unwrap();
+ let leader_item = mapping_context.leader_item.as_ref().expect("Leader item must be present");
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn rewrite_response_for_static_topic(
request_header: &PullMessageRequestHeader,
response_header: &mut PullMessageResponseHeader,
mapping_context: &mut TopicQueueMappingContext,
code: ResponseCode,
) -> Option<RemotingCommand> {
mapping_context.mapping_detail.as_ref()?;
let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap();
let leader_item = mapping_context.leader_item.as_ref().unwrap();
let current_item = mapping_context.current_item.as_ref().unwrap();
let mapping_items = &mut mapping_context.mapping_item_list;
let earlist_item =
TopicQueueMappingUtils::find_logic_queue_mapping_item(mapping_items, 0, true).unwrap();
assert!(current_item.logic_offset >= 0);
let request_offset = request_header.queue_offset;
let mut next_begin_offset = response_header.next_begin_offset;
let mut min_offset = response_header.min_offset;
let mut max_offset = response_header.max_offset;
let mut response_code = code;
if code != ResponseCode::Success {
let mut is_revised = false;
if leader_item.gen == current_item.gen {
if request_offset > max_offset.unwrap() {
if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = max_offset;
} else {
response_code = code;
}
} else if request_offset < min_offset.unwrap() {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = code;
}
}
if earlist_item.gen == current_item.gen {
if request_offset < min_offset.unwrap() {
/*if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
}*/
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else if request_offset >= max_offset.unwrap() {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
is_revised = true;
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}
if !is_revised
&& leader_item.gen != current_item.gen
&& earlist_item.gen != current_item.gen
{
if request_offset < min_offset? {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else if request_offset >= max_offset? {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}
}
if current_item.check_if_end_offset_decided()
&& next_begin_offset.unwrap() >= current_item.end_offset
{
next_begin_offset = Some(current_item.end_offset);
}
response_header.next_begin_offset =
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap()));
response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly(
min_offset.unwrap().max(current_item.start_offset),
));
response_header.max_offset = Some(
current_item
.compute_static_queue_offset_strictly(max_offset.unwrap())
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping(
mapping_detail,
mapping_context.global_id,
)),
);
response_header.offset_delta = Some(current_item.compute_offset_delta());
if code != ResponseCode::Success {
Some(
RemotingCommand::create_response_command_with_header(response_header.clone())
.set_code(response_code),
)
} else {
None
}
}
pub fn rewrite_response_for_static_topic(
request_header: &PullMessageRequestHeader,
response_header: &mut PullMessageResponseHeader,
mapping_context: &mut TopicQueueMappingContext,
code: ResponseCode,
) -> Option<RemotingCommand> {
mapping_context.mapping_detail.as_ref()?;
let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap();
let leader_item = mapping_context.leader_item.as_ref().expect("Leader item must be present");
let current_item = mapping_context.current_item.as_ref().unwrap();
let mapping_items = &mut mapping_context.mapping_item_list;
let earlist_item =
TopicQueueMappingUtils::find_logic_queue_mapping_item(mapping_items, 0, true).unwrap();
assert!(current_item.logic_offset >= 0);
let request_offset = request_header.queue_offset;
let mut next_begin_offset = response_header.next_begin_offset;
let mut min_offset = response_header.min_offset;
let mut max_offset = response_header.max_offset;
let mut response_code = code;
if code != ResponseCode::Success {
let mut is_revised = false;
if leader_item.gen == current_item.gen {
if request_offset > max_offset.unwrap() {
if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = max_offset;
} else {
response_code = code;
}
} else if request_offset < min_offset.unwrap() {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = code;
}
}
if earlist_item.gen == current_item.gen {
if request_offset < min_offset.unwrap() {
/*if code == ResponseCode::PullOffsetMoved {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else {
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
}*/
response_code = ResponseCode::PullOffsetMoved;
next_begin_offset = min_offset;
} else if request_offset >= max_offset.unwrap() {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
is_revised = true;
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}
if !is_revised
&& leader_item.gen != current_item.gen
&& earlist_item.gen != current_item.gen
{
if request_offset < min_offset? {
next_begin_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else if request_offset >= max_offset? {
if let Some(next_item) =
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
{
next_begin_offset = Some(next_item.start_offset);
min_offset = Some(next_item.start_offset);
max_offset = min_offset;
response_code = ResponseCode::PullRetryImmediately;
} else {
response_code = ResponseCode::PullNotFound;
}
} else {
response_code = code;
}
}
}
if current_item.check_if_end_offset_decided()
&& next_begin_offset.unwrap() >= current_item.end_offset
{
next_begin_offset = Some(current_item.end_offset);
}
response_header.next_begin_offset =
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap()));
response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly(
min_offset.unwrap().max(current_item.start_offset),
));
response_header.max_offset = Some(
current_item
.compute_static_queue_offset_strictly(max_offset.unwrap())
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping(
mapping_detail,
mapping_context.global_id,
)),
);
response_header.offset_delta = Some(current_item.compute_offset_delta());
if code != ResponseCode::Success {
Some(
RemotingCommand::create_response_command_with_header(response_header.clone())
.set_code(response_code),
)
} else {
None
}
}

Comment on lines +38 to +51
pub fn compute_max_offset_from_mapping(
mapping_detail: &TopicQueueMappingDetail,
global_id: Option<i32>,
) -> i64 {
match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
Some(mapping_items) => {
if mapping_items.is_empty() {
return -1;
}
let item = mapping_items.last().unwrap();
item.compute_max_static_queue_offset()
}
None => -1,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of compute_max_offset_from_mapping.

This method handles the computation of the maximum offset. The use of unwrap() could lead to panic if global_id is None or if mapping_items is empty. Consider adding error handling or checks to prevent runtime panics.

- match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
+ match global_id.and_then(|id| Self::get_mapping_info(mapping_detail, id)) {
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn compute_max_offset_from_mapping(
mapping_detail: &TopicQueueMappingDetail,
global_id: Option<i32>,
) -> i64 {
match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
Some(mapping_items) => {
if mapping_items.is_empty() {
return -1;
}
let item = mapping_items.last().unwrap();
item.compute_max_static_queue_offset()
}
None => -1,
}
pub fn compute_max_offset_from_mapping(
mapping_detail: &TopicQueueMappingDetail,
global_id: Option<i32>,
) -> i64 {
match global_id.and_then(|id| Self::get_mapping_info(mapping_detail, id)) {
Some(mapping_items) => {
if mapping_items.is_empty() {
return -1;
}
let item = mapping_items.last().unwrap();
item.compute_max_static_queue_offset()
}
None => -1,
}

Comment on lines +42 to +53
pub fn get_header<T>(&self) -> Option<&T>
where
T: CommandCustomHeader + Send + Sync + 'static,
{
match self.header.as_ref() {
None => None,
Some(value) => {
let value = value.get();
let value = value as *const dyn CommandCustomHeader as *const T;
unsafe { Some(&*value) }
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential safety issues with header access methods.

The get_header and get_header_mut methods use unsafe code to cast types, which can lead to undefined behavior if the types do not match. Consider implementing additional type safety mechanisms or documentation to ensure safe usage.

- let value = value as *const dyn CommandCustomHeader as *const T;
+ let value = value.downcast_ref::<T>().ok_or_else(|| "Type mismatch")?;

Also applies to: 56-68

Comment on lines +28 to +71
impl RpcClientUtils {
pub fn create_command_for_rpc_request(rpc_request: RpcRequest) -> RemotingCommand {
let cmd = RemotingCommand::create_request_command(rpc_request.code, rpc_request.header);
cmd.set_body(Self::encode_body(rpc_request.body))
}

pub fn create_command_for_rpc_response(mut rpc_response: RpcResponse) -> RemotingCommand {
let mut cmd = match rpc_response.header.take() {
None => RemotingCommand::create_response_command_with_code(rpc_response.code),
Some(value) => RemotingCommand::create_response_command()
.set_command_custom_header_origin(Some(value)),
};
match rpc_response.exception {
None => {}
Some(value) => cmd.set_remark_ref(Some(value.1.clone())),
}
cmd.set_body(Self::encode_body(rpc_response.body))
}

pub fn encode_body(_body: Option<Box<dyn Any>>) -> Option<Bytes> {
/*if body.is_none() {
return None;
}
let body = body.unwrap();
if body.is::<Bytes>() {
return Some(body.downcast_ref::<Bytes>().unwrap().clone());
} else if body.is::<Vec<u8>>() {
return Some(Bytes::from(
body.downcast_ref::<Vec<u8>>().unwrap().as_ref(),
));
}
/*else if body.is::<dyn RemotingSerializable<Output = Self>>() {
return Some(Bytes::from(
body.downcast_ref::<dyn RemotingSerializable<Output = Self>>()
.unwrap()
.encode(),
));

}*/
else {
None
}*/
None
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete implementation in encode_body.

The method encode_body contains commented-out code which suggests that not all cases for encoding the body are handled. It's important to either complete the implementation or remove the unused code to maintain code cleanliness.

- /*if body.is_none() {
+ if let Some(body) = body {
-        return None;
-    }
-    let body = body.unwrap();
+    if body.is::<Bytes>() {
+        Some(body.downcast_ref::<Bytes>().unwrap().clone())
+    } else if body.is::<Vec<u8>>() {
+        Some(Bytes::from(body.downcast_ref::<Vec<u8>>().unwrap().as_ref()))
+    } else {
+        None
+    }
-    }*/
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
impl RpcClientUtils {
pub fn create_command_for_rpc_request(rpc_request: RpcRequest) -> RemotingCommand {
let cmd = RemotingCommand::create_request_command(rpc_request.code, rpc_request.header);
cmd.set_body(Self::encode_body(rpc_request.body))
}
pub fn create_command_for_rpc_response(mut rpc_response: RpcResponse) -> RemotingCommand {
let mut cmd = match rpc_response.header.take() {
None => RemotingCommand::create_response_command_with_code(rpc_response.code),
Some(value) => RemotingCommand::create_response_command()
.set_command_custom_header_origin(Some(value)),
};
match rpc_response.exception {
None => {}
Some(value) => cmd.set_remark_ref(Some(value.1.clone())),
}
cmd.set_body(Self::encode_body(rpc_response.body))
}
pub fn encode_body(_body: Option<Box<dyn Any>>) -> Option<Bytes> {
/*if body.is_none() {
return None;
}
let body = body.unwrap();
if body.is::<Bytes>() {
return Some(body.downcast_ref::<Bytes>().unwrap().clone());
} else if body.is::<Vec<u8>>() {
return Some(Bytes::from(
body.downcast_ref::<Vec<u8>>().unwrap().as_ref(),
));
}
/*else if body.is::<dyn RemotingSerializable<Output = Self>>() {
return Some(Bytes::from(
body.downcast_ref::<dyn RemotingSerializable<Output = Self>>()
.unwrap()
.encode(),
));
}*/
else {
None
}*/
None
}
pub fn encode_body(_body: Option<Box<dyn Any>>) -> Option<Bytes> {
if let Some(body) = body {
if body.is::<Bytes>() {
Some(body.downcast_ref::<Bytes>().unwrap().clone())
} else if body.is::<Vec<u8>>() {
Some(Bytes::from(body.downcast_ref::<Vec<u8>>().unwrap().as_ref()))
} else {
None
}
} else {
None
}
}

Comment on lines +21 to +31
#[derive(Default)]
pub struct RpcRequest {
pub code: i32,
pub header: RpcRequestHeader,
pub body: Option<Bytes>,
pub body: Option<Box<dyn Any>>,
}

impl RpcRequest {
pub fn new(code: i32, header: RpcRequestHeader, body: Option<Box<dyn Any>>) -> Self {
Self { code, header, body }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor RpcRequest to handle any type of body.

The changes to the RpcRequest struct to include an Option<Box<dyn Any>> for the body enhances flexibility but also introduces complexity in managing the type safety of RPC request bodies. Ensure that there are adequate checks and balances to handle the dynamic type safely.

// Ensure type safety when handling the body of RpcRequest.
impl RpcRequest {
    pub fn set_body<T: Any + Send + Sync>(&mut self, body: T) {
        self.body = Some(Box::new(body));
    }

    pub fn get_body<T: Any>(&self) -> Option<&T> {
        self.body.as_ref()?.downcast_ref::<T>()
    }
}

Comment on lines +20 to +33
#[allow(unused_variables)]
impl BroadcastOffsetManager {
pub fn query_init_offset(
&self,
topic: &str,
group_id: &str,
queue_id: i32,
client_id: &str,
request_offset: i64,
from_proxy: bool,
) -> i64 {
unimplemented!()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement query_init_offset method in BroadcastOffsetManager.

The new method query_init_offset is a critical addition for handling initial offset queries. It currently contains a placeholder implementation (unimplemented!()). Ensure that the actual implementation is completed before merging this PR, and consider adding unit tests to cover this new functionality.

// Suggested test for query_init_offset
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_init_offset() {
        let manager = BroadcastOffsetManager::default();
        assert_eq!(manager.query_init_offset("topic", "group", 1, "client", 0, false), 0);
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Support pull message result handle
2 participants