From f4f113a898349560c75e5642154d65ee7fdaa709 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 21 Jan 2024 16:07:18 +0800 Subject: [PATCH] [ISSUE #76] :sparkles: Namesrv supports get all topic list from name server(request code 206) --- README.md | 2 +- rocketmq-namesrv/README.md | 34 +++++++++---------- .../processor/default_request_processor.rs | 15 ++++++++ .../src/route/route_info_manager.rs | 14 ++++++++ rocketmq-remoting/src/protocol/body.rs | 1 + rocketmq-remoting/src/protocol/body/topic.rs | 18 ++++++++++ .../src/protocol/body/topic/topic_list.rs | 30 ++++++++++++++++ 7 files changed, 96 insertions(+), 18 deletions(-) create mode 100644 rocketmq-remoting/src/protocol/body/topic.rs create mode 100644 rocketmq-remoting/src/protocol/body/topic/topic_list.rs diff --git a/README.md b/README.md index ee1c1bd5..f98ccf84 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ Feature list: | Get broker cluster info | 106 | :sparkling_heart: :white_check_mark: | | | Wipe write perm of boker | 205 | :sparkling_heart: :white_check_mark: | | | Add write perm of brober | 327 | :sparkling_heart: :white_check_mark: | | -| Get all topic list from name server | 206 | :broken_heart: :x: | | +| Get all topic list from name server | 206 | :sparkling_heart: :white_check_mark: | | | Delete topic in name server | 216 | :broken_heart: :x: | | | Register topic in name server | 217 | :broken_heart: :x: | | | Get topics by cluster | 224 | :broken_heart: :x: | | diff --git a/rocketmq-namesrv/README.md b/rocketmq-namesrv/README.md index c9df6036..9659d6ef 100644 --- a/rocketmq-namesrv/README.md +++ b/rocketmq-namesrv/README.md @@ -14,30 +14,30 @@ Feature list: - **Perfect support**: :sparkling_heart: :white_check_mark: -| Feature | request code | Support | remark | -| -------------------------------------- | ------------ | -------------- | ------ | -| Put KV Config | 100 | :sparkling_heart: :white_check_mark: | | +| Feature | request code | Support | remark | +| -------------------------------------- | ------------ |--------------------------------------|--------| +| Put KV Config | 100 | :sparkling_heart: :white_check_mark: | | | Get KV Config | 101 | :sparkling_heart: :white_check_mark: | | | Delete KV Config | 102 | :sparkling_heart: :white_check_mark: | | -| Get kv list by namespace | 219 | :broken_heart: :x: | | -| Query Data Version | 322 | :sparkling_heart: :white_check_mark:| | -| Register Broker | 103 | :heart: :white_check_mark: | | -| Unregister Broker | 104 | :broken_heart: :x: | | +| Get kv list by namespace | 219 | :broken_heart: :x: | | +| Query Data Version | 322 | :sparkling_heart: :white_check_mark: | | +| Register Broker | 103 | :heart: :white_check_mark: | | +| Unregister Broker | 104 | :broken_heart: :x: | | | Broker Heartbeat | 904 | :sparkling_heart: :white_check_mark: | | | Get broker member_group | 901 | :sparkling_heart: :white_check_mark: | | | Get broker cluster info | 106 | :sparkling_heart: :white_check_mark: | | | Wipe write perm of boker | 205 | :sparkling_heart: :white_check_mark: | | | Add write perm of brober | 327 | :sparkling_heart: :white_check_mark: | | -| Get all topic list from name server | 206 | :broken_heart: :x: | | -| Delete topic in name server | 216 | :broken_heart: :x: | | -| Register topic in name server | 217 | :broken_heart: :x: | | -| Get topics by cluster | 224 | :broken_heart: :x: | | -| Get system topic list from name server | 304 | :broken_heart: :x: | | -| Get unit topic list | 311 | :broken_heart: :x: | | -| Get has unit sub topic list | 312 | :broken_heart: :x: | | -| Get has unit sub ununit topic list | 313 | :broken_heart: :x: | | -| Update name server config | 318 | :broken_heart: :x: | | -| Get name server config | 318 | :broken_heart: :x: | | +| Get all topic list from name server | 206 | :sparkling_heart: :white_check_mark: | | +| Delete topic in name server | 216 | :broken_heart: :x: | | +| Register topic in name server | 217 | :broken_heart: :x: | | +| Get topics by cluster | 224 | :broken_heart: :x: | | +| Get system topic list from name server | 304 | :broken_heart: :x: | | +| Get unit topic list | 311 | :broken_heart: :x: | | +| Get has unit sub topic list | 312 | :broken_heart: :x: | | +| Get has unit sub ununit topic list | 313 | :broken_heart: :x: | | +| Update name server config | 318 | :broken_heart: :x: | | +| Get name server config | 318 | :broken_heart: :x: | | ## Getting Started diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 3e965b5d..e9722ce9 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -83,6 +83,9 @@ impl RequestProcessor for DefaultRequestProcessor { Some(RequestCode::GetBrokerClusterInfo) => self.get_broker_cluster_info(request), Some(RequestCode::WipeWritePermOfBroker) => self.wipe_write_perm_of_broker(request), Some(RequestCode::AddWritePermOfBroker) => self.add_write_perm_of_broker(request), + Some(RequestCode::GetAllTopicListFromNameserver) => { + self.get_all_topic_list_from_nameserver(request) + } _ => RemotingCommand::create_response_command_with_code( RemotingSysResponseCode::SystemError, @@ -348,6 +351,18 @@ impl DefaultRequestProcessor { AddWritePermOfBrokerResponseHeader::new(add_topic_cnt), ))) } + + fn get_all_topic_list_from_nameserver(&mut self, _request: RemotingCommand) -> RemotingCommand { + let rd_lock = self.route_info_manager.read(); + if rd_lock.namesrv_config.enable_all_topic_list { + let topics = rd_lock.get_all_topic_list(); + drop(rd_lock); //release lock + return RemotingCommand::create_response_command() + .set_body(Some(Bytes::from(topics.encode()))); + } + RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::SystemError) + .set_remark(Some(String::from("disable"))) + } } fn extract_register_topic_config_from_request( diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index 9f8cbd5c..76d0b17e 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -33,6 +33,7 @@ use rocketmq_remoting::{ protocol::{ body::{ broker_body::{broker_member_group::BrokerMemberGroup, cluster_info::ClusterInfo}, + topic::topic_list::TopicList, topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, }, header::namesrv::brokerid_change_request_header::NotifyMinBrokerIdChangeRequestHeader, @@ -675,4 +676,17 @@ impl RouteInfoManager { } topic_cnt } + + pub(crate) fn get_all_topic_list(&self) -> TopicList { + let topics = self + .topic_queue_table + .keys() + .cloned() + .collect::>(); + + TopicList { + topic_list: topics, + broker_addr: None, + } + } } diff --git a/rocketmq-remoting/src/protocol/body.rs b/rocketmq-remoting/src/protocol/body.rs index c1e8a47d..c898341f 100644 --- a/rocketmq-remoting/src/protocol/body.rs +++ b/rocketmq-remoting/src/protocol/body.rs @@ -17,4 +17,5 @@ pub mod broker_body; pub mod kv_table; +pub mod topic; pub mod topic_info_wrapper; diff --git a/rocketmq-remoting/src/protocol/body/topic.rs b/rocketmq-remoting/src/protocol/body/topic.rs new file mode 100644 index 00000000..9e883793 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/topic.rs @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod topic_list; diff --git a/rocketmq-remoting/src/protocol/body/topic/topic_list.rs b/rocketmq-remoting/src/protocol/body/topic/topic_list.rs new file mode 100644 index 00000000..83781c37 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/topic/topic_list.rs @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use serde::{Deserialize, Serialize}; + +use crate::protocol::RemotingSerializable; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct TopicList { + pub topic_list: Vec, + pub broker_addr: Option, +} + +impl RemotingSerializable for TopicList { + type Output = TopicList; +}