-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1021]🚀Support mq client instance clean offline broker🎉 #1024
Conversation
WalkthroughThe changes in this pull request focus on improving the locking mechanisms and broker management within the Changes
Sequence Diagram(s)sequenceDiagram
participant Client as MQClientInstance
participant Broker as Broker Management
participant RouteTable as Topic Route Table
Client->>Broker: Request to clean offline brokers
Broker->>RouteTable: Check for offline brokers
RouteTable-->>Broker: Return list of offline brokers
Broker->>Client: Remove offline brokers from management
Assessment against linked issues
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -64,6 +63,8 @@ use crate::producer::producer_impl::mq_producer_inner::MQProducerInner; | |||
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; | |||
use crate::Result; | |||
|
|||
const LOCK_TIMEOUT_MILLIS: u64 = 3000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Suggestion: Make LOCK_TIMEOUT_MILLIS
configurable
Currently, LOCK_TIMEOUT_MILLIS
is hardcoded to 3000 milliseconds. Consider making this value configurable through client settings to provide flexibility for different environments or use cases.
let lock = self | ||
.lock_namesrv | ||
.try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS)) | ||
.await; | ||
if let Some(lock) = lock { | ||
let mut broker_addr_table = self.broker_addr_table.write().await; | ||
let mut updated_table = HashMap::with_capacity(broker_addr_table.len()); | ||
let mut broker_name_set = HashSet::new(); | ||
for (broker_name, one_table) in broker_addr_table.iter() { | ||
let mut clone_addr_table = one_table.clone(); | ||
let mut remove_id_set = HashSet::new(); | ||
for (id, addr) in one_table.iter() { | ||
if !self.is_broker_addr_exist_in_topic_route_table(addr).await { | ||
remove_id_set.insert(*id); | ||
} | ||
} | ||
clone_addr_table.retain(|k, _| !remove_id_set.contains(k)); | ||
if clone_addr_table.is_empty() { | ||
info!( | ||
"the broker[{}] name's host is offline, remove it", | ||
broker_name | ||
); | ||
broker_name_set.insert(broker_name.clone()); | ||
} else { | ||
updated_table.insert(broker_name.clone(), clone_addr_table); | ||
} | ||
} | ||
broker_addr_table.retain(|k, _| !broker_name_set.contains(k)); | ||
if !updated_table.is_empty() { | ||
broker_addr_table.extend(updated_table); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle lock acquisition failure in clean_offline_broker
If try_lock_timeout
fails to acquire the lock_namesrv
lock, the method exits silently without any logging or error handling. Consider adding an else
block to handle the failure case, such as logging a warning or error, to improve observability and debugging.
if self.client_config.use_heartbeat_v2 { | ||
unimplemented!("sendHeartbeatToBrokerV2") | ||
} else { | ||
self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data) | ||
.await | ||
}; | ||
drop(lock); | ||
result | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement sendHeartbeatToBrokerV2
or handle unimplemented case
In send_heartbeat_to_broker
, when use_heartbeat_v2
is true, the method calls unimplemented!("sendHeartbeatToBrokerV2")
, which will cause a panic at runtime if this path is executed. Please implement the sendHeartbeatToBrokerV2
method or handle this case to prevent potential runtime panics.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1024 +/- ##
==========================================
- Coverage 19.87% 19.85% -0.03%
==========================================
Files 420 420
Lines 34594 34624 +30
==========================================
- Hits 6876 6875 -1
- Misses 27718 27749 +31 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1021
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Improvements