diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 986167f9..6949f58d 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -41,7 +41,6 @@ use rocketmq_remoting::runtime::RPCHook; use rocketmq_runtime::RocketMQRuntime; use rocketmq_rust::RocketMQTokioMutex; use tokio::runtime::Handle; -use tokio::sync::Mutex; use tokio::sync::RwLock; use tracing::error; use tracing::info; @@ -64,6 +63,8 @@ use crate::producer::producer_impl::mq_producer_inner::MQProducerInner; use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; use crate::Result; +const LOCK_TIMEOUT_MILLIS: u64 = 3000; + pub struct MQClientInstance { pub(crate) client_config: Arc, pub(crate) client_id: String, @@ -88,7 +89,7 @@ pub struct MQClientInstance { pub(crate) topic_route_table: Arc>>, topic_end_points_table: Arc>>>, - lock_namesrv: Arc>, + lock_namesrv: Arc>, lock_heartbeat: Arc>, service_state: ServiceState, @@ -187,7 +188,7 @@ impl MQClientInstance { topic_route_table: Arc::new(Default::default()), topic_end_points_table: Arc::new(Default::default()), lock_namesrv: Default::default(), - lock_heartbeat: Arc::new(RocketMQTokioMutex::new(())), + lock_heartbeat: Default::default(), service_state: ServiceState::CreateJust, pull_message_service: ArcRefCellWrapper::new(PullMessageService::new()), rebalance_service: RebalanceService::new(), @@ -354,7 +355,7 @@ impl MQClientInstance { let mut client_instance = this.clone(); let poll_name_server_interval = self.client_config.poll_name_server_interval; self.instance_runtime.get_handle().spawn(async move { - info!("ScheduledTask updateTopicRouteInfoFromNameServer started"); + info!("ScheduledTask update_topic_route_info_from_name_server started"); tokio::time::sleep(Duration::from_millis(10)).await; loop { let current_execution_time = tokio::time::Instant::now(); @@ -372,7 +373,7 @@ impl MQClientInstance { let mut client_instance = this.clone(); let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval; self.instance_runtime.get_handle().spawn(async move { - info!("ScheduledTask send_heartbeat_to_all_broker started"); + info!("ScheduledTask clean_offline_broker started"); tokio::time::sleep(Duration::from_secs(1)).await; loop { let current_execution_time = tokio::time::Instant::now(); @@ -632,17 +633,46 @@ impl MQClientInstance { } pub async fn clean_offline_broker(&mut self) { - println!("cleanOfflineBroker") + let lock = self + .lock_namesrv + .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS)) + .await; + if let Some(lock) = lock { + let mut broker_addr_table = self.broker_addr_table.write().await; + let mut updated_table = HashMap::with_capacity(broker_addr_table.len()); + let mut broker_name_set = HashSet::new(); + for (broker_name, one_table) in broker_addr_table.iter() { + let mut clone_addr_table = one_table.clone(); + let mut remove_id_set = HashSet::new(); + for (id, addr) in one_table.iter() { + if !self.is_broker_addr_exist_in_topic_route_table(addr).await { + remove_id_set.insert(*id); + } + } + clone_addr_table.retain(|k, _| !remove_id_set.contains(k)); + if clone_addr_table.is_empty() { + info!( + "the broker[{}] name's host is offline, remove it", + broker_name + ); + broker_name_set.insert(broker_name.clone()); + } else { + updated_table.insert(broker_name.clone(), clone_addr_table); + } + } + broker_addr_table.retain(|k, _| !broker_name_set.contains(k)); + if !updated_table.is_empty() { + broker_addr_table.extend(updated_table); + } + } } pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool { if let Some(lock) = self.lock_heartbeat.try_lock().await { - let result = if self.client_config.use_heartbeat_v2 { + if self.client_config.use_heartbeat_v2 { self.send_heartbeat_to_all_broker_v2(false).await } else { self.send_heartbeat_to_all_broker().await - }; - drop(lock); - result + } } else { warn!("lock heartBeat, but failed. [{}]", self.client_id); false @@ -655,13 +685,11 @@ impl MQClientInstance { .try_lock_timeout(Duration::from_secs(2)) .await { - let result = if self.client_config.use_heartbeat_v2 { + if self.client_config.use_heartbeat_v2 { self.send_heartbeat_to_all_broker_v2(is_rebalance).await } else { self.send_heartbeat_to_all_broker().await - }; - drop(lock); - result + } } else { warn!("lock heartBeat, but failed. [{}]", self.client_id); false @@ -745,14 +773,12 @@ impl MQClientInstance { return false; } - let result = if self.client_config.use_heartbeat_v2 { + if self.client_config.use_heartbeat_v2 { unimplemented!("sendHeartbeatToBrokerV2") } else { self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data) .await - }; - drop(lock); - result + } } else { false } @@ -1059,6 +1085,20 @@ impl MQClientInstance { } } } + + async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool { + let topic_route_table = self.topic_route_table.read().await; + for (_, value) in topic_route_table.iter() { + for bd in value.broker_datas.iter() { + for (_, value) in bd.broker_addrs().iter() { + if value.as_str() == addr { + return true; + } + } + } + } + false + } } pub fn topic_route_data2topic_publish_info(