From 3a9020a686a084717a83be80a5c44ecb4d495770 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 20 Jan 2024 00:57:24 +0800 Subject: [PATCH] [ISSUE #64]Namesrv support query data version (request code 322) --- .../processor/default_request_processor.rs | 36 +++++- .../src/route/route_info_manager.rs | 30 +++-- .../src/route_info/broker_addr_info.rs | 8 +- rocketmq-remoting/src/protocol.rs | 4 + .../src/protocol/header/namesrv.rs | 3 +- ..._request_header.rs => kv_config_header.rs} | 0 .../namesrv/query_data_version_header.rs | 122 ++++++++++++++++++ 7 files changed, 188 insertions(+), 15 deletions(-) rename rocketmq-remoting/src/protocol/header/namesrv/{kv_config_request_header.rs => kv_config_header.rs} (100%) create mode 100644 rocketmq-remoting/src/protocol/header/namesrv/query_data_version_header.rs diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index f0fbe549..c5cf6f56 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -30,14 +30,17 @@ use rocketmq_remoting::{ topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, }, header::namesrv::{ - kv_config_request_header::{ + kv_config_header::{ DeleteKVConfigRequestHeader, GetKVConfigRequestHeader, GetKVConfigResponseHeader, PutKVConfigRequestHeader, }, + query_data_version_header::{ + QueryDataVersionRequestHeader, QueryDataVersionResponseHeader, + }, register_broker_header::{RegisterBrokerRequestHeader, RegisterBrokerResponseHeader}, }, remoting_command::RemotingCommand, - RemotingSerializable, + DataVersion, RemotingSerializable, }, runtime::processor::RequestProcessor, }; @@ -59,6 +62,7 @@ impl RequestProcessor for DefaultRequestProcessor { Some(RequestCode::PutKvConfig) => self.put_kv_config(request), Some(RequestCode::GetKvConfig) => self.get_kv_config(request), Some(RequestCode::DeleteKvConfig) => self.delete_kv_config(request), + Some(RequestCode::QueryDataVersion) => self.query_broker_topic_config(request), //handle register broker Some(RequestCode::RegisterBroker) => self.process_register_broker(request), Some(RequestCode::BrokerHeartbeat) => self.process_broker_heartbeat(request), @@ -128,6 +132,34 @@ impl DefaultRequestProcessor { ); RemotingCommand::create_response_command() } + + fn query_broker_topic_config(&mut self, request: RemotingCommand) -> RemotingCommand { + let request_header = request + .decode_command_custom_header::() + .unwrap(); + let data_version = + DataVersion::decode(request.body().as_ref().map(|v| v.as_ref()).unwrap()); + let changed = self + .route_info_manager + .read() + .is_broker_topic_config_changed( + &request_header.cluster_name, + &request_header.broker_addr, + &data_version, + ); + + let mut command = RemotingCommand::create_response_command().set_command_custom_header( + Some(Box::new(QueryDataVersionResponseHeader::new(changed))), + ); + if let Some(value) = self.route_info_manager.write().query_broker_topic_config( + request_header.cluster_name.as_str(), + request_header.broker_addr.as_str(), + ) { + command = command.set_body(Some(Bytes::from(value.encode()))); + } + + command + } } #[allow(clippy::new_without_default)] diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index 4314423a..4f095cba 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -20,9 +20,12 @@ use std::{ time::{Duration, SystemTime}, }; -use rocketmq_common::common::{ - config::TopicConfig, constant::PermName, mix_all, namesrv::namesrv_config::NamesrvConfig, - topic::TopicValidator, +use rocketmq_common::{ + common::{ + config::TopicConfig, constant::PermName, mix_all, namesrv::namesrv_config::NamesrvConfig, + topic::TopicValidator, + }, + TimeUtils, }; use rocketmq_remoting::{ clients::RemoteClient, @@ -474,7 +477,7 @@ impl RouteInfoManager { topic_of_broker } - fn is_topic_config_changed( + pub(crate) fn is_topic_config_changed( &mut self, cluster_name: &str, broker_addr: &str, @@ -498,8 +501,8 @@ impl RouteInfoManager { } } - fn is_broker_topic_config_changed( - &mut self, + pub(crate) fn is_broker_topic_config_changed( + &self, cluster_name: &str, broker_addr: &str, data_version: &DataVersion, @@ -513,8 +516,8 @@ impl RouteInfoManager { false } - fn query_broker_topic_config( - &mut self, + pub(crate) fn query_broker_topic_config( + &self, cluster_name: &str, broker_addr: &str, ) -> Option<&DataVersion> { @@ -613,4 +616,15 @@ impl RouteInfoManager { .collect(); Some(broker_addr_vec) } + + pub(crate) fn update_broker_info_update_timestamp( + &mut self, + cluster_name: impl Into, + broker_addr: impl Into, + ) { + let broker_addr_info = BrokerAddrInfo::new(cluster_name, broker_addr); + if let Some(value) = self.broker_live_table.get_mut(broker_addr_info.as_ref()) { + value.last_update_timestamp = TimeUtils::get_current_millis() as i64; + } + } } diff --git a/rocketmq-namesrv/src/route_info/broker_addr_info.rs b/rocketmq-namesrv/src/route_info/broker_addr_info.rs index 4f41d78c..610b47c1 100644 --- a/rocketmq-namesrv/src/route_info/broker_addr_info.rs +++ b/rocketmq-namesrv/src/route_info/broker_addr_info.rs @@ -98,10 +98,10 @@ impl Display for BrokerStatusChangeInfo { #[derive(Clone, Debug)] pub(crate) struct BrokerLiveInfo { - last_update_timestamp: i64, - heartbeat_timeout_millis: i64, - data_version: DataVersion, - ha_server_addr: String, + pub last_update_timestamp: i64, + pub heartbeat_timeout_millis: i64, + pub data_version: DataVersion, + pub ha_server_addr: String, } impl BrokerLiveInfo { diff --git a/rocketmq-remoting/src/protocol.rs b/rocketmq-remoting/src/protocol.rs index 97008f96..da849e87 100644 --- a/rocketmq-remoting/src/protocol.rs +++ b/rocketmq-remoting/src/protocol.rs @@ -172,6 +172,10 @@ pub struct DataVersion { counter_inner: AtomicI64, } +impl RemotingSerializable for DataVersion { + type Output = DataVersion; +} + impl Clone for DataVersion { fn clone(&self) -> Self { DataVersion { diff --git a/rocketmq-remoting/src/protocol/header/namesrv.rs b/rocketmq-remoting/src/protocol/header/namesrv.rs index 13b84ec5..1930d594 100644 --- a/rocketmq-remoting/src/protocol/header/namesrv.rs +++ b/rocketmq-remoting/src/protocol/header/namesrv.rs @@ -16,5 +16,6 @@ */ pub mod brokerid_change_request_header; -pub mod kv_config_request_header; +pub mod kv_config_header; +pub mod query_data_version_header; pub mod register_broker_header; diff --git a/rocketmq-remoting/src/protocol/header/namesrv/kv_config_request_header.rs b/rocketmq-remoting/src/protocol/header/namesrv/kv_config_header.rs similarity index 100% rename from rocketmq-remoting/src/protocol/header/namesrv/kv_config_request_header.rs rename to rocketmq-remoting/src/protocol/header/namesrv/kv_config_header.rs diff --git a/rocketmq-remoting/src/protocol/header/namesrv/query_data_version_header.rs b/rocketmq-remoting/src/protocol/header/namesrv/query_data_version_header.rs new file mode 100644 index 00000000..e3b215e8 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/namesrv/query_data_version_header.rs @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::protocol::command_custom_header::{CommandCustomHeader, FromMap}; + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct QueryDataVersionRequestHeader { + pub broker_name: String, + pub broker_addr: String, + pub cluster_name: String, + pub broker_id: u64, +} + +impl QueryDataVersionRequestHeader { + const BROKER_NAME: &'static str = "brokerName"; + const BROKER_ADDR: &'static str = "brokerAddr"; + const CLUSTER_NAME: &'static str = "clusterName"; + const BROKER_ID: &'static str = "brokerId"; + + pub fn new( + broker_name: impl Into, + broker_addr: impl Into, + cluster_name: impl Into, + broker_id: u64, + ) -> Self { + Self { + broker_name: broker_name.into(), + broker_addr: broker_addr.into(), + cluster_name: cluster_name.into(), + broker_id, + } + } +} + +impl CommandCustomHeader for QueryDataVersionRequestHeader { + fn to_map(&self) -> Option> { + Some(HashMap::from([ + (Self::BROKER_NAME.to_string(), self.broker_name.clone()), + (Self::BROKER_ADDR.to_string(), self.broker_addr.clone()), + (Self::CLUSTER_NAME.to_string(), self.cluster_name.clone()), + (Self::BROKER_ID.to_string(), self.broker_id.to_string()), + ])) + } +} + +impl FromMap for QueryDataVersionRequestHeader { + type Target = Self; + + fn from(map: &HashMap) -> Option { + Some(QueryDataVersionRequestHeader { + broker_name: map + .get(QueryDataVersionRequestHeader::BROKER_NAME) + .cloned() + .unwrap_or_default(), + broker_addr: map + .get(QueryDataVersionRequestHeader::BROKER_ADDR) + .cloned() + .unwrap_or_default(), + cluster_name: map + .get(QueryDataVersionRequestHeader::CLUSTER_NAME) + .cloned() + .unwrap_or_default(), + broker_id: map + .get(QueryDataVersionRequestHeader::BROKER_ID) + .and_then(|s| s.parse::().ok()) + .unwrap(), + }) + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct QueryDataVersionResponseHeader { + changed: bool, +} + +impl QueryDataVersionResponseHeader { + const CHANGED: &'static str = "changed"; + + pub fn new(changed: bool) -> Self { + Self { changed } + } +} + +impl CommandCustomHeader for QueryDataVersionResponseHeader { + fn to_map(&self) -> Option> { + Some(HashMap::from([( + Self::CHANGED.to_string(), + self.changed.to_string(), + )])) + } +} + +impl FromMap for QueryDataVersionResponseHeader { + type Target = Self; + + fn from(map: &HashMap) -> Option { + Some(QueryDataVersionResponseHeader { + changed: map + .get(QueryDataVersionResponseHeader::CHANGED) + .and_then(|s| s.parse::().ok()) + .unwrap_or(false), + }) + } +}