-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1230]🚀Support broker receive transaction message-4 #1231
Conversation
Caution Review failedThe pull request is closed. WalkthroughThe pull request introduces several enhancements to the RocketMQ broker's codebase, focusing on the Changes
Assessment against linked issues
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
🧹 Outside diff range and nitpick comments (11)
rocketmq-broker/src/transaction/transactional_message_check_listener.rs (1)
20-22
: Enhance trait documentationThe trait documentation should provide more context about:
- When and how this listener is used in the transaction message flow
- What constitutes a "discarded message" in this context
- The relationship with the broker's transaction handling system
Consider adding more comprehensive documentation:
/// Trait defining the listener for transactional message checks. -/// This trait provides a method for resolving discarded messages. +/// This trait is responsible for handling messages that need verification or cleanup +/// during the broker's transaction message processing. It provides mechanisms to +/// resolve messages that have been marked as discarded, typically due to transaction +/// timeouts or failures. +/// +/// Implementations of this trait are used by the broker to maintain transaction +/// consistency and cleanup resources associated with abandoned transactions.rocketmq-remoting/src/protocol/header/check_transaction_state_request_header.rs (1)
Line range hint
89-102
: Consider improving error handling for numeric parsing.The current implementation silently ignores parsing errors for numeric fields. Consider adding error logging or handling for production environments.
Example improvement:
tran_state_table_offset: map .get(&CheetahString::from_static_str( Self::TRAN_STATE_TABLE_OFFSET, )) - .and_then(|v| v.parse().ok()) + .and_then(|v| match v.parse() { + Ok(val) => Some(val), + Err(e) => { + tracing::warn!("Failed to parse tran_state_table_offset: {}", e); + None + } + }) .unwrap_or_default(),rocketmq-broker/src/client/manager/producer_manager.rs (3)
19-20
: Consider a more descriptive name for the atomic counterThe
positive_atomic_counter
name could be more specific to its purpose in channel selection. Consider renaming it tochannel_selection_counter
orround_robin_counter
to better reflect its role in the load balancing mechanism.Also applies to: 36-36
124-141
: Add documentation for the round-robin channel selection behaviorThe method implements a round-robin channel selection strategy but lacks documentation explaining this behavior. Consider adding a doc comment explaining the selection strategy and thread-safety guarantees.
+/// Returns an available channel for the given group using a round-robin selection strategy. +/// This method is thread-safe and ensures fair distribution of channel selection across multiple threads. +/// +/// # Arguments +/// * `group` - The producer group to select a channel from +/// +/// # Returns +/// * `Some(Channel)` - If a channel is available for the group +/// * `None` - If the group doesn't exist or has no channels pub fn get_available_channel(&self, group: Option<&CheetahString>) -> Option<Channel> {
134-136
: Consider using Relaxed ordering for the atomic counterThe current
AcqRel
ordering is stronger than necessary for a simple counter increment. Since we're not using this counter for synchronization,Relaxed
ordering would be sufficient and potentially more performant.- .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + .fetch_add(1, std::sync::atomic::Ordering::Relaxed);rocketmq-remoting/src/net/channel.rs (1)
221-232
: LGTM with minor suggestions for improvement.The implementation of
send_one_way
is well-structured and correctly integrates with the existing channel infrastructure. However, there are two suggestions for improvement:
The
timeout_millis
parameter appears to be unused in the actual implementation. While it's passed to the channel message tuple, there's no timeout logic implemented for one-way messages.The error handling could be more specific about what went wrong during the send operation.
Consider this improved implementation:
pub async fn send_one_way( &mut self, - request: RemotingCommand, - timeout_millis: u64, + request: RemotingCommand ) -> Result<()> { let request = request.mark_oneway_rpc(); - if let Err(err) = self.tx.send((request, None, Some(timeout_millis))).await { + if let Err(err) = self.tx.send((request, None, None)).await { error!("send one way request failed: {}", err); - return Err(ChannelSendRequestFailed(err.to_string())); + return Err(ChannelSendRequestFailed(format!( + "Failed to send one-way request to {}: {}", + self.remote_address, err + ))); } Ok(()) }rocketmq-broker/src/client/net/broker_to_client.rs (2)
45-51
: Unused parameter_group
The parameter
_group
in the functioncheck_producer_transaction_state
is not used within the function body. If this parameter is unnecessary, consider removing it to simplify the code. If it is intended for future use or required by a trait implementation, you can keep it but remove the underscore prefix to adhere to naming conventions.
64-67
: Avoid using magic numbers for timeout valuesIn the call to
channel.send_one_way
, a hardcoded timeout value of100
is used. Consider defining a constant or configuration parameter for timeout values to enhance readability and maintainability.Apply this diff to define a constant for the timeout:
+const SEND_TIMEOUT_MILLIS: u64 = 100; ... match channel.send_one_way(request, SEND_TIMEOUT_MILLIS).await {
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (3)
203-211
: Avoid unnecessary cloning to improve performanceCloning large payloads like message bodies and properties can impact performance. Consider using references or smart pointers to minimize copies.
Suggested change:
if let Some(body) = msg_ext.get_body() { - inner.set_body(body.clone()); + inner.set_body(body.to_owned()); } inner.set_flag(msg_ext.get_flag()); - MessageAccessor::set_properties(&mut inner, msg_ext.get_properties().clone()); + MessageAccessor::set_properties(&mut inner, msg_ext.get_properties());
189-191
: Consider enhanced error handling within the spawned taskErrors within the spawned task are only logged. Implementing retry logic or other error handling strategies can ensure reliable message processing.
Suggested approach:
let mut attempts = 0; let max_attempts = 3; while attempts < max_attempts { if let Err(e) = this.send_check_message(msg_ext.clone()).await { warn!("Attempt {}: Send check message failed: {:?}", attempts + 1, e); attempts += 1; // Optionally add a delay between retries tokio::time::sleep(Duration::from_secs(1)).await; } else { break; } }
230-300
: Enhance test coverage and assertionsThe test cases can be expanded to cover more scenarios and assert additional properties, improving confidence in the code's correctness.
Would you like assistance in enhancing the test cases?
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (10)
rocketmq-broker/src/broker_runtime.rs
(3 hunks)rocketmq-broker/src/client/manager/producer_manager.rs
(3 hunks)rocketmq-broker/src/client/net/broker_to_client.rs
(2 hunks)rocketmq-broker/src/error.rs
(1 hunks)rocketmq-broker/src/transaction.rs
(1 hunks)rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs
(1 hunks)rocketmq-broker/src/transaction/transactional_message_check_listener.rs
(1 hunks)rocketmq-broker/src/transaction/transactional_message_check_service.rs
(1 hunks)rocketmq-remoting/src/net/channel.rs
(1 hunks)rocketmq-remoting/src/protocol/header/check_transaction_state_request_header.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- rocketmq-broker/src/transaction.rs
🔇 Additional comments (13)
rocketmq-broker/src/error.rs (2)
19-19
: LGTM: Appropriate use of clippy directive
The #[allow(clippy::enum_variant_names)]
directive is justified here as these are domain-specific error types where the "Broker" prefix adds clarity and context.
25-26
: Verify the error handling flow for transaction state checks
The new BrokerCommonError
variant and its implementation look good. However, let's verify its usage in the transaction state checking context.
✅ Verification successful
Let me gather more information about the error handling flow in transaction state checks.
Let me try one more search to get the complete implementation of the transaction state check method.
Error handling for transaction state checks is properly implemented
The verification shows that the error handling flow for transaction state checks is correctly implemented:
BrokerCommonError
is properly used to handle encoding errors fromMessageDecoder
- Transaction state check errors are appropriately propagated through the Result type
- The error hierarchy with
BrokerClientError
andBrokerCommonError
provides clear error distinction and proper error conversion using the#[from]
attribute
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the usage of BrokerCommonError in transaction-related code
# Search for usage of BrokerCommonError in transaction-related code
echo "Searching for BrokerCommonError usage in transaction handling..."
rg -l "BrokerCommonError" | xargs rg -A 5 "transaction.*state"
# Search for potential error propagation paths
echo "Checking error propagation paths..."
ast-grep --pattern 'Result<$_, BrokerError>'
Length of output: 1471
Script:
#!/bin/bash
# Let's examine the implementation of check_producer_transaction_state and its error handling
# Check the full implementation of check_producer_transaction_state
echo "Checking check_producer_transaction_state implementation..."
ast-grep --pattern 'check_producer_transaction_state($$$) {
$$$
}'
# Look for any error conversions or propagation using BrokerCommonError
echo "Checking BrokerCommonError usage..."
rg -A 5 "BrokerCommonError" rocketmq-broker/src/client/net/broker_to_client.rs
# Check for any transaction state error handling patterns
echo "Checking transaction state error patterns..."
rg -A 5 "Result.*transaction" rocketmq-broker/src/client/net/broker_to_client.rs
Length of output: 876
Script:
#!/bin/bash
# Search for the complete implementation with more context
echo "Searching for complete transaction state check implementation..."
rg -B 5 -A 15 "check_producer_transaction_state" rocketmq-broker/src/client/net/broker_to_client.rs
# Also check for any transaction-related error handling in the error.rs file
echo "Checking error definitions..."
cat rocketmq-broker/src/error.rs
Length of output: 2253
rocketmq-broker/src/transaction/transactional_message_check_listener.rs (1)
22-33
: Clarify mutability requirement
The method requires &mut self
, but it's not clear why mutability is needed.
Could you please clarify:
- Why mutable access to self is required?
- What state changes occur during message resolution?
This will help ensure the interface is not overly restrictive.
rocketmq-remoting/src/protocol/header/check_transaction_state_request_header.rs (2)
Line range hint 26-35
: LGTM! Well-structured transaction state header.
The struct is well-designed with appropriate field types and serde attributes:
- Optional
CheetahString
fields for variable-length data - Non-optional
i64
fields for required offsets - Proper flattening of the
RpcRequestHeader
25-25
: LGTM! Verify usage of Default implementation.
Adding the Default
trait is appropriate for this struct as it contains optional fields and is used in transaction state checking. This change aligns well with Rust's patterns and the existing unwrap_or_default()
usage in the FromMap
implementation.
Let's verify how this new Default
implementation is being used:
✅ Verification successful
Let me analyze the results and generate the final response:
Default trait implementation is correctly used and safe
The Default
trait addition is verified to be appropriate and safe because:
- The struct is only constructed explicitly with values in
default_transactional_message_check_listener.rs
and throughFromMap
implementation - No direct usage of
Default::default()
was found, indicating the trait is primarily used to support theFromMap
trait's implementation - All optional fields (
Option<T>
) naturally getNone
as their default value, while numeric fields get0
as default, which is safe forcommit_log_offset
andtran_state_table_offset
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for direct usage of Default::default() or struct initialization
rg -t rust "CheckTransactionStateRequestHeader::default\(\)"
rg -t rust "CheckTransactionStateRequestHeader \{" -A 5
Length of output: 4572
rocketmq-broker/src/client/manager/producer_manager.rs (2)
44-44
: LGTM!
The atomic counter initialization is correctly implemented using Default::default().
124-141
: Verify the impact on transaction message handling
Since this code is part of transaction message support, we should verify how this channel selection affects transaction message processing.
✅ Verification successful
Let me gather more specific information about how get_available_channel
is used in the transaction-related code.
Let me check one more aspect to understand the transaction state checking flow completely.
Based on the gathered information, I can now provide the final response:
The channel selection implementation is correctly handling transaction message processing
The get_available_channel
method is appropriately used in the transaction message flow:
- It's used by
DefaultTransactionalMessageCheckListener
to obtain a channel for checking transaction states - The channel selection is properly integrated with producer group registration/unregistration
- Failed channel retrieval is properly handled with appropriate warning logs
- The implementation maintains consistency with the broker-client transaction state verification protocol
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for transaction-related usages of get_available_channel
rg -l "get_available_channel" | xargs rg "transaction"
# Search for transaction message handling patterns
ast-grep --pattern 'impl $_ {
$$$
fn $_(transaction$_) {
$$$
get_available_channel($$$)
$$$
}
$$$
}'
Length of output: 981
Script:
#!/bin/bash
# Look for the actual usage of get_available_channel in transaction code
rg -B 5 -A 5 "get_available_channel" rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs
# Find the implementation of check_producer_transaction_state
ast-grep --pattern 'fn check_producer_transaction_state($$$) {
$$$
}'
# Look for any producer group related transaction handling
rg -B 3 -A 3 "producer.*group.*transaction|transaction.*producer.*group"
Length of output: 728
Script:
#!/bin/bash
# Look for the broker client's check_producer_transaction_state implementation
rg -B 5 -A 10 "check_producer_transaction_state"
# Look for any transaction state handling in the codebase
ast-grep --pattern 'fn $_(transaction_state$_) {
$$$
}'
# Find where transaction producer groups are registered
rg -B 3 -A 3 "register.*producer.*group|producer.*group.*register"
Length of output: 8266
rocketmq-remoting/src/net/channel.rs (1)
221-232
: Verify the integration with transaction message handling.
Since this method is intended to support broker transaction message handling, let's verify its usage pattern.
✅ Verification successful
Based on the search results, I can now generate a final response about the integration of send_one_way
with transaction message handling:
The one-way RPC implementation is correctly integrated with transaction message handling
The code shows proper integration between the send_one_way
method and transaction message handling:
- The broker's
Broker2Client
uses one-way RPC to check transaction states viacheck_producer_transaction_state
method - The transaction check flow is well-defined:
TransactionalMessageCheckService
initiates checksDefaultTransactionalMessageCheckListener
processes checksBroker2Client
sends one-way requests to producers- Producers respond with transaction states (COMMIT/ROLLBACK/UNKNOWN)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration of send_one_way with transaction handling
# Search for usages of send_one_way in transaction-related code
echo "Searching for send_one_way usage in transaction handling..."
rg -l "send_one_way" | xargs rg -B 5 -A 5 "transaction.*send_one_way|send_one_way.*transaction"
# Search for potential transaction-related command types that should use one-way communication
echo "Checking for transaction-related command types..."
rg -B 2 -A 2 "RemotingCommandCode::.*TRANSACTION"
Length of output: 385
Script:
#!/bin/bash
# Let's try a different approach to find transaction-related code and one-way communication patterns
# Search for files containing 'send_one_way'
echo "Files containing send_one_way:"
rg --files-with-matches "send_one_way"
# Search for transaction-related code in the codebase
echo -e "\nTransaction-related code patterns:"
rg -B 3 -A 3 "(?i)transaction.*state|check.*transaction"
# Search for RemotingCommand usage with one-way pattern
echo -e "\nRemotingCommand one-way patterns:"
rg -B 3 -A 3 "mark_oneway_rpc"
# Look for broker client communication patterns
echo -e "\nBroker client communication patterns:"
rg -B 3 -A 3 "Broker2Client|broker.*client"
Length of output: 90510
rocketmq-broker/src/broker_runtime.rs (2)
57-57
: LGTM!
The import of Broker2Client
is correctly placed and necessary for the transactional message handling functionality.
130-132
: LGTM! Type parameter addition improves type safety
The addition of DefaultMessageStore
as a generic parameter to DefaultTransactionalMessageCheckListener
enhances type safety and makes the relationship between the listener and message store explicit.
rocketmq-broker/src/client/net/broker_to_client.rs (2)
56-63
: Consistent error handling with BrokerError
variants
In line 61, errors from MessageDecoder::encode
are wrapped with BrokerCommonError
, whereas in line 66, errors from channel.send_one_way
are wrapped with BrokerClientError
. Review the usage of these error variants to ensure consistency and that they accurately represent the error contexts.
Run the following script to examine the usage of BrokerCommonError
and BrokerClientError
in error handling:
#!/bin/bash
# Description: Find all instances where errors are wrapped with `BrokerCommonError` and `BrokerClientError`.
# Find occurrences of `BrokerCommonError` in error returns.
rg 'Err\([^)]*\) => Err\(BrokerCommonError'
# Find occurrences of `BrokerClientError` in error returns.
rg 'Err\([^)]*\) => Err\(BrokerClientError'
64-67
: Verify the usage of the timeout parameter in send_one_way
Ensure that the send_one_way
method accepts a timeout parameter and that using the value 100
is appropriate. If send_one_way
does not utilize the timeout, consider removing it to prevent confusion.
Run the following script to confirm the method signature of send_one_way
:
✅ Verification successful
Let me try a different search pattern to find the send_one_way
method definition.
Let me check how this timeout is used in the implementation.
Let me try one more search to get the implementation details with a different pattern.
The timeout parameter in send_one_way
is used and appropriate
The implementation shows that the timeout_millis
parameter in send_one_way
is properly used in the channel's message passing system. The value is passed through to the internal channel sender (self.tx.send((request, None, Some(timeout_millis)))
), which is used by the run_send
function to handle timeouts for message processing. The value of 100 milliseconds is a reasonable timeout for one-way messaging.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if `send_one_way` accepts a timeout parameter.
# Search for the `send_one_way` method definition and display its signature.
rg -A 3 'fn\s+send_one_way\s*\('
Length of output: 485
Script:
#!/bin/bash
# Search for send_one_way method definition using ast-grep
ast-grep --pattern 'async fn send_one_way($$$) {
$$$
}'
# Also try with ripgrep for a broader search
rg -B 2 -A 5 'send_one_way'
Length of output: 1162
Script:
#!/bin/bash
# Get the implementation details of send_one_way to see how timeout is used
rg -B 5 -A 15 'pub async fn send_one_way.*timeout_millis' rocketmq-remoting/src/net/channel.rs
Length of output: 96
Script:
#!/bin/bash
# Get the content of the channel.rs file to see the implementation
cat rocketmq-remoting/src/net/channel.rs
# Also search for any documentation or comments about timeout usage
rg -B 2 -A 2 'timeout.*millis' rocketmq-remoting/src/net/channel.rs
Length of output: 10291
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (1)
217-217
: Ensure correct assignment of queue_id
Currently, queue_id
is hardcoded to zero, which may not distribute messages across different queues as intended.
Run the following script to check if messages are properly distributed across queues:
✅ Verification successful
Based on the code analysis, I can now provide a final response:
Queue ID assignment is intentionally set to 0 for transactional message check
The hardcoded queue_id = 0 is correct in this context. This code is part of the transactional message check listener implementation where:
- Transactional messages are temporarily stored in a special queue (queue_id = 0) while waiting for transaction status check
- The original queue_id is preserved in the message properties as "REAL_QUEUE_ID" and restored when the transaction completes
- This is consistent with RocketMQ's design where transactional message processing uses a dedicated queue
The code correctly handles queue assignment:
- Saves original queue_id in message properties before setting to 0
- Restores the original queue_id when transaction completes
- Uses a single queue (0) for all transactional messages to maintain order
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that messages are assigned to different queues.
# Expectation: Messages should be distributed among available queues.
# Count the number of messages per queue_id in the message store logs.
rg 'queue_id' ./message_store/ | grep -o 'queue_id: [0-9]*' | sort | uniq -c
Length of output: 133
Script:
#!/bin/bash
# Let's analyze the codebase to understand the context of queue_id assignment
# First, let's find where queue_id is used in the codebase
rg "queue_id" -A 3 -B 3
# Also check for any queue configuration or queue-related constants
ast-grep --pattern 'const $_: $_ = $_; // .*queue.*'
# Look for methods that handle queue assignment
ast-grep --pattern 'fn $_(queue_id: $_) {
$$$
}'
Length of output: 242701
#[derive(Clone)] | ||
pub struct TransactionalMessageCheckService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add documentation and implementation details for TransactionalMessageCheckService
While adding Clone
is appropriate for a service that might be shared across components, the current implementation appears to be a placeholder. Consider:
- Adding documentation explaining the service's purpose and responsibilities in transaction message handling
- Implementing the core functionality for transaction message checking
Would you like help with:
- Generating documentation that follows Rust documentation guidelines?
- Implementing the core transaction message checking functionality based on RocketMQ's transaction message specifications?
rocketmq-broker/src/transaction/transactional_message_check_listener.rs
Outdated
Show resolved
Hide resolved
let channels = channel_map.keys().collect::<Vec<&Channel>>(); | ||
let index = self | ||
.positive_atomic_counter | ||
.fetch_add(1, std::sync::atomic::Ordering::AcqRel); | ||
let index = index.unsigned_abs() as usize % channels.len(); | ||
let channel = channels[index].clone(); | ||
return Some(channel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Optimize channel selection implementation
The current implementation has several areas for improvement:
- Creates a temporary Vec allocation for keys
- Doesn't consider channel health or status
- Could be more efficient using iterator operations
Consider this more efficient implementation:
- let channels = channel_map.keys().collect::<Vec<&Channel>>();
- let index = self
- .positive_atomic_counter
- .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
- let index = index.unsigned_abs() as usize % channels.len();
- let channel = channels[index].clone();
- return Some(channel);
+ let len = channel_map.len();
+ let index = self
+ .positive_atomic_counter
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ .unsigned_abs() as usize % len;
+ channel_map
+ .iter()
+ .nth(index)
+ .map(|(channel, _)| channel.clone())
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let channels = channel_map.keys().collect::<Vec<&Channel>>(); | |
let index = self | |
.positive_atomic_counter | |
.fetch_add(1, std::sync::atomic::Ordering::AcqRel); | |
let index = index.unsigned_abs() as usize % channels.len(); | |
let channel = channels[index].clone(); | |
return Some(channel); | |
let len = channel_map.len(); | |
let index = self | |
.positive_atomic_counter | |
.fetch_add(1, std::sync::atomic::Ordering::Relaxed) | |
.unsigned_abs() as usize % len; | |
channel_map | |
.iter() | |
.nth(index) | |
.map(|(channel, _)| channel.clone()) |
Some(Arc::new(DefaultTransactionalMessageCheckListener::new( | ||
self.broker_config.clone(), | ||
self.producer_manager.clone(), | ||
Broker2Client, | ||
self.topic_config_manager.clone(), | ||
self.message_store.as_ref().cloned().unwrap(), | ||
))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Review initialization of Broker2Client and cloning approach
Two suggestions for improvement:
Broker2Client
is used directly without instantiation. Consider if this is intentional or if an instance should be created.- Replace
cloned()
withclone()
on line 700 for consistency with Rust idioms.
Apply this diff to improve the code:
- Broker2Client,
+ Broker2Client::new(), // Add appropriate parameters if needed
- self.message_store.as_ref().cloned().unwrap(),
+ self.message_store.as_ref().unwrap().clone(),
Committable suggestion skipped: line range outside the PR's diff.
fn to_message_ext_broker_inner( | ||
topic_config: &TopicConfig, | ||
msg_ext: &MessageExt, | ||
) -> MessageExtBrokerInner { | ||
// TODO | ||
//let queue_id = rand::thread_rng().gen_range(0..=99999999) % TCMT_QUEUE_NUMS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the queue selection logic in to_message_ext_broker_inner
The TODO comment indicates that the queue selection logic is not yet implemented. This may affect message routing and load balancing. Implement the logic to assign a proper queue_id
.
Would you like assistance in implementing the queue selection logic?
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs
Outdated
Show resolved
Hide resolved
pub async fn send_check_message(&self, mut msg_ext: MessageExt) -> crate::Result<()> { | ||
let msg_id = msg_ext.get_user_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, | ||
)); | ||
let header = CheckTransactionStateRequestHeader { | ||
topic: Some(msg_ext.message.topic.clone()), | ||
commit_log_offset: msg_ext.commit_log_offset, | ||
offset_msg_id: Some(msg_ext.msg_id().clone()), | ||
msg_id: msg_id.clone(), | ||
transaction_id: msg_id, | ||
tran_state_table_offset: msg_ext.queue_offset, | ||
rpc_request_header: Some(RpcRequestHeader { | ||
broker_name: Some(self.broker_config.broker_name.clone()), | ||
..Default::default() | ||
}), | ||
}; | ||
let topic = msg_ext.get_user_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_REAL_TOPIC, | ||
)); | ||
if let Some(topic) = topic { | ||
msg_ext.set_topic(topic); | ||
} | ||
let queue_id = msg_ext.get_user_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_REAL_QUEUE_ID, | ||
)); | ||
if let Some(queue_id) = queue_id { | ||
msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_default()); | ||
} | ||
msg_ext.store_size = 0; | ||
let group_id = msg_ext.get_user_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_PRODUCER_GROUP, | ||
)); | ||
let channel = self | ||
.producer_manager | ||
.get_available_channel(group_id.as_ref()); | ||
if let Some(mut channel) = channel { | ||
let _ = self | ||
.broker_client | ||
.check_producer_transaction_state( | ||
group_id.as_ref().unwrap(), | ||
&mut channel, | ||
header, | ||
msg_ext, | ||
) | ||
.await; | ||
} else { | ||
warn!( | ||
"Check transaction failed, channel is null. groupId={:?}", | ||
group_id | ||
); | ||
} | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure proper error propagation in send_check_message
The method returns a Result
, but errors are not propagated in some branches. Ensure that all errors are returned to the caller.
Suggested change:
} else {
warn!(
"Check transaction failed, channel is null. groupId={:?}",
group_id
);
+ return Err(crate::Error::ChannelUnavailable(group_id.map(|g| g.to_string())));
}
Committable suggestion skipped: line range outside the PR's diff.
msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_default()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explicitly handle parsing errors when setting queue_id
Using unwrap_or_default()
may silently assign queue_id
as zero if parsing fails, which might not be the intended behavior. Consider handling the parsing error to avoid unexpected queue_id
values.
Suggested change:
msg_ext.set_queue_id(
- queue_id.as_str().parse::<i32>().unwrap_or_default()
+ match queue_id.as_str().parse::<i32>() {
+ Ok(id) => id,
+ Err(e) => {
+ warn!("Failed to parse REAL_QUEUE_ID: {:?}, error: {:?}", queue_id, e);
+ // Handle the error appropriately, possibly by returning or assigning a default
+ return Err(crate::Error::InvalidQueueId(e.to_string()));
+ }
+ }
);
Committable suggestion skipped: line range outside the PR's diff.
.get_available_channel(group_id.as_ref()); | ||
if let Some(mut channel) = channel { | ||
let _ = self | ||
.broker_client | ||
.check_producer_transaction_state( | ||
group_id.as_ref().unwrap(), | ||
&mut channel, | ||
header, | ||
msg_ext, | ||
) | ||
.await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check for errors when sending the check message
Currently, the result of check_producer_transaction_state
is ignored. It's important to check for errors to ensure any failures are appropriately handled.
Suggested change:
let result = self
.broker_client
.check_producer_transaction_state(
group_id.as_ref().unwrap(),
&mut channel,
header,
msg_ext,
)
.await;
+ if let Err(e) = result {
+ warn!("Failed to send check producer transaction state: {:?}", e);
+ // Consider retrying or handling the error appropriately
+ }
Committable suggestion skipped: line range outside the PR's diff.
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
…stener.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
…ssage_check_listener.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1231 +/- ##
==========================================
+ Coverage 17.94% 18.02% +0.08%
==========================================
Files 427 428 +1
Lines 53297 53561 +264
==========================================
+ Hits 9563 9655 +92
- Misses 43734 43906 +172 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
Which Issue(s) This PR Fixes(Closes)
Fixes #1230
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Documentation
Chores