Skip to content

Commit

Permalink
[ISSUE #1021]🚀Support mq client instance clean offline broker🎉 (#1024)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Sep 30, 2024
1 parent 3dfebe6 commit 3ac6cd0
Showing 1 changed file with 58 additions and 18 deletions.
76 changes: 58 additions & 18 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use rocketmq_remoting::runtime::RPCHook;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::RocketMQTokioMutex;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tracing::error;
use tracing::info;
Expand All @@ -64,6 +63,8 @@ use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::Result;

const LOCK_TIMEOUT_MILLIS: u64 = 3000;

pub struct MQClientInstance {
pub(crate) client_config: Arc<ClientConfig>,
pub(crate) client_id: String,
Expand All @@ -88,7 +89,7 @@ pub struct MQClientInstance {
pub(crate) topic_route_table: Arc<RwLock<HashMap<String /* Topic */, TopicRouteData>>>,
topic_end_points_table:
Arc<RwLock<HashMap<String /* Topic */, HashMap<MessageQueue, String /* brokerName */>>>>,
lock_namesrv: Arc<Mutex<()>>,
lock_namesrv: Arc<RocketMQTokioMutex<()>>,
lock_heartbeat: Arc<RocketMQTokioMutex<()>>,

service_state: ServiceState,
Expand Down Expand Up @@ -187,7 +188,7 @@ impl MQClientInstance {
topic_route_table: Arc::new(Default::default()),
topic_end_points_table: Arc::new(Default::default()),
lock_namesrv: Default::default(),
lock_heartbeat: Arc::new(RocketMQTokioMutex::new(())),
lock_heartbeat: Default::default(),
service_state: ServiceState::CreateJust,
pull_message_service: ArcRefCellWrapper::new(PullMessageService::new()),
rebalance_service: RebalanceService::new(),
Expand Down Expand Up @@ -354,7 +355,7 @@ impl MQClientInstance {
let mut client_instance = this.clone();
let poll_name_server_interval = self.client_config.poll_name_server_interval;
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask updateTopicRouteInfoFromNameServer started");
info!("ScheduledTask update_topic_route_info_from_name_server started");
tokio::time::sleep(Duration::from_millis(10)).await;
loop {
let current_execution_time = tokio::time::Instant::now();
Expand All @@ -372,7 +373,7 @@ impl MQClientInstance {
let mut client_instance = this.clone();
let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask send_heartbeat_to_all_broker started");
info!("ScheduledTask clean_offline_broker started");
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
let current_execution_time = tokio::time::Instant::now();
Expand Down Expand Up @@ -632,17 +633,46 @@ impl MQClientInstance {
}

pub async fn clean_offline_broker(&mut self) {
println!("cleanOfflineBroker")
let lock = self
.lock_namesrv
.try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
.await;
if let Some(lock) = lock {
let mut broker_addr_table = self.broker_addr_table.write().await;
let mut updated_table = HashMap::with_capacity(broker_addr_table.len());
let mut broker_name_set = HashSet::new();
for (broker_name, one_table) in broker_addr_table.iter() {
let mut clone_addr_table = one_table.clone();
let mut remove_id_set = HashSet::new();
for (id, addr) in one_table.iter() {
if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
remove_id_set.insert(*id);
}
}
clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
if clone_addr_table.is_empty() {
info!(
"the broker[{}] name's host is offline, remove it",
broker_name
);
broker_name_set.insert(broker_name.clone());
} else {
updated_table.insert(broker_name.clone(), clone_addr_table);
}
}
broker_addr_table.retain(|k, _| !broker_name_set.contains(k));
if !updated_table.is_empty() {
broker_addr_table.extend(updated_table);
}
}
}
pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool {
if let Some(lock) = self.lock_heartbeat.try_lock().await {
let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
self.send_heartbeat_to_all_broker_v2(false).await
} else {
self.send_heartbeat_to_all_broker().await
};
drop(lock);
result
}
} else {
warn!("lock heartBeat, but failed. [{}]", self.client_id);
false
Expand All @@ -655,13 +685,11 @@ impl MQClientInstance {
.try_lock_timeout(Duration::from_secs(2))
.await
{
let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
self.send_heartbeat_to_all_broker_v2(is_rebalance).await
} else {
self.send_heartbeat_to_all_broker().await
};
drop(lock);
result
}
} else {
warn!("lock heartBeat, but failed. [{}]", self.client_id);
false
Expand Down Expand Up @@ -745,14 +773,12 @@ impl MQClientInstance {
return false;
}

let result = if self.client_config.use_heartbeat_v2 {
if self.client_config.use_heartbeat_v2 {
unimplemented!("sendHeartbeatToBrokerV2")
} else {
self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
.await
};
drop(lock);
result
}
} else {
false
}
Expand Down Expand Up @@ -1059,6 +1085,20 @@ impl MQClientInstance {
}
}
}

async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
let topic_route_table = self.topic_route_table.read().await;
for (_, value) in topic_route_table.iter() {
for bd in value.broker_datas.iter() {
for (_, value) in bd.broker_addrs().iter() {
if value.as_str() == addr {
return true;
}
}
}
}
false
}
}

pub fn topic_route_data2topic_publish_info(
Expand Down

0 comments on commit 3ac6cd0

Please sign in to comment.