-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1067] Supprot mq fault strategy #1068
Conversation
WalkthroughThe pull request introduces significant modifications to the RocketMQ client, focusing on enhancing asynchronous capabilities across various components. Key changes include updating the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (5)
rocketmq-remoting/src/protocol/route/topic_route_data.rs (1)
45-51
: Approved: Improved functionality oftopic_route_data_changed
The changes correctly implement the comparison logic for detecting changes in topic route data. By sorting both the current and old data before comparison, we ensure a consistent and accurate result.
Consider a minor optimization: move the sorting operations to a separate method to improve readability and reusability. For example:
impl TopicRouteData { fn sort_data(&mut self) { self.queue_datas.sort(); self.broker_datas.sort(); } pub fn topic_route_data_changed(&self, old_data: Option<&TopicRouteData>) -> bool { if old_data.is_none() { return true; } let mut now = TopicRouteData::from_existing(self); let mut old = TopicRouteData::from_existing(old_data.unwrap()); now.sort_data(); old.sort_data(); now != old } }This refactoring would make the
topic_route_data_changed
method cleaner and allow reuse of the sorting logic if needed elsewhere.rocketmq-remoting/src/protocol/route/route_data_view.rs (1)
Line range hint
18-49
: Summary: Ordering capabilities added toBrokerData
The changes introduce
PartialOrd
andOrd
trait implementations forBrokerData
, enabling comparison and sorting based on thebroker_name
field. This enhancement allowsBrokerData
to be used in ordered collections and sorting operations.While the implementations are correct, it's important to consider the following:
- The choice of
broker_name
as the ordering key may affect howBrokerData
is used throughout the system.- Existing code that works with collections of
BrokerData
might behave differently now that ordering is defined.To ensure these changes align with the overall system design:
- Review any code that sorts or compares
BrokerData
instances to confirm that ordering bybroker_name
is appropriate.- Consider documenting this new behavior in the struct's documentation to inform other developers of the ordering semantics.
- If alternative ordering schemes are needed in some contexts, consider implementing custom comparison functions or using wrapper types with different
Ord
implementations.rocketmq-client/src/producer/default_mq_producer.rs (1)
255-255
: LGTM: Good use of the builder pattern innew()
.The update to the
new()
method to use the builder pattern is a good improvement. It simplifies the creation of a newDefaultMQProducer
instance and is consistent with modern Rust practices.Consider adding a
#[inline]
attribute to this method as well, similar to thebuilder()
method. This could potentially improve performance for frequent instantiations.rocketmq-client/src/latency/latency_fault_tolerance_impl.rs (1)
25-32
: DeriveClone
forLatencyFaultToleranceImpl<R, S>
if cloneability is requiredThe struct
LatencyFaultToleranceImpl<R, S>
contains fields likeOption<R>
andOption<S>
. If instances of this struct need to be cloned elsewhere in your code, consider derivingClone
:#[derive(Clone)] pub struct LatencyFaultToleranceImpl<R, S> { // ... existing fields ... }This makes it easier to work with the struct when cloning is necessary.
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (1)
Line range hint
828-837
: Ensure semaphore permits are released after the async task completesCurrently, semaphore permits are dropped immediately after spawning the async task in
execute_async_message_send
. This may lead to incorrect backpressure handling because the permits should be held until the async task completes. Consider moving the permit release into the async task so that the permits are released after the task finishes.Apply this diff to fix the permit handling:
- self.get_async_sender_executor().get_handle().spawn(f); - drop((acquire_value_num, acquire_value_size)); + self.get_async_sender_executor().get_handle().spawn(async move { + let result = f.await; + drop((acquire_value_num, acquire_value_size)); + result + });
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (9)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (2 hunks)
- rocketmq-client/src/latency/latency_fault_tolerance.rs (5 hunks)
- rocketmq-client/src/latency/latency_fault_tolerance_impl.rs (5 hunks)
- rocketmq-client/src/latency/mq_fault_strategy.rs (7 hunks)
- rocketmq-client/src/latency/resolver.rs (1 hunks)
- rocketmq-client/src/producer/default_mq_producer.rs (1 hunks)
- rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (11 hunks)
- rocketmq-remoting/src/protocol/route/route_data_view.rs (2 hunks)
- rocketmq-remoting/src/protocol/route/topic_route_data.rs (1 hunks)
🧰 Additional context used
🔇 Additional comments (10)
rocketmq-remoting/src/protocol/route/route_data_view.rs (3)
18-18
: LGTM: Necessary import added.The
Ordering
import is correctly added and is required for the new trait implementations.
39-43
: LGTM: Correct implementation ofPartialOrd
.The
PartialOrd
implementation forBrokerData
is correct and follows Rust's best practices by delegating to theOrd
implementation. This ensures consistency between partial and total ordering.
45-49
: LGTM: Correct implementation ofOrd
. Verify ordering choice.The
Ord
implementation forBrokerData
is correct and provides a total ordering based on thebroker_name
field. This allows for sorting and usingBrokerData
in ordered collections.Please confirm that ordering
BrokerData
bybroker_name
is the intended behavior for all use cases. If there are scenarios where a different ordering might be needed, consider implementing a customOrd
trait or using a separate comparison function.To verify the usage of this ordering, you can run the following script:
This will help identify any potential impacts of the new ordering implementation on existing code.
rocketmq-client/src/producer/default_mq_producer.rs (2)
250-253
: LGTM: Good addition of a builder method.The introduction of the
builder()
method is a positive change. It promotes the use of the builder pattern, which is beneficial for creating complex objects likeDefaultMQProducer
. The#[inline]
attribute is also a good optimization hint for the compiler.
250-255
: Verify usage ofDefaultMQProducer
in the codebase.The introduction of the builder pattern for
DefaultMQProducer
is a good improvement. However, it's important to ensure that all existing instantiations ofDefaultMQProducer
in the codebase are updated to use the new builder pattern if necessary.Run the following script to check for direct instantiations of
DefaultMQProducer
that might need to be updated:✅ Verification successful
DefaultMQProducer usage is properly updated.
All instantiations ofDefaultMQProducer
utilize the new builder pattern, and no directnew()
calls were found elsewhere in the codebase.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find direct instantiations of DefaultMQProducer that might need updating # Search for DefaultMQProducer::new() calls echo "Searching for DefaultMQProducer::new() calls:" rg "DefaultMQProducer::new\(\)" --type rust # Search for DefaultMQProducer { .. } struct instantiations echo -e "\nSearching for DefaultMQProducer { .. } struct instantiations:" rg "DefaultMQProducer\s*\{" --type rustLength of output: 761
rocketmq-client/src/latency/resolver.rs (1)
19-20
: Introduction ofResolverLocal
trait with asyncresolve
methodThe new
ResolverLocal
trait with an asynchronousresolve
method aligns with the shift towards non-blocking operations in the codebase. This change enhances the system's ability to perform asynchronous resolution, improving overall concurrency and responsiveness.Run the following script to ensure all implementations of
ResolverLocal
correctly handle the asyncresolve
method:rocketmq-client/src/latency/latency_fault_tolerance.rs (1)
74-74
:⚠️ Potential issueEnsure proper handling of
Option<T>
inpick_one_at_least
The method
pick_one_at_least
now returnsOption<T>
instead ofT
. Please verify that all implementations and call sites of this method have been updated to handle theNone
case appropriately to prevent potential runtime errors.Run the following script to find usages of
pick_one_at_least
and check for proper handling:✅ Verification successful
No usages of
pick_one_at_least
found. Verification successful.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find all usages of `pick_one_at_least` to ensure `Option<T>` is handled correctly. # Search for method invocations rg 'pick_one_at_least\(' -A 3 -B 2Length of output: 1159
Script:
#!/bin/bash # Description: Use ast-grep to find all call sites of `pick_one_at_least` and check for proper `Option<T>` handling. # Search for function calls and surrounding handling ast-grep --pattern ' match $$$ { Some($_) => { $$$ }, None => { $$$ }, } ' --lang rustLength of output: 96
rocketmq-client/src/implementation/mq_client_api_impl.rs (3)
460-462
: Correctly awaiting the asynchronousupdate_fault_item
methodThe
update_fault_item
method is now asynchronous, and it's appropriate toawait
it to ensure fault items are updated correctly.
475-477
: Duplicate comment: Correctly awaitingupdate_fault_item
As previously mentioned, awaiting the asynchronous
update_fault_item
method is necessary here as well.
481-483
: Duplicate comment: Correctly awaitingupdate_fault_item
As previously noted, awaiting the asynchronous
update_fault_item
method ensures proper error handling and fault item updates.
fn as_any(&self) -> &dyn Any; | ||
|
||
fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>); | ||
fn as_any_mut(&mut self) -> &mut dyn Any; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Reconsider the use of as_any
and as_any_mut
for downcasting
Introducing as_any
and as_any_mut
methods to enable downcasting can lead to fragile code and may indicate that the trait's abstraction is leaking. Consider redesigning the trait or its implementations to eliminate the need for downcasting. Alternative approaches include:
- Using associated types or generic parameters to retain type information.
- Defining additional trait methods that expose the necessary functionality without requiring downcasting.
- Employing an enum to represent different concrete types if the set of possible types is known and finite.
pub trait LatencyFaultTolerance<T>: Send + Sync + 'static { | ||
use rocketmq_common::ArcRefCellWrapper; | ||
|
||
#[allow(async_fn_in_trait)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using async_trait
instead of #[allow(async_fn_in_trait)]
Using #[allow(async_fn_in_trait)]
permits the use of async functions in traits but relies on an unstable feature that requires nightly Rust. To ensure compatibility with stable Rust, consider using the async_trait
crate, which provides a macro to enable async functions in traits safely.
Apply this diff to use async_trait
:
+use async_trait::async_trait;
-#[allow(async_fn_in_trait)]
-pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
+#[async_trait]
+pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
Committable suggestion was skipped due to low confidence.
let mut flag = false; | ||
let handle = Handle::current(); | ||
scope(|s| { | ||
s.spawn(|| { | ||
flag = handle.block_on(async { | ||
self.latency_fault_tolerance | ||
.is_reachable(&message_queue.get_broker_name().to_string()) | ||
.await | ||
}); | ||
}); | ||
}); | ||
flag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Data race due to unsynchronized mutation of flag
in ReachableFilter::filter
The variable flag
is mutated inside a spawned thread without synchronization, which can lead to data races and undefined behavior.
Consider removing the thread and directly blocking on the async function to avoid the data race. Apply this diff to fix the issue:
- let mut flag = false;
- let handle = Handle::current();
- scope(|s| {
- s.spawn(|| {
- flag = handle.block_on(async {
- self.latency_fault_tolerance
- .is_reachable(&message_queue.get_broker_name().to_string())
- .await
- });
- });
- });
- flag
+ let handle = Handle::current();
+ let flag = handle.block_on(async {
+ self.latency_fault_tolerance
+ .is_reachable(&message_queue.get_broker_name().to_string())
+ .await
+ });
+ flag
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut flag = false; | |
let handle = Handle::current(); | |
scope(|s| { | |
s.spawn(|| { | |
flag = handle.block_on(async { | |
self.latency_fault_tolerance | |
.is_reachable(&message_queue.get_broker_name().to_string()) | |
.await | |
}); | |
}); | |
}); | |
flag | |
let handle = Handle::current(); | |
let flag = handle.block_on(async { | |
self.latency_fault_tolerance | |
.is_reachable(&message_queue.get_broker_name().to_string()) | |
.await | |
}); | |
flag |
let mut flag = false; | ||
let handle = Handle::current(); | ||
scope(|s| { | ||
s.spawn(|| { | ||
flag = handle.block_on(async { | ||
self.latency_fault_tolerance | ||
.is_available(&message_queue.get_broker_name().to_string()) | ||
.await | ||
}); | ||
}); | ||
}); | ||
flag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Data race due to unsynchronized mutation of flag
in AvailableFilter::filter
Similar to the previous issue, the variable flag
is mutated inside a spawned thread without proper synchronization, causing potential data races.
Apply the same fix by removing the thread and directly blocking on the async function:
- let mut flag = false;
- let handle = Handle::current();
- scope(|s| {
- s.spawn(|| {
- flag = handle.block_on(async {
- self.latency_fault_tolerance
- .is_available(&message_queue.get_broker_name().to_string())
- .await
- });
- });
- });
- flag
+ let handle = Handle::current();
+ let flag = handle.block_on(async {
+ self.latency_fault_tolerance
+ .is_available(&message_queue.get_broker_name().to_string())
+ .await
+ });
+ flag
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut flag = false; | |
let handle = Handle::current(); | |
scope(|s| { | |
s.spawn(|| { | |
flag = handle.block_on(async { | |
self.latency_fault_tolerance | |
.is_available(&message_queue.get_broker_name().to_string()) | |
.await | |
}); | |
}); | |
}); | |
flag | |
let handle = Handle::current(); | |
let flag = handle.block_on(async { | |
self.latency_fault_tolerance | |
.is_available(&message_queue.get_broker_name().to_string()) | |
.await | |
}); | |
flag |
let mut fault_item_table = self.fault_item_table.lock().await; | ||
let mut remove_set = HashSet::new(); | ||
for (name, fault_item) in fault_item_table.iter() { | ||
if get_current_millis() as i64 | ||
- (fault_item | ||
.check_stamp | ||
.load(std::sync::atomic::Ordering::Relaxed) as i64) | ||
< 0 | ||
{ | ||
continue; | ||
} | ||
fault_item.check_stamp.store( | ||
get_current_millis() + self.detect_interval as u64, | ||
std::sync::atomic::Ordering::Release, | ||
); | ||
let broker_addr = self | ||
.resolver | ||
.as_ref() | ||
.unwrap() | ||
.resolve(fault_item.name.as_str()) | ||
.await; | ||
if broker_addr.is_none() { | ||
remove_set.insert(name.clone()); | ||
continue; | ||
} | ||
if self.service_detector.is_none() { | ||
continue; | ||
} | ||
let service_ok = self | ||
.service_detector | ||
.as_ref() | ||
.unwrap() | ||
.detect(broker_addr.unwrap().as_str(), self.detect_timeout as u64); | ||
if service_ok | ||
&& fault_item | ||
.reachable_flag | ||
.load(std::sync::atomic::Ordering::Acquire) | ||
{ | ||
info!("{} is reachable now, then it can be used.", name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid holding the fault_item_table
lock across await points to prevent potential deadlocks
In the detect_by_one_round
method, you're holding the fault_item_table
mutex lock while performing asynchronous operations like resolver.resolve().await
. Holding an asynchronous mutex across await points can lead to reduced concurrency or potential deadlocks because other tasks cannot access the locked resource until the lock is released.
To address this, consider collecting the necessary data while holding the lock and then release it before performing any asynchronous operations. Here's how you might adjust the code:
async fn detect_by_one_round(&self) {
let fault_items: Vec<(String, FaultItem)> = {
let fault_item_table = self.fault_item_table.lock().await;
fault_item_table.iter().map(|(name, item)| (name.clone(), item.clone())).collect()
};
let mut remove_set = HashSet::new();
for (name, fault_item) in fault_items {
if get_current_millis() as i64 - (fault_item.check_stamp.load(Ordering::Relaxed) as i64) < 0 {
continue;
}
fault_item.check_stamp.store(
get_current_millis() + self.detect_interval as u64,
Ordering::Release,
);
let broker_addr = match self.resolver.as_ref() {
Some(resolver) => resolver.resolve(fault_item.name.as_str()).await,
None => {
remove_set.insert(name.clone());
continue;
}
};
if broker_addr.is_none() {
remove_set.insert(name.clone());
continue;
}
if let Some(service_detector) = self.service_detector.as_ref() {
let service_ok = service_detector
.detect(broker_addr.unwrap().as_str(), self.detect_timeout as u64);
if service_ok && fault_item.reachable_flag.load(Ordering::Acquire) {
info!("{} is reachable now, then it can be used.", name);
fault_item.reachable_flag.store(true, Ordering::Release);
}
}
}
let mut fault_item_table = self.fault_item_table.lock().await;
for name in remove_set {
fault_item_table.remove(&name);
}
}
This modification collects the necessary data upfront and releases the lock before the asynchronous calls, enhancing concurrency and preventing potential deadlocks.
let broker_addr = self | ||
.resolver | ||
.as_ref() | ||
.unwrap() | ||
.resolve(fault_item.name.as_str()) | ||
.await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle the possibility of resolver
being None
to prevent potential panics
You're using self.resolver.as_ref().unwrap()
without checking if resolver
is Some
. If resolver
is None
, this will cause a panic at runtime.
Consider handling the None
case gracefully:
let broker_addr = match self.resolver.as_ref() {
Some(resolver) => resolver.resolve(fault_item.name.as_str()).await,
None => {
remove_set.insert(name.clone());
continue;
}
};
This ensures that your code doesn't panic and handles the scenario where resolver
is not set.
fn start_detector(this: ArcRefCellWrapper<Self>) { | ||
tokio::spawn(async move { | ||
loop { | ||
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; | ||
if !this | ||
.start_detector_enable | ||
.load(std::sync::atomic::Ordering::Relaxed) | ||
{ | ||
continue; | ||
} | ||
|
||
this.detect_by_one_round().await; | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement a shutdown mechanism for the detector task to prevent resource leaks
In the start_detector
method, you're spawning a Tokio task that runs indefinitely. Without a way to stop this task, it could continue running even when the instance is no longer needed, leading to resource leaks.
Consider adding a shutdown signal using a tokio::sync::Notify
or a cancellation token to allow the task to exit gracefully when it's appropriate.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1068 +/- ##
==========================================
- Coverage 19.86% 19.80% -0.06%
==========================================
Files 426 426
Lines 35561 35659 +98
==========================================
+ Hits 7063 7064 +1
- Misses 28498 28595 +97 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1067
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
ResolverLocal
trait for non-blocking resolution of broker addresses.Bug Fixes
topic_route_data_changed
to ensure accurate topic route data changes.Documentation
Refactor
DefaultMQProducer
using a builder pattern.