-
Notifications
You must be signed in to change notification settings - Fork 122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1398]🔥Implement the TopicRouteInfoManager function🚀 #1399
Conversation
WalkthroughThe pull request introduces enhancements to the RocketMQ broker's functionality by adding a new asynchronous method for retrieving topic route information, a new module for managing topic route information, and a struct that encapsulates this management. It also updates the broker configuration with new fields and modifies the visibility of a method in the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1399 +/- ##
==========================================
- Coverage 21.29% 21.20% -0.10%
==========================================
Files 442 443 +1
Lines 56660 56917 +257
==========================================
+ Hits 12065 12068 +3
- Misses 44595 44849 +254 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (6)
rocketmq-broker/src/out_api/broker_outer_api.rs (1)
409-414
: Consider adding documentation for the public methodThis is a public API method and should have documentation explaining its purpose, parameters, and return value.
Add rustdoc comments:
+ /// Retrieves topic route information from the name server. + /// + /// # Arguments + /// * `topic` - The topic to get route information for + /// * `timeout_millis` - The timeout in milliseconds for the request + /// * `allow_topic_not_exist` - If true, a warning is logged when topic doesn't exist. If false, an error is returned + /// + /// # Returns + /// * `Ok(TopicRouteData)` - The route information for the topic + /// * `Err(BrokerError)` - If the request fails or the topic doesn't exist pub async fn get_topic_route_info_from_name_server(rocketmq-common/src/common/broker/broker_config.rs (1)
177-179
: Add documentation for the new configuration fields.Please add doc comments explaining:
- The purpose and impact of each field
- Valid value ranges
- Default values and their significance
rocketmq-broker/src/topic/manager/topic_route_info_manager.rs (4)
74-88
: Add error handling for the spawned asynchronous taskIn the
start
method (Lines 74-88), an asynchronous task is spawned to periodically update topic route information. If this task encounters an error or panic, it could terminate silently. Consider adding error handling within the task or a supervisory mechanism to monitor and restart the task if necessary.
231-235
: Avoid unnecessary cloning of thetopic
In
update_topic_publish_info
(Lines 231-235), thetopic
is cloned before insertion into thetopic_publish_info_table
. SinceCheetahString
is likely a reference-counted string, you can avoid cloning by directly using a reference to improve performance.Apply this diff to avoid cloning:
-fn update_topic_publish_info(&self, topic: &CheetahString, info: TopicPublishInfo) { - self.topic_publish_info_table - .mut_from_ref() - .insert(topic.clone(), info); +fn update_topic_publish_info(&self, topic: CheetahString, info: TopicPublishInfo) { + self.topic_publish_info_table + .mut_from_ref() + .insert(topic, info); }
147-148
: Handle potential errors explicitly when unwrappingtopic_route_data
At Line 148,
topic_route_data.unwrap()
unconditionally unwraps the result from the name server. Although an error check is performed earlier, for code clarity and robustness, consider usingexpect
with a descriptive message or pattern matching to handle potentialNone
values explicitly.Apply this diff for improved clarity:
-let mut topic_route_data = topic_route_data.unwrap(); +let mut topic_route_data = topic_route_data.expect("Expected TopicRouteData after successful retrieval from name server");
252-256
: Ensure consistency when updating and retrievingtopic_publish_info
In
try_to_find_topic_publish_info
(Lines 252-256), after updating thetopic_publish_info_table
, there's a brief window where the data might be modified by another task before it's retrieved again. Consider holding the lock during both the update and retrieval to prevent potential race conditions and ensure that the retrieved information is up-to-date.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (5)
rocketmq-broker/src/out_api/broker_outer_api.rs
(3 hunks)rocketmq-broker/src/topic/manager.rs
(1 hunks)rocketmq-broker/src/topic/manager/topic_route_info_manager.rs
(1 hunks)rocketmq-common/src/common/broker/broker_config.rs
(3 hunks)rocketmq-remoting/src/protocol/namespace_util.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- rocketmq-broker/src/topic/manager.rs
🔇 Additional comments (7)
rocketmq-remoting/src/protocol/namespace_util.rs (2)
166-167
: Good performance optimization with #[inline]
!
The addition of the #[inline]
attribute is appropriate for this small, frequently called function. The implementation is correct and well-tested.
166-167
: Verify the necessity of making is_retry_topic
public
Let's verify that this function is actually needed by the TopicRouteInfoManager or other components.
✅ Verification successful
Let me check the actual usage of is_retry_topic
in the topic route manager to verify its necessity.
The is_retry_topic
function needs to be public
The function is actively used across multiple components:
- In
topic_route_info_manager.rs
for special handling of retry topics during route information retrieval - Imported and used by several client-side components:
client_remoting_processor.rs
mq_admin_impl.rs
mq_client_api_impl.rs
default_mq_producer_impl.rs
default_mq_push_consumer.rs
- Also used in broker components:
send_message_processor.rs
broker_runtime.rs
default_pull_message_result_handler.rs
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if is_retry_topic is used outside this module
# Search for direct usage of is_retry_topic
echo "Searching for direct usage of is_retry_topic..."
rg --type rust "is_retry_topic" -l
# Search for TopicRouteInfoManager implementation to verify the need
echo -e "\nSearching for TopicRouteInfoManager implementation..."
rg --type rust "struct TopicRouteInfoManager" -A 10
Length of output: 1598
Script:
#!/bin/bash
# Check how is_retry_topic is used in topic_route_info_manager.rs
echo "Checking usage in topic_route_info_manager.rs..."
rg --type rust "is_retry_topic" -B 5 -A 5 rocketmq-broker/src/topic/manager/topic_route_info_manager.rs
# Also check if there are any other potential consumers
echo -e "\nChecking for other potential consumers..."
rg --type rust "use.*namespace_util" -B 2 -A 2
Length of output: 7890
rocketmq-broker/src/out_api/broker_outer_api.rs (2)
36-36
: LGTM: Import statements are correctly added
The new imports for GetRouteInfoRequestHeader
and warn
logging level are properly placed and utilized in the new functionality.
Also applies to: 58-58
409-451
: Verify integration with TopicRouteInfoManager
Let's verify how this method is used by the TopicRouteInfoManager implementation.
✅ Verification successful
Let me gather more context about the TopicRouteInfoManager implementation and its integration.
Let me get one final piece of context about the TopicRouteInfoManager's structure and dependencies.
Integration with TopicRouteInfoManager is correctly implemented
The get_topic_route_info_from_name_server
method in BrokerOuterAPI
is properly integrated with TopicRouteInfoManager
:
- It's used as a dependency through
broker_outer_api: Arc<BrokerOuterAPI>
in theTopicRouteInfoManager
struct - The method is called with appropriate parameters (topic, timeout, allow_topic_not_exist) and error handling for
TopicNotExist
response - The implementation aligns with the client-side usage pattern seen in
MQClientAPIImpl
andMQClientInstance
- The response handling matches the expected behavior for updating route information in the manager
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find usage of get_topic_route_info_from_name_server in TopicRouteInfoManager
# Search for TopicRouteInfoManager implementation
echo "Searching for TopicRouteInfoManager implementation..."
ast-grep --pattern 'struct TopicRouteInfoManager {
$$$
}'
# Search for usage of get_topic_route_info_from_name_server
echo "Searching for method usage..."
rg "get_topic_route_info_from_name_server" -A 5
Length of output: 4622
Script:
#!/bin/bash
# Get the TopicRouteInfoManager implementation details
echo "Searching for TopicRouteInfoManager struct and implementation..."
ast-grep --pattern 'struct TopicRouteInfoManager {
$$$
}'
# Get the specific method where get_topic_route_info_from_name_server is used
echo "Searching for the method using get_topic_route_info_from_name_server in TopicRouteInfoManager..."
rg -B 10 -A 10 "get_topic_route_info_from_name_server.*topic.*GET_TOPIC_ROUTE_TIMEOUT" rocketmq-broker/src/topic/manager/topic_route_info_manager.rs
Length of output: 1393
Script:
#!/bin/bash
# Get the full TopicRouteInfoManager struct definition and its dependencies
echo "Searching for TopicRouteInfoManager struct definition..."
rg -B 5 -A 15 "struct TopicRouteInfoManager" rocketmq-broker/src/topic/manager/topic_route_info_manager.rs
# Get the imports to understand the dependencies
echo "Searching for imports in the file..."
rg "^use" rocketmq-broker/src/topic/manager/topic_route_info_manager.rs
Length of output: 2189
rocketmq-common/src/common/broker/broker_config.rs (2)
27-27
: LGTM!
The import is correctly placed and properly used by the new configuration field.
262-264
: Verify default values match Java implementation.
Please verify that the default values align with the Java implementation of RocketMQ for compatibility:
MessageRequestMode::Pull
as default mode-1
as default pop share queue number30_000
ms as load balance poll interval
✅ Verification successful
Default values match Rust implementation standards
Based on the search results, I can verify the default values:
-
MessageRequestMode::Pull
is consistently used as the default mode across the codebase, as seen in multiple implementations includingSetMessageRequestModeRequestBody::default()
and other configurations. -
-1
asdefault_pop_share_queue_num
is the standard default value in the broker configuration. This appears to be a special value indicating no queue sharing, as the normal pop share queue numbers are non-negative integers (seen in examples using 0, 3, 5). -
30_000
ms (30 seconds) is consistently used as the default polling interval across the codebase, confirmed in both broker config (load_balance_poll_name_server_interval: 30_000
) and client config (poll_name_server_interval: Duration::from_secs(30).as_millis()
).
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for default values in Java implementation
# Search for default message request mode
rg -A 5 "defaultMessageRequestMode|DEFAULT_MESSAGE_REQUEST_MODE"
# Search for default pop share queue number
rg -A 5 "defaultPopShareQueueNum|DEFAULT_POP_SHARE_QUEUE_NUM"
# Search for load balance poll interval
rg -A 5 "loadBalancePollNameServerInterval|LOAD_BALANCE_POLL_NAME_SERVER_INTERVAL"
Length of output: 215
Script:
#!/bin/bash
# Let's try searching with more variations and partial terms
# Search for message request mode related constants
rg -i "message.*request.*mode|REQUEST_MODE"
# Search for pop share queue configurations
rg -i "pop.*share.*queue|SHARE_QUEUE"
# Search for name server polling interval
rg -i "poll.*name.*server|NAME_SERVER.*INTERVAL|POLL_NAME_SERVER"
# Also search in any config/configuration files
fd -e yaml -e properties -e xml -e conf -e config --exec rg -i "poll.*interval|request.*mode|share.*queue" {}
Length of output: 21138
rocketmq-broker/src/topic/manager/topic_route_info_manager.rs (1)
290-293
: Verify the logic of incrementing broker_id
when searching for broker addresses
In find_broker_address_in_subscribe
(Lines 290-293), when the broker address is not found for the given broker_id
and it's determined to be a slave broker, the code increments broker_id
by 1 to search for the address. Please verify if incrementing broker_id
is the correct approach for retrieving the intended broker address.
ResponseCode::TopicNotExist => { | ||
if allow_topic_not_exist { | ||
warn!( | ||
"get Topic [{}] RouteInfoFromNameServer is not exist value", | ||
topic | ||
); | ||
} | ||
} | ||
ResponseCode::Success => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling and logging
The match arms could be more informative:
- TopicNotExist case should log an error when
allow_topic_not_exist
is false - The catch-all arm should log the unexpected response code
Consider this improved implementation:
ResponseCode::TopicNotExist => {
if allow_topic_not_exist {
warn!(
- "get Topic [{}] RouteInfoFromNameServer is not exist value",
+ "Topic [{}] does not exist in NameServer",
topic
);
+ } else {
+ error!(
+ "Topic [{}] not found in NameServer and allow_topic_not_exist is false",
+ topic
+ );
}
}
ResponseCode::Success => {
if let Some(body) = response.body() {
// ... existing code ...
}
}
- _ => {}
+ unexpected => {
+ warn!(
+ "Unexpected response code {:?} when getting route info for topic [{}]",
+ unexpected, topic
+ );
+ }
Also applies to: 440-441
if let Some(body) = response.body() { | ||
let topic_route_data = TopicRouteData::decode(body).unwrap(); | ||
return Ok(topic_route_data); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace unwrap() with proper error handling
The unwrap()
call on the TopicRouteData::decode
could panic if the decoding fails. This is a critical reliability issue.
Consider using this safer implementation:
- let topic_route_data = TopicRouteData::decode(body).unwrap();
- return Ok(topic_route_data);
+ return TopicRouteData::decode(body)
+ .map_err(|e| BrokerError::MQBrokerError(
+ response.code(),
+ format!("Failed to decode TopicRouteData: {}", e),
+ "".to_string(),
+ ));
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if let Some(body) = response.body() { | |
let topic_route_data = TopicRouteData::decode(body).unwrap(); | |
return Ok(topic_route_data); | |
} | |
if let Some(body) = response.body() { | |
return TopicRouteData::decode(body) | |
.map_err(|e| BrokerError::MQBrokerError( | |
response.code(), | |
format!("Failed to decode TopicRouteData: {}", e), | |
"".to_string(), | |
)); | |
} |
pub default_message_request_mode: MessageRequestMode, | ||
pub default_pop_share_queue_num: i32, | ||
pub load_balance_poll_name_server_interval: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update get_properties() method to include new fields.
The new configuration fields should be included in the properties map returned by get_properties()
. This ensures consistency in configuration serialization and monitoring.
Apply this diff to add the new properties:
pub fn get_properties(&self) -> HashMap<CheetahString, CheetahString> {
let mut properties = HashMap::new();
// ... existing properties ...
+ properties.insert(
+ "defaultMessageRequestMode".into(),
+ self.default_message_request_mode.to_string().into(),
+ );
+ properties.insert(
+ "defaultPopShareQueueNum".into(),
+ self.default_pop_share_queue_num.to_string().into(),
+ );
+ properties.insert(
+ "loadBalancePollNameServerInterval".into(),
+ self.load_balance_poll_name_server_interval.to_string().into(),
+ );
properties
}
Committable suggestion skipped: line range outside the PR's diff.
warn!("try to lock timeout"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle lock acquisition failure more robustly
Currently, if the attempt to acquire the lock times out (Lines 157-158), the method logs a warning and returns. This could lead to missed updates in the topic_route_table
and other data structures. Consider implementing a retry mechanism or adjusting the locking strategy to ensure critical updates are not skipped.
Which Issue(s) This PR Fixes(Closes)
Fixes #1398
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
TopicRouteInfoManager
for efficient topic routing and broker address management.Enhancements
BrokerConfig
with new configuration options for message request mode and load balancing.Improvements
is_retry_topic
method public for broader accessibility.These updates enhance the functionality and configurability of the RocketMQ broker, improving overall user experience.