[ISSUE #1052] Support request code LOCK_BATCH_MQ(41) #1061

Merged: 3 commits, Oct 18, 2024

Conversation

mxsm
Owner

@mxsm mxsm commented Oct 18, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1052

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new method for batch locking of message queues in the Broker API.
    • Added a new field for managing broker member groups in the BrokerRuntime.
  • Improvements

    • Simplified error handling in the Broker2Client communication.
    • Enhanced the BrokerConfig with a strict mode option.
    • Updated the error message format for better clarity in client errors.
  • Bug Fixes

    • Corrected the handling of message queue locking and unlocking.
  • Documentation

    • Updated various method signatures and added new methods for better functionality and clarity.

Contributor

coderabbitai bot commented Oct 18, 2024

Walkthrough

The pull request introduces several enhancements and modifications across various components of the RocketMQ broker. Key changes include the addition of new fields and methods to manage broker member groups and batch message queue operations. The error handling has been simplified in client interactions, and structural updates have been made to several data types to improve functionality and clarity. These changes aim to enhance the overall architecture and operational capabilities of the broker.

Changes

File Path | Change Summary
rocketmq-broker/src/broker_runtime.rs | Added broker_member_group field to BrokerRuntime, updated Clone and new methods.
rocketmq-broker/src/client/net/broker_to_client.rs | Changed return type of call_client from BrokerResult<RemotingCommand> to Result<RemotingCommand>.
rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs | Updated to use MessageQueue directly instead of Arc<MessageQueue> in method signatures.
rocketmq-broker/src/error.rs | Added new enum variant MQBrokerError to BrokerError.
rocketmq-broker/src/lib.rs | Changed type alias from BrokerResult<T> to Result<T>.
rocketmq-broker/src/out_api/broker_outer_api.rs | Added asynchronous method lock_batch_mq_async to BrokerOuterAPI.
rocketmq-broker/src/processor/admin_broker_processor.rs | Added batch_mq_handler field and updated process_request to handle RequestCode::LockBatchMq.
rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs | Introduced BatchMqHandler struct with methods for batch operations.
rocketmq-client/src/error.rs | Updated MQBrokerError variant format in MQClientError.
rocketmq-common/src/common/broker/broker_config.rs | Added lock_in_strict_mode field to BrokerConfig.
rocketmq-namesrv/src/route/route_info_manager.rs | Added get_broker_member_group method to RouteInfoManager.
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs | Modified BrokerMemberGroup to require non-optional fields.
rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs | Updated LockBatchRequestBody to derive Clone.

Assessment against linked issues

Objective Addressed Explanation
Support request code LOCK_BATCH_MQ (41) (#1052)

Possibly related PRs

Suggested labels

approved, auto merge

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

Poem

In the broker's realm, changes take flight,
With members grouped, everything feels right.
Batch locks now dance, in a seamless array,
As errors simplify, guiding the way.
So hop along, dear code, shine bright and true,
For every change brings something new! 🐇✨


@mxsm
Owner Author

mxsm commented Oct 18, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

codecov bot commented Oct 18, 2024

Codecov Report

Attention: Patch coverage is 22.15909% with 137 lines in your changes missing coverage. Please review.

Project coverage is 19.88%. Comparing base (c201729) to head (92cbef7).
Report is 4 commits behind head on main.

Files with missing lines | Patch % | Lines
...ocessor/admin_broker_processor/batch_mq_handler.rs | 0.00% | 70 Missing ⚠️
rocketmq-broker/src/out_api/broker_outer_api.rs | 0.00% | 27 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs | 0.00% | 12 Missing ⚠️
...tmq-broker/src/processor/admin_broker_processor.rs | 0.00% | 9 Missing ⚠️
rocketmq-namesrv/src/route/route_info_manager.rs | 0.00% | 6 Missing ⚠️
...c/protocol/body/broker_body/broker_member_group.rs | 0.00% | 5 Missing ⚠️
...ker/src/client/rebalance/rebalance_lock_manager.rs | 95.12% | 2 Missing ⚠️
rocketmq-broker/src/error.rs | 0.00% | 2 Missing ⚠️
rocketmq-broker/src/client/net/broker_to_client.rs | 0.00% | 1 Missing ⚠️
rocketmq-client/src/error.rs | 0.00% | 1 Missing ⚠️
... and 2 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1061      +/-   ##
==========================================
- Coverage   19.89%   19.88%   -0.02%     
==========================================
  Files         425      426       +1     
  Lines       35374    35510     +136     
==========================================
+ Hits         7039     7062      +23     
- Misses      28335    28448     +113     


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

🧹 Outside diff range and nitpick comments (11)
rocketmq-broker/src/error.rs (1)

24-25: LGTM! Consider adjusting parameter order in the error message.

The new MQBrokerError variant is a good addition, providing more detailed information about client exceptions. The error message is clear and includes all the necessary information.

Consider adjusting the order of parameters in the error message to match the declaration order for better readability:

#[error("Client exception occurred: CODE:{0}, Message:{1}, broker address:{2}")]
MQBrokerError(i32, String, String),

This change would make the code more intuitive and easier to maintain.
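
To illustrate how the positional placeholders line up with the tuple fields, here is a minimal sketch that uses a hand-written Display implementation instead of the thiserror derive, so it needs only the standard library. The enum name DemoError is hypothetical, and the field order (code, message, broker address) is an assumption taken from the suggested message above.

```rust
use std::fmt;

/// Hypothetical stand-in for the broker error enum.
/// Assumed field order: (response code, error message, broker address).
#[derive(Debug)]
pub enum DemoError {
    MQBrokerError(i32, String, String),
}

impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Fields are printed in declaration order: code, message, address.
            DemoError::MQBrokerError(code, message, addr) => write!(
                f,
                "Client exception occurred: CODE:{}, Message:{}, broker address:{}",
                code, message, addr
            ),
        }
    }
}
```

With the message written in declaration order, a reader can map each placeholder to a field without counting indices.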

rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (3)

25-27: Approved: Type changes improve structure clarity and safety.

The modifications to the BrokerMemberGroup struct fields enhance type safety by making cluster and broker_name non-optional, and improve the broker_addrs field by using u64 for broker IDs. These changes align well with the intended use of the struct.

Consider adding documentation comments to explain the significance of these fields and any constraints (e.g., non-empty strings for cluster and broker_name, valid ranges for broker IDs).


31-31: Approved: Constructor signature aligns with struct changes.

The new method signature has been updated to match the changes in the BrokerMemberGroup struct fields. This ensures consistency and makes the API more straightforward to use.

Consider adding an additional constructor method (e.g., with_addrs) that allows initializing the struct with pre-populated broker addresses. This would provide flexibility for cases where the addresses are known at creation time:

impl BrokerMemberGroup {
    pub fn with_addrs(cluster: String, broker_name: String, broker_addrs: HashMap<u64, String>) -> Self {
        Self {
            cluster,
            broker_name,
            broker_addrs,
        }
    }
}

35-35: Approved: Consistent initialization of broker_addrs.

The initialization of broker_addrs with an empty HashMap is consistent with the updated constructor signature and ensures that the field is always properly initialized.

As a minor stylistic point, consider initializing the field with HashMap::new():

broker_addrs: HashMap::new(),

This change doesn't affect functionality but is slightly more idiomatic in Rust.

rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1)

24-24: LGTM. Consider optimizing Option<String> fields if appropriate.

The addition of the Clone trait is a good improvement, allowing for more flexible use of the LockBatchRequestBody struct. This change is safe and doesn't introduce any issues.

Consider reviewing the use of Option<String> for consumer_group and client_id. If these fields are always expected to have a value, changing them to String could simplify the code and improve performance slightly. For example:

pub struct LockBatchRequestBody {
    pub consumer_group: String,
    pub client_id: String,
    pub only_this_broker: bool,
    pub mq_set: HashSet<MessageQueue>,
}

This change would require updating the Display implementation and any code that constructs or uses this struct. Please verify if this optimization aligns with the intended use of these fields throughout the codebase.

rocketmq-common/src/common/broker/broker_config.rs (3)

172-172: Add documentation for the new field lock_in_strict_mode.

The purpose and impact of the new lock_in_strict_mode field are not clear from its name alone. Please add a doc comment explaining:

  1. What "strict mode" means in this context.
  2. How this field affects the broker's behavior.
  3. Any potential performance implications or use cases for enabling/disabling this mode.

This documentation will help other developers understand when and how to use this new configuration option.


249-249: LGTM. Consider grouping related configuration options.

The addition of lock_in_strict_mode with a default value of false is appropriate. It maintains backward compatibility and allows users to opt-in to the new behavior.

As a minor improvement, consider grouping this new option with other related locking or concurrency settings in the struct initialization, if any exist. This can enhance readability and make it easier for developers to locate and configure related options.


Line range hint 260-479: Update get_properties method to include the new lock_in_strict_mode field.

The get_properties method, which appears to serialize the broker configuration, doesn't include the newly added lock_in_strict_mode field. To ensure consistency and completeness of the configuration representation, please update this method to include the new field.

Add the following line to the get_properties method:

properties.insert(
    "lockInStrictMode".to_string(),
    self.lock_in_strict_mode.to_string(),
);

This will ensure that the new configuration option is properly serialized and available when the properties are used elsewhere in the system.

rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs (1)

70-74: Avoid magic numbers by defining constants for capacity initialization.

Using hard-coded values like 32 and 8 for HashMap capacities can reduce code clarity. Defining these numbers as constants or configuring them makes the code more maintainable.

For example:

const MQ_LOCK_MAP_INITIAL_CAPACITY: usize = 32;
const ADDR_MAP_INITIAL_CAPACITY: usize = 8;

let mut mq_lock_map = HashMap::with_capacity(MQ_LOCK_MAP_INITIAL_CAPACITY);
let mut addr_map = HashMap::with_capacity(ADDR_MAP_INITIAL_CAPACITY);
rocketmq-broker/src/out_api/broker_outer_api.rs (2)

359-360: Handle invoke_async errors more specifically

Currently, all errors from invoke_async are wrapped in BrokerClientError(e), which might obscure the underlying issue. Consider matching on the error type to provide more specific error handling and better context for troubleshooting.

For example:

- Err(e) => Err(BrokerClientError(e)),
+ Err(e) => match e {
+     // Handle timeout errors
+     RemotingError::RequestTimeout => Err(BrokerError::TimeoutError(e)),
+     // Handle connection errors
+     RemotingError::ConnectionRefused => Err(BrokerError::ConnectionError(e)),
+     // Handle other errors
+     _ => Err(BrokerClientError(e)),
+ }

By matching on specific error variants, you can provide more meaningful error messages and handle different error scenarios appropriately.


330-361: Add unit tests for lock_batch_mq_async

To ensure the reliability of the new lock_batch_mq_async method, it is important to have unit tests that cover various scenarios, such as successful responses, error responses, and exceptions.

Would you like assistance in creating unit tests for this method? Proper testing will help catch potential issues early and ensure that the method behaves as expected under different conditions.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 976a33b and 92cbef7.

📒 Files selected for processing (13)
  • rocketmq-broker/src/broker_runtime.rs (6 hunks)
  • rocketmq-broker/src/client/net/broker_to_client.rs (2 hunks)
  • rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs (8 hunks)
  • rocketmq-broker/src/error.rs (1 hunks)
  • rocketmq-broker/src/lib.rs (1 hunks)
  • rocketmq-broker/src/out_api/broker_outer_api.rs (4 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor.rs (8 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs (1 hunks)
  • rocketmq-client/src/error.rs (1 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-namesrv/src/route/route_info_manager.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1 hunks)
🧰 Additional context used
🔇 Additional comments (26)
rocketmq-broker/src/client/net/broker_to_client.rs (3)

32-32: LGTM! Verify consistency across the codebase.

The change from BrokerResult<RemotingCommand> to Result<RemotingCommand> simplifies the return type while maintaining the existing error handling logic. This is a good improvement for code clarity.

To ensure this change is consistent across the codebase and part of a larger refactoring effort, please run the following script:

#!/bin/bash
# Description: Check for consistency of Result usage across the codebase

# Test 1: Check for any remaining uses of BrokerResult<RemotingCommand>
echo "Checking for remaining uses of BrokerResult<RemotingCommand>:"
rg --type rust 'BrokerResult<RemotingCommand>'

# Test 2: Verify consistent use of Result<RemotingCommand>
echo "Verifying consistent use of Result<RemotingCommand>:"
rg --type rust 'Result<RemotingCommand>'

# Test 3: Check for any potential inconsistencies in error handling
echo "Checking for potential inconsistencies in error handling:"
rg --type rust 'BrokerClientError'

Line range hint 21-32: Overall positive impact. Verify alignment with project's error handling strategy.

The changes to simplify error handling by using a more generic Result type and updating the method signature accordingly are positive improvements. They enhance code clarity and consistency.

To ensure these changes align with the project's overall error handling strategy, please run the following script:

#!/bin/bash
# Description: Verify alignment with project's error handling strategy

# Test 1: Check for any custom error types still in use
echo "Checking for custom error types:"
rg --type rust 'pub enum .*Error'

# Test 2: Verify consistent use of Result across the project
echo "Verifying consistent use of Result:"
rg --type rust 'Result<'

# Test 3: Check for any potential inconsistencies in error mapping
echo "Checking for potential inconsistencies in error mapping:"
rg --type rust 'Err\((.*Error)\)'

Additionally, please confirm that these changes are part of a deliberate effort to standardize error handling across the codebase and that they align with the project's documentation or style guide regarding error handling.


21-21: LGTM! Verify impact on error handling patterns.

The change from BrokerResult to Result simplifies the error handling, which could improve code clarity and consistency. This aligns well with the method signature change.

To ensure this change doesn't break existing error handling patterns, please run the following script:

rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (1)

25-35: Verify impact of BrokerMemberGroup changes on the codebase.

The modifications to BrokerMemberGroup improve its structure and type safety. However, these changes might affect other parts of the codebase that rely on the previous structure, particularly the ability to pass None for cluster and broker_name, and the use of i64 for broker IDs.

Please run the following script to identify potential areas that might need updates:

Please review the output of this script and update any affected code accordingly.

rocketmq-namesrv/src/route/route_info_manager.rs (1)

Line range hint 1-859: Summary of changes in route_info_manager.rs

The main change in this file is the addition of the get_broker_member_group function. While the implementation is generally correct, a small improvement has been suggested to make the function's behavior more intuitive and consistent with Rust's idiomatic use of Option.

No other parts of the file were modified, and the change is well-contained within the new function. The overall structure and functionality of the RouteInfoManager remain intact.

rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs (2)

78-78: Validate the u32 conversion to prevent potential overflows.

Casting addr_map.len() to u32 with `as` silently truncates the value if the length exceeds u32::MAX. Ensure that addr_map.len() is within the u32 range.

Run the following script to check for potential overflows:

#!/bin/bash
# Description: Confirm that 'addr_map.len()' does not exceed 'u32::MAX'.

# Expected: The length of 'addr_map' is within the 'u32' range.
rg --type rust 'CountDownLatch::new\(\s*\(addr_map\.len\(\) as u32\)\s*\)'
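
As an alternative to a plain cast, a checked conversion makes the out-of-range case explicit. The sketch below is only an illustration: the helper name latch_count is hypothetical and is not part of the PR.

```rust
use std::collections::HashMap;
use std::convert::TryFrom;

/// Converts a map length to `u32`, returning `None` if it does not fit.
/// A plain `len as u32` cast would truncate the value instead.
pub fn latch_count<K, V>(map: &HashMap<K, V>) -> Option<u32> {
    u32::try_from(map.len()).ok()
}
```

A caller could then decide whether an oversized map is an error or should be handled in chunks, rather than proceeding with a wrong count.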

65-66: ⚠️ Potential issue

Ensure replica_size is greater than zero before computing the quorum.

The quorum is computed as replica_size / 2 + 1. Dividing by the constant 2 cannot fail, but a replica_size of zero would still yield a quorum of 1, which is meaningless for an empty replica set. It's worth validating this value before performing the calculation.

Consider adding a check for replica_size:

 let replica_size = self.inner.message_store_config.total_replicas;
+if replica_size == 0 {
+    return Some(RemotingCommand::create_response_command_with_error("Replica size cannot be zero"));
+}
 let quorum = replica_size / 2 + 1;
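
The guard and the quorum arithmetic above can be sketched as a small standalone function. The function name quorum and the Option return type are assumptions for illustration; the handler in the PR returns a RemotingCommand instead.

```rust
/// Returns the majority quorum for `replica_size` replicas,
/// or `None` when there are no replicas at all.
pub fn quorum(replica_size: usize) -> Option<usize> {
    if replica_size == 0 {
        return None;
    }
    // Integer division: 3 replicas need 2 acks, 4 replicas need 3.
    Some(replica_size / 2 + 1)
}
```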

Run the following script to verify that total_replicas is always set to a positive integer:

rocketmq-broker/src/processor/admin_broker_processor.rs (10)

24-24: Import Statement Added

The import of BrokerMemberGroup is appropriate and necessary for the added functionality involving broker member groups.


34-34: Import Statement Added

The inclusion of RebalanceLockManager is correct and required for managing rebalance locks.


37-37: Import Statement Added

Adding BatchMqHandler is essential for handling batch message queue operations.


47-47: Module Declaration for batch_mq_handler

The addition of the batch_mq_handler module is appropriate to encapsulate batch MQ handling logic.


59-59: Added batch_mq_handler Field to AdminBrokerProcessor Struct

Including batch_mq_handler as a field in AdminBrokerProcessor aligns with the need to handle batch MQ requests within this processor.


93-94: Initialized New Fields in Inner Struct

The rebalance_lock_manager and broker_member_group fields are appropriately initialized in the Inner struct. This ensures that the necessary components are available for batch MQ handling.


100-100: Initialized batch_mq_handler

Creating a new instance of BatchMqHandler with inner.clone() is correct. This properly initializes the handler with the necessary context.


106-106: Assigned batch_mq_handler in AdminBrokerProcessor

Assigning the initialized batch_mq_handler to the AdminBrokerProcessor struct ensures it is available for request processing.


244-245: Added Fields to Inner Struct

Adding rebalance_lock_manager and broker_member_group to the Inner struct is appropriate. This inclusion provides necessary access to these components within the handler.


76-77: Added Parameters to new Method

The new method of AdminBrokerProcessor now accepts rebalance_lock_manager and broker_member_group parameters. This change is necessary to initialize the new fields added to the Inner struct.

Please ensure that all calls to AdminBrokerProcessor::new throughout the codebase are updated to include the new parameters. You can verify this by running:

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs (3)

64-66: Update documentation and callers for the changed try_lock_batch method signature

The try_lock_batch method now accepts a &HashSet<MessageQueue> and returns a HashSet<MessageQueue>. Ensure that all callers of this method are updated accordingly, and consider updating any related documentation or comments to reflect this signature change.


99-99: Consistency in handling MessageQueue ownership

When inserting mq into lock_mqs, ensure that ownership is correctly managed. If mq is being moved and used elsewhere, this could lead to issues. Confirm that this insertion does not affect the mq usage in subsequent code.


127-127: Update method signature for unlock_batch and check for potential side effects

The unlock_batch method now accepts a &HashSet<MessageQueue>. Ensure that all invocations of this method pass the correct parameter type. Additionally, verify that the change does not introduce any side effects in the unlocking logic, particularly regarding the handling of HashSet iteration order.
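
To make the borrowed-set signatures concrete, here is a simplified, single-threaded sketch. DemoLockManager, its owners map, and the trimmed MessageQueue struct are all hypothetical; the real RebalanceLockManager also deals with lock expiry and concurrent access, which are left out here.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified message queue key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageQueue {
    pub topic: String,
    pub broker_name: String,
    pub queue_id: i32,
}

/// Hypothetical lock table: (group, queue) -> owning client id.
#[derive(Debug, Default)]
pub struct DemoLockManager {
    owners: HashMap<(String, MessageQueue), String>,
}

impl DemoLockManager {
    /// Tries to lock every queue in `mqs` for `client_id` and returns the
    /// queues that this client holds afterwards.
    pub fn try_lock_batch(
        &mut self,
        group: &str,
        mqs: &HashSet<MessageQueue>,
        client_id: &str,
    ) -> HashSet<MessageQueue> {
        let mut locked = HashSet::with_capacity(mqs.len());
        for mq in mqs {
            let owner = self
                .owners
                .entry((group.to_string(), mq.clone()))
                .or_insert_with(|| client_id.to_string());
            if owner.as_str() == client_id {
                // The input set is only borrowed, so the queue is cloned.
                locked.insert(mq.clone());
            }
        }
        locked
    }

    /// Releases the queues in `mqs` that are currently held by `client_id`.
    pub fn unlock_batch(&mut self, group: &str, mqs: &HashSet<MessageQueue>, client_id: &str) {
        for mq in mqs {
            let key = (group.to_string(), mq.clone());
            if self.owners.get(&key).map(String::as_str) == Some(client_id) {
                self.owners.remove(&key);
            }
        }
    }
}
```

Because each queue is handled independently, the result does not depend on the iteration order of the HashSet in this sketch.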

rocketmq-broker/src/broker_runtime.rs (6)

33-33: Import BrokerMemberGroup

The BrokerMemberGroup import is correctly added to use the BrokerMemberGroup struct in the code.


118-118: Add broker_member_group field to BrokerRuntime

The broker_member_group field is appropriately added to the BrokerRuntime struct as an Arc<BrokerMemberGroup> to manage broker member groups.


152-152: Include broker_member_group in Clone implementation

The Clone implementation now includes broker_member_group, ensuring the field is properly cloned along with the rest of the BrokerRuntime fields.


196-203: Initialize broker_member_group in new method

The broker_member_group is correctly initialized with the broker cluster name and broker name. The broker address is added to broker_addrs using the broker ID and the result of broker_config.get_broker_addr(), which ensures proper mapping of broker IDs to their addresses.


236-236: Assign broker_member_group to BrokerRuntime

The broker_member_group is correctly assigned to the BrokerRuntime struct using Arc::new(broker_member_group), ensuring shared ownership and thread-safe access.


486-487: Pass broker_member_group to AdminBrokerProcessor

The broker_member_group is appropriately passed to AdminBrokerProcessor::new, allowing the processor to utilize the broker member group data within its processing logic.
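
The initialization and sharing steps described in these comments can be sketched as follows. The struct is a simplified stand-in for BrokerMemberGroup as summarized in this review (non-optional names, u64 broker IDs), and build_member_group is a hypothetical helper, not a function from the PR.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the struct described in this review.
#[derive(Debug, Clone, Default)]
pub struct BrokerMemberGroup {
    pub cluster: String,
    pub broker_name: String,
    pub broker_addrs: HashMap<u64, String>,
}

impl BrokerMemberGroup {
    pub fn new(cluster: String, broker_name: String) -> Self {
        Self {
            cluster,
            broker_name,
            broker_addrs: HashMap::new(),
        }
    }
}

/// Builds the group for one broker and wraps it in `Arc` so that several
/// components can share read access without copying the map.
pub fn build_member_group(
    cluster: &str,
    broker_name: &str,
    broker_id: u64,
    addr: &str,
) -> Arc<BrokerMemberGroup> {
    let mut group = BrokerMemberGroup::new(cluster.to_string(), broker_name.to_string());
    group.broker_addrs.insert(broker_id, addr.to_string());
    Arc::new(group)
}
```

Cloning the Arc only bumps a reference count, which is presumably why the runtime can hand the same group to the processor cheaply.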

@@ -25,7 +25,7 @@
 #[error("{0}")]
 RemotingTooMuchRequestError(String),

-#[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
+#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
💡 Codebase verification

🛠️ Refactor suggestion

Inconsistency Found in Error Message Formatting

The reordering of parameters in the MQBrokerError variant (broker address moved to {2}) introduces inconsistency with the OffsetNotFoundError variant, which still uses {1} for the broker address. This inconsistency is present in multiple areas of the codebase, potentially leading to confusion and errors in error handling.

To address this issue:

  1. Standardize Parameter Ordering:

    • Update the OffsetNotFoundError variant to match the parameter order of MQBrokerError:
      #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
      OffsetNotFoundError(i32, String, String),
  2. Update All Usages:

    • Ensure that all instances where OffsetNotFoundError is used align with the new parameter order to prevent runtime errors.
  3. Add Explanatory Comments:

    • Document the rationale behind the parameter ordering to aid future maintenance and understanding.
  4. Verify Error Parsing:

    • Confirm that error message parsing mechanisms are updated to accommodate the standardized format.
🔗 Analysis chain

Consider maintaining consistency across error variants and add explanatory comments.

The change in the MQBrokerError variant modifies the error message format, moving the broker address from {1} to {2}. While this change might be intentional, it introduces inconsistency with the OffsetNotFoundError variant, which still uses {1} for the broker address.

To improve code clarity and maintainability:

  1. Consider updating the OffsetNotFoundError variant for consistency:

    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
    OffsetNotFoundError(i32, String, String),
  2. Add a comment explaining the reason for this change, which will help future maintainers understand the rationale behind the new format.

  3. Verify that this change doesn't break any existing code that might be parsing these error messages.

To ensure this change doesn't impact other parts of the codebase, please run the following script:

This script will help identify any code that might be affected by the change in error message format.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for usages of MQBrokerError and OffsetNotFoundError

# Search for MQBrokerError usages
echo "Searching for MQBrokerError usages:"
rg --type rust "MQBrokerError" -C 3

echo "\nSearching for OffsetNotFoundError usages:"
rg --type rust "OffsetNotFoundError" -C 3

echo "\nSearching for potential error message parsing:"
rg --type rust "broker address:" -C 3

Length of output: 18573

@@ -47,4 +47,4 @@ pub(crate) mod topic;
 pub(crate) mod util;

 type RemotingError = rocketmq_remoting::error::Error;
-type BrokerResult<T> = Result<T, BrokerError>;
+type Result<T> = std::result::Result<T, BrokerError>;
💡 Codebase verification

Remaining usages of BrokerResult found.

Please update the following files to replace BrokerResult with the new Result<T> type:

  • rocketmq-remoting/src/protocol/namesrv.rs
  • rocketmq-namesrv/src/route/route_info_manager.rs
  • rocketmq-broker/src/out_api/broker_outer_api.rs
  • rocketmq-client/src/implementation/find_broker_result.rs
  • rocketmq-client/src/factory/mq_client_instance.rs
🔗 Analysis chain

Approve the type alias change and verify its usage.

The change from BrokerResult<T> to Result<T> is a good improvement. It aligns with Rust's naming conventions and simplifies the code while maintaining the same functionality.
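
A minimal sketch of how such an alias behaves, wrapped in a module so the shadowing stays local. The module name broker, the trimmed BrokerError, and both functions are hypothetical and exist only to illustrate the pattern.

```rust
pub mod broker {
    /// Hypothetical, minimal stand-in for the crate's `BrokerError`.
    #[derive(Debug, PartialEq)]
    pub enum BrokerError {
        MQBrokerError(i32, String, String),
    }

    /// Module-local alias. The right-hand side names the standard type by
    /// its full path, because a bare `Result` here would refer to the alias
    /// being defined.
    pub type Result<T> = std::result::Result<T, BrokerError>;

    /// Uses the one-parameter alias: the error type is always `BrokerError`.
    pub fn check_code(code: i32, addr: &str) -> Result<i32> {
        if code == 0 {
            Ok(code)
        } else {
            Err(BrokerError::MQBrokerError(
                code,
                "request failed".to_string(),
                addr.to_string(),
            ))
        }
    }

    /// Inside this module the alias shadows the prelude `Result`, so a
    /// different error type requires the fully qualified path.
    pub fn parse_code(raw: &str) -> std::result::Result<i32, std::num::ParseIntError> {
        raw.trim().parse::<i32>()
    }
}
```

The trade-off is brevity at call sites against the need to spell out std::result::Result wherever another error type appears in the same scope.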

To ensure this change doesn't break existing code, please run the following script to verify the usage of both BrokerResult and Result across the codebase:

Please review the script output and update any remaining occurrences of BrokerResult to use the new Result type.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of BrokerResult and Result types

# Test 1: Check for any remaining usage of BrokerResult
echo "Checking for remaining usage of BrokerResult:"
rg --type rust 'BrokerResult'

# Test 2: Verify the usage of the new Result type
echo "Verifying usage of the new Result type:"
rg --type rust 'Result<.*>'

# Note: If Test 1 returns results, those occurrences need to be updated to use the new Result type.
# If Test 2 doesn't return results (other than in lib.rs), it might indicate that the new type isn't being used yet.

Length of output: 48328

Comment on lines +651 to +659
+        let mut group_member =
+            BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string());
         if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
             let map = broker_data.broker_addrs().clone();
-            return Some(BrokerMemberGroup::new(
-                Some(cluster_name.to_string()),
-                Some(broker_name.to_string()),
-                Some(map),
-            ));
+            for (key, value) in map {
+                group_member.broker_addrs.insert(key as u64, value);
+            }
         }
-        None
+        Some(group_member)
🛠️ Refactor suggestion

Consider returning None when no broker addresses are found.

The current implementation always returns Some(group_member), even if no broker addresses were added. This might lead to confusion for the caller, as they receive a BrokerMemberGroup with an empty broker_addrs map.

Consider modifying the function to return None when no broker addresses are found:

 pub(crate) fn get_broker_member_group(
     &mut self,
     cluster_name: &str,
     broker_name: &str,
 ) -> Option<BrokerMemberGroup> {
     let mut group_member =
         BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string());
     if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
         let map = broker_data.broker_addrs().clone();
         for (key, value) in map {
             group_member.broker_addrs.insert(key as u64, value);
         }
+        if group_member.broker_addrs.is_empty() {
+            None
+        } else {
+            Some(group_member)
+        }
+    } else {
+        None
     }
-    Some(group_member)
 }

This change ensures that the function only returns Some(group_member) when broker addresses are actually found, providing more meaningful information to the caller.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    let mut group_member =
-        BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string());
-    if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
-        let map = broker_data.broker_addrs().clone();
-        for (key, value) in map {
-            group_member.broker_addrs.insert(key as u64, value);
-        }
-    }
-    Some(group_member)
+    let mut group_member =
+        BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string());
+    if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
+        let map = broker_data.broker_addrs().clone();
+        for (key, value) in map {
+            group_member.broker_addrs.insert(key as u64, value);
+        }
+        if group_member.broker_addrs.is_empty() {
+            None
+        } else {
+            Some(group_member)
+        }
+    } else {
+        None
+    }

Comment on lines +91 to +99
Ok(lock_ok_mqs) => {
    let mut mq_lock_map = mq_lock_map.lock().await;
    for mq in lock_ok_mqs {
        *mq_lock_map.entry(mq).or_insert(0) += 1;
    }
}
Err(e) => {
    warn!("lockBatchMQAsync failed: {:?}", e);
}

🛠️ Refactor suggestion

Enhance error handling for asynchronous operations.

Currently, when lock_batch_mq_async fails, the error is logged but not handled further. Consider implementing retries or alternative error handling strategies to improve reliability.

You might implement an exponential backoff retry mechanism or aggregate failed broker addresses for further analysis.
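
As a rough illustration of the backoff idea, here is a minimal synchronous sketch. The helper name `retry_with_backoff` and its parameters are hypothetical and not part of this PR; an async version would presumably replace `std::thread::sleep` with the runtime's own sleep.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: calls `op` up to `max_attempts` times, doubling the
/// delay after every failed attempt. Returns the first success, or the last
/// error once all attempts are used up.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "at least one attempt is required");
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

The closure receives the 1-based attempt number, so a caller could log which attempt failed before the next retry. Whether retrying is appropriate at all depends on how the rebalance loop already re-issues lock requests.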

    _request_code: RequestCode,
    _request: RemotingCommand,
) -> Option<RemotingCommand> {
    unimplemented!("unlockBatchMQ")

⚠️ Potential issue

Implement unlock_batch_mq or track it with a TODO comment.

The unlock_batch_mq function is currently unimplemented. To prevent unexpected panics during runtime, consider implementing it or adding a clear TODO for future development.

Would you like assistance in creating an initial implementation or opening an issue to track this task?
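
One possible interim shape, sketched with stand-in types: the handler answers with a "not supported" response instead of panicking. `Request`, `Response`, and this `ResponseCode` enum are hypothetical placeholders for illustration; the real `RemotingCommand` API in this repository is not assumed here.

```rust
// Hypothetical stand-ins for illustration only; the project's real
// RequestCode, ResponseCode, and RemotingCommand types differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseCode {
    RequestCodeNotSupported,
}

#[derive(Debug, PartialEq, Eq)]
struct Response {
    code: ResponseCode,
    remark: String,
}

struct Request;

/// Placeholder handler that reports "not supported" instead of panicking.
// TODO: implement the real unlock logic and remove this placeholder.
fn unlock_batch_mq(_request: Request) -> Option<Response> {
    Some(Response {
        code: ResponseCode::RequestCodeNotSupported,
        remark: "unlockBatchMQ is not implemented yet".to_string(),
    })
}
```

With a placeholder like this, a client that sends the request gets an explicit error back rather than taking down the handling task.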

@@ -163,7 +163,7 @@
         }
     }
 
-    fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
+    fn is_locked(&self, group: &str, mq: &MessageQueue, client_id: &str) -> bool {

⚠️ Potential issue

Confirm thread safety without Arc<MessageQueue>

By removing Arc<MessageQueue> and using MessageQueue directly, confirm that thread safety is maintained, especially if MessageQueue contains internal mutability or is shared across threads. The use of RwLock helps with synchronization, but it's important to ensure no data races occur.
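
For reference, a small sketch of why the signature change is usually safe for callers that still hold an `Arc`: a `&Arc<MessageQueue>` coerces to `&MessageQueue` at the call site, and the shared borrow is read-only. The `MessageQueue` and `LockTable` types below are simplified stand-ins, not the project's definitions.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Simplified stand-in for the real MessageQueue type.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
struct MessageQueue {
    topic: String,
    queue_id: i32,
}

// Hypothetical lock table: maps each queue to the client that owns its lock.
#[derive(Default)]
struct LockTable {
    owners: RwLock<HashMap<MessageQueue, String>>,
}

impl LockTable {
    fn lock(&self, mq: &MessageQueue, client_id: &str) {
        self.owners
            .write()
            .unwrap()
            .insert(mq.clone(), client_id.to_string());
    }

    // Takes a plain reference; no Arc is needed in the signature.
    fn is_locked(&self, mq: &MessageQueue, client_id: &str) -> bool {
        self.owners.read().unwrap().get(mq).map(String::as_str) == Some(client_id)
    }
}

/// Checks the same queue from several threads that each hold an Arc clone.
fn check_from_threads(table: Arc<LockTable>, mq: Arc<MessageQueue>) -> bool {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let table = Arc::clone(&table);
            let mq = Arc::clone(&mq);
            // `&mq` is `&Arc<MessageQueue>` and coerces to `&MessageQueue`.
            thread::spawn(move || table.is_locked(&mq, "client_1"))
        })
        .collect();
    handles.into_iter().all(|handle| handle.join().unwrap())
}
```

If `MessageQueue` had interior mutability that was not `Sync`, the `thread::spawn` call would fail to compile, so the compiler checks this property rather than leaving it to review.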

Comment on lines +234 to +237
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");

🛠️ Refactor suggestion

Replace repetitive test setup code with a helper function

The test cases repeatedly create a default MessageQueue and insert it into a HashSet. Consider refactoring this common setup into a helper function to adhere to the DRY (Don't Repeat Yourself) principle, improving maintainability and readability.

Implement a helper function for creating the test HashSet<MessageQueue>:

fn create_test_mq_set() -> HashSet<MessageQueue> {
    let mq = MessageQueue::default();
    let mut set = HashSet::new();
    set.insert(mq);
    set
}

Then update the tests:

 #[test]
 fn lock_all_expired_returns_false_when_active_locks_exist() {
     let manager = RebalanceLockManager::default();
-    let mq = MessageQueue::default();
-    let mut set = HashSet::new();
-    set.insert(mq.clone());
+    let set = create_test_mq_set();
     manager.try_lock_batch("test_group", &set, "client_1");
     assert!(!manager.is_lock_all_expired("test_group"));
 }

Repeat for other test cases.

Also applies to: 244-247, 254-258, 265-270, 277-281, 288-291, 298-298

Comment on lines +67 to +68
let mut lock_mqs = HashSet::with_capacity(mqs.len());
let mut not_locked_mqs = HashSet::with_capacity(mqs.len());

🛠️ Refactor suggestion

Avoid unnecessary cloning of MessageQueue when inserting into HashSet

Cloning MessageQueue instances with mq.clone() may introduce performance overhead, especially if MessageQueue is a complex struct. Consider iterating over references and adjusting ownership to minimize or eliminate the need for cloning.

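Note that the diff below only makes the dereference explicit: (*mq).clone() still clones each MessageQueue, exactly as mq.clone() does, so it does not reduce cloning on its own. Eliminating the clones would require the result sets to hold &MessageQueue references, or the function to take ownership of mqs: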

 for mq in mqs.iter() {
     if self.is_locked(group, mq, client_id) {
-        lock_mqs.insert(mq.clone());
+        lock_mqs.insert((*mq).clone());
     } else {
-        not_locked_mqs.insert(mq.clone());
+        not_locked_mqs.insert((*mq).clone());
     }
 }

Committable suggestion was skipped due to low confidence.
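
One way to avoid the clones altogether is to collect references instead of owned values. The sketch below uses a simplified stand-in `MessageQueue` and a hypothetical `partition_by_lock` helper; whether this fits depends on whether the caller needs owned sets (for example, to send them in a response body).

```rust
use std::collections::HashSet;

// Simplified stand-in for the real MessageQueue type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MessageQueue {
    topic: String,
    queue_id: i32,
}

/// Hypothetical helper: splits `mqs` into (locked, not locked) sets of
/// references, so no MessageQueue is cloned.
fn partition_by_lock<'a>(
    mqs: &'a HashSet<MessageQueue>,
    is_locked: impl Fn(&MessageQueue) -> bool,
) -> (HashSet<&'a MessageQueue>, HashSet<&'a MessageQueue>) {
    let mut locked = HashSet::with_capacity(mqs.len());
    let mut not_locked = HashSet::with_capacity(mqs.len());
    for mq in mqs {
        if is_locked(mq) {
            locked.insert(mq);
        } else {
            not_locked.insert(mq);
        }
    }
    (locked, not_locked)
}
```

The returned sets borrow from `mqs`, so they cannot outlive it; that lifetime constraint is the price of skipping the clones.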

Comment on lines +336 to +358
let mut request = RemotingCommand::create_request_command(
    RequestCode::LockBatchMq,
    LockBatchMqRequestHeader::default(),
);
request.set_body_mut_ref(Some(request_body));
let result = self
    .remoting_client
    .invoke_async(Some(addr), request, timeout_millis)
    .await;
match result {
    Ok(response) => {
        if ResponseCode::from(response.code()) == ResponseCode::Success {
            let lock_batch_response_body =
                LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
            Ok(lock_batch_response_body.lock_ok_mq_set)
        } else {
            Err(BrokerError::MQBrokerError(
                response.code(),
                response.remark().cloned().unwrap_or("".to_string()),
                "".to_string(),
            ))
        }
    }

🛠️ Refactor suggestion

Improve logging and error messages for better debugging

When the response code is not ResponseCode::Success, the error returned lacks detailed context. Enhancing the error message with additional information can aid in debugging and provide clearer insights into failures.

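Consider including the address and any relevant request details in the error. The diff below assumes an anyhow-style context extension method is available for this error type, and that addr and request are still accessible after invoke_async (which, as written in the highlighted code, receives request by value):

 Err(BrokerError::MQBrokerError(
     response.code(),
-    response.remark().cloned().unwrap_or("".to_string()),
+    response.remark().cloned().unwrap_or_else(|| "Unknown error".to_string()),
     "".to_string(),
 ))
+.context(format!(
+    "Failed to lock batch MQ at addr: {}, request: {:?}",
+    addr, request
+))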

This addition provides more context about where and why the failure occurred.

Committable suggestion was skipped due to low confidence.

Comment on lines +349 to +350
LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
Ok(lock_batch_response_body.lock_ok_mq_set)

⚠️ Potential issue

Avoid using unwrap() to handle potential None values

Using unwrap() on response.get_body() can lead to a panic if the body is None. Instead, handle the None case gracefully to prevent unexpected crashes.

Apply this diff to handle the Option safely and propagate errors appropriately:

- let lock_batch_response_body =
-     LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
+ let body = response.get_body().ok_or_else(|| {
+     BrokerError::MQBrokerError(
+         response.code(),
+         "Response body is empty".to_string(),
+         "".to_string(),
+     )
+ })?;
+ let lock_batch_response_body = LockBatchResponseBody::decode(body)?;

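This modification ensures that if the response body is None, an appropriate error is returned instead of a panic. Using the ? operator on decode likewise propagates decoding errors without panicking, provided the decode error type converts into the function's error type (for example, through a From implementation).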
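
A minimal, self-contained sketch of this pattern follows. `DecodeError`, this `BrokerError` enum, the toy `decode` function, and `locked_queues` are all hypothetical stand-ins; the point is only that `?` on the decode call compiles because of the `From<DecodeError>` implementation.

```rust
// Hypothetical stand-ins for illustration; the project's real BrokerError
// and LockBatchResponseBody::decode have different shapes.
#[derive(Debug, PartialEq)]
struct DecodeError(String);

#[derive(Debug, PartialEq)]
enum BrokerError {
    EmptyBody,
    Decode(String),
}

// This conversion is what lets `?` turn a DecodeError into a BrokerError.
impl From<DecodeError> for BrokerError {
    fn from(err: DecodeError) -> Self {
        BrokerError::Decode(err.0)
    }
}

/// Toy decoder: parses a comma-separated list of queue ids from UTF-8 bytes.
fn decode(body: &[u8]) -> Result<Vec<u32>, DecodeError> {
    let text = std::str::from_utf8(body).map_err(|e| DecodeError(e.to_string()))?;
    text.split(',')
        .map(|part| {
            part.trim()
                .parse::<u32>()
                .map_err(|e| DecodeError(e.to_string()))
        })
        .collect()
}

/// Handles a missing body and a failed decode without any unwrap().
fn locked_queues(body: Option<&[u8]>) -> Result<Vec<u32>, BrokerError> {
    let body = body.ok_or(BrokerError::EmptyBody)?;
    let ids = decode(body)?; // DecodeError -> BrokerError via From
    Ok(ids)
}
```

Without the `From` implementation, the second `?` would be a compile error, which is why the suggestion above only applies cleanly if the project's error types already convert into each other.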

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-     LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
- Ok(lock_batch_response_body.lock_ok_mq_set)
+ let body = response.get_body().ok_or_else(|| {
+     BrokerError::MQBrokerError(
+         response.code(),
+         "Response body is empty".to_string(),
+         "".to_string(),
+     )
+ })?;
+ let lock_batch_response_body = LockBatchResponseBody::decode(body)?;
+ Ok(lock_batch_response_body.lock_ok_mq_set)

Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Support request code LOCK_BATCH_MQ(41)
2 participants