diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 2412421d..07c69bc8 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -359,7 +359,7 @@ impl DefaultRequestProcessor { .decode_command_custom_header::() .expect("decode RegisterTopicRequestHeader failed"); if let Some(ref body) = request.body() { - let topic_route_data = SerdeJsonUtils::decode::(body).unwrap(); + let topic_route_data = TopicRouteData::decode(body).unwrap_or_default(); if !topic_route_data.queue_datas.is_empty() { self.route_info_manager .register_topic(request_header.topic, topic_route_data.queue_datas) @@ -371,7 +371,7 @@ impl DefaultRequestProcessor { fn get_kv_list_by_namespace(&self, request: RemotingCommand) -> RemotingCommand { let request_header = request .decode_command_custom_header::() - .unwrap(); + .expect("decode GetKVListByNamespaceRequestHeader failed"); let value = self .kvconfig_manager .get_kv_list_by_namespace(&request_header.namespace); @@ -396,10 +396,10 @@ impl DefaultRequestProcessor { let request_header = request .decode_command_custom_header::() - .unwrap(); + .expect("decode GetTopicsByClusterRequestHeader failed"); let topics_by_cluster = self .route_info_manager - .get_topics_by_cluster(request_header.cluster.as_str()); + .get_topics_by_cluster(&request_header.cluster); RemotingCommand::create_response_command().set_body(topics_by_cluster.encode()) } diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index a44c4ee4..a9feb54c 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -762,15 +762,21 @@ impl RouteInfoManager { drop(lock) } - pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec) { + pub(crate) fn register_topic(&self, topic: CheetahString, queue_data_vec: Vec) { if queue_data_vec.is_empty() { return; } - + let lock = self.lock.write(); if !self.topic_queue_table.contains_key(&topic) { - self.topic_queue_table.insert(topic.clone(), HashMap::new()); + self.topic_queue_table + .mut_from_ref() + .insert(topic.clone(), HashMap::new()); } - let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap(); + let queue_data_map = self + .topic_queue_table + .mut_from_ref() + .get_mut(&topic) + .unwrap(); let vec_length = queue_data_vec.len(); for queue_data in queue_data_vec { if !self @@ -785,6 +791,7 @@ impl RouteInfoManager { } queue_data_map.insert(queue_data.broker_name().clone(), queue_data); } + drop(lock); if queue_data_map.len() > vec_length { info!("Topic route already exist.{}, {:?}", topic, queue_data_map) @@ -793,8 +800,9 @@ impl RouteInfoManager { } } - pub(crate) fn get_topics_by_cluster(&self, cluster: &str) -> TopicList { + pub(crate) fn get_topics_by_cluster(&self, cluster: &CheetahString) -> TopicList { let mut topic_list = Vec::new(); + let lock = self.lock.read(); if let Some(broker_name_set) = self.cluster_addr_table.get(cluster) { for broker_name in broker_name_set { for (topic, queue_data_map) in self.topic_queue_table.iter() { @@ -804,6 +812,7 @@ impl RouteInfoManager { } } } + drop(lock); TopicList { topic_list, broker_addr: None, @@ -821,10 +830,6 @@ impl RouteInfoManager { } if !self.broker_addr_table.is_empty() { for broker_addr in self.broker_addr_table.values() { - /*for ip in broker_addr.broker_addrs().values() { - broker_addr_out = Some(ip.clone()); - break; - }*/ let broker_addrs = broker_addr.broker_addrs(); if !broker_addrs.is_empty() { broker_addr_out = Some(broker_addrs.values().next().unwrap().clone());