Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1021]🚀Support mq client instance clean offline broker🎉 #1024

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use rocketmq_remoting::runtime::RPCHook;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::RocketMQTokioMutex;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tracing::error;
use tracing::info;
Expand All @@ -64,6 +63,8 @@ use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::Result;

const LOCK_TIMEOUT_MILLIS: u64 = 3000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Suggestion: Make LOCK_TIMEOUT_MILLIS configurable

Currently, LOCK_TIMEOUT_MILLIS is hardcoded to 3000 milliseconds. Consider making this value configurable through client settings to provide flexibility for different environments or use cases.


pub struct MQClientInstance {
pub(crate) client_config: Arc<ClientConfig>,
pub(crate) client_id: String,
Expand All @@ -88,7 +89,7 @@ pub struct MQClientInstance {
pub(crate) topic_route_table: Arc<RwLock<HashMap<String /* Topic */, TopicRouteData>>>,
topic_end_points_table:
Arc<RwLock<HashMap<String /* Topic */, HashMap<MessageQueue, String /* brokerName */>>>>,
lock_namesrv: Arc<Mutex<()>>,
lock_namesrv: Arc<RocketMQTokioMutex<()>>,
lock_heartbeat: Arc<RocketMQTokioMutex<()>>,

service_state: ServiceState,
Expand Down Expand Up @@ -187,7 +188,7 @@ impl MQClientInstance {
topic_route_table: Arc::new(Default::default()),
topic_end_points_table: Arc::new(Default::default()),
lock_namesrv: Default::default(),
lock_heartbeat: Arc::new(RocketMQTokioMutex::new(())),
lock_heartbeat: Default::default(),
service_state: ServiceState::CreateJust,
pull_message_service: ArcRefCellWrapper::new(PullMessageService::new()),
rebalance_service: RebalanceService::new(),
Expand Down Expand Up @@ -354,7 +355,7 @@ impl MQClientInstance {
let mut client_instance = this.clone();
let poll_name_server_interval = self.client_config.poll_name_server_interval;
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask updateTopicRouteInfoFromNameServer started");
info!("ScheduledTask update_topic_route_info_from_name_server started");
tokio::time::sleep(Duration::from_millis(10)).await;
loop {
let current_execution_time = tokio::time::Instant::now();
Expand All @@ -372,7 +373,7 @@ impl MQClientInstance {
let mut client_instance = this.clone();
let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask send_heartbeat_to_all_broker started");
info!("ScheduledTask clean_offline_broker started");
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
let current_execution_time = tokio::time::Instant::now();
Expand Down Expand Up @@ -632,17 +633,46 @@ impl MQClientInstance {
}

pub async fn clean_offline_broker(&mut self) {
println!("cleanOfflineBroker")
let lock = self
.lock_namesrv
.try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
.await;
if let Some(lock) = lock {
let mut broker_addr_table = self.broker_addr_table.write().await;
let mut updated_table = HashMap::with_capacity(broker_addr_table.len());
let mut broker_name_set = HashSet::new();
for (broker_name, one_table) in broker_addr_table.iter() {
let mut clone_addr_table = one_table.clone();
let mut remove_id_set = HashSet::new();
for (id, addr) in one_table.iter() {
if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
remove_id_set.insert(*id);
}
}
clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
if clone_addr_table.is_empty() {
info!(
"the broker[{}] name's host is offline, remove it",
broker_name
);
broker_name_set.insert(broker_name.clone());
} else {
updated_table.insert(broker_name.clone(), clone_addr_table);
}
}
broker_addr_table.retain(|k, _| !broker_name_set.contains(k));
if !updated_table.is_empty() {
broker_addr_table.extend(updated_table);
}
}
Comment on lines +636 to +667
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle lock acquisition failure in clean_offline_broker

If try_lock_timeout fails to acquire the lock_namesrv lock, the method exits silently without any logging or error handling. Consider adding an else block to handle the failure case, such as logging a warning or error, to improve observability and debugging.

}
pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool {
if let Some(lock) = self.lock_heartbeat.try_lock().await {
let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
self.send_heartbeat_to_all_broker_v2(false).await
} else {
self.send_heartbeat_to_all_broker().await
};
drop(lock);
result
}
} else {
warn!("lock heartBeat, but failed. [{}]", self.client_id);
false
Expand All @@ -655,13 +685,11 @@ impl MQClientInstance {
.try_lock_timeout(Duration::from_secs(2))
.await
{
let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
self.send_heartbeat_to_all_broker_v2(is_rebalance).await
} else {
self.send_heartbeat_to_all_broker().await
};
drop(lock);
result
}
} else {
warn!("lock heartBeat, but failed. [{}]", self.client_id);
false
Expand Down Expand Up @@ -745,14 +773,12 @@ impl MQClientInstance {
return false;
}

let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
unimplemented!("sendHeartbeatToBrokerV2")
} else {
self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
.await
};
drop(lock);
result
}
Comment on lines +776 to +781
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement sendHeartbeatToBrokerV2 or handle unimplemented case

In send_heartbeat_to_broker, when use_heartbeat_v2 is true, the method calls unimplemented!("sendHeartbeatToBrokerV2"), which will cause a panic at runtime if this path is executed. Please implement the sendHeartbeatToBrokerV2 method or handle this case to prevent potential runtime panics.

} else {
false
}
Expand Down Expand Up @@ -1059,6 +1085,20 @@ impl MQClientInstance {
}
}
}

async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
let topic_route_table = self.topic_route_table.read().await;
for (_, value) in topic_route_table.iter() {
for bd in value.broker_datas.iter() {
for (_, value) in bd.broker_addrs().iter() {
if value.as_str() == addr {
return true;
}
}
}
}
false
}
}

pub fn topic_route_data2topic_publish_info(
Expand Down
Loading