diff --git a/Cargo.lock b/Cargo.lock index 3d81484e..8236f1de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -628,6 +628,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] @@ -2011,6 +2012,7 @@ dependencies = [ "cheetah-string", "clap", "criterion", + "dashmap", "dirs", "dns-lookup", "lazy_static", @@ -2159,6 +2161,7 @@ dependencies = [ "bytemuck", "bytes", "cheetah-string", + "dashmap", "flate2", "futures", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index e020c2e7..cb86d663 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,3 +87,4 @@ futures = "0.3" cheetah-string = { version = "0.1.6", features = ["serde", "bytes"] } flate2 = "1.0.35" +dashmap = "6.1.0" diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index f6c8ab52..8bc58350 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -53,6 +53,7 @@ sysinfo = { workspace = true } thiserror = { workspace = true } trait-variant = { workspace = true } cheetah-string = { workspace = true } +dashmap = { workspace = true } [dev-dependencies] mockall = "0.13.1" static_assertions = { version = "1" } diff --git a/rocketmq-broker/src/client/consumer_group_info.rs b/rocketmq-broker/src/client/consumer_group_info.rs index f760ddf9..8689e7f3 100644 --- a/rocketmq-broker/src/client/consumer_group_info.rs +++ b/rocketmq-broker/src/client/consumer_group_info.rs @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::collections::HashMap; + use std::collections::HashSet; use std::sync::Arc; use cheetah_string::CheetahString; +use dashmap::DashMap; use parking_lot::Mutex; use parking_lot::RwLock; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; @@ -36,8 +37,8 @@ use crate::client::client_channel_info::ClientChannelInfo; #[derive(Debug, Clone)] pub struct ConsumerGroupInfo { group_name: CheetahString, - subscription_table: Arc>>, - channel_info_table: Arc>>, + subscription_table: Arc>, + channel_info_table: Arc>, consume_type: Arc>, message_model: Arc>, consume_from_where: Arc>, @@ -53,8 +54,8 @@ impl ConsumerGroupInfo { ) -> Self { ConsumerGroupInfo { group_name: group_name.into(), - subscription_table: Arc::new(RwLock::new(HashMap::new())), - channel_info_table: Arc::new(RwLock::new(HashMap::new())), + subscription_table: Arc::new(DashMap::new()), + channel_info_table: Arc::new(DashMap::new()), consume_type: Arc::new(RwLock::new(consume_type)), message_model: Arc::new(RwLock::new(message_model)), consume_from_where: Arc::new(RwLock::new(consume_from_where)), @@ -65,8 +66,8 @@ impl ConsumerGroupInfo { pub fn with_group_name(group_name: impl Into) -> Self { ConsumerGroupInfo { group_name: group_name.into(), - subscription_table: Arc::new(RwLock::new(HashMap::new())), - channel_info_table: Arc::new(RwLock::new(HashMap::new())), + subscription_table: Arc::new(DashMap::new()), + channel_info_table: Arc::new(DashMap::new()), consume_type: Arc::new(RwLock::new(ConsumeType::ConsumePassively)), message_model: Arc::new(RwLock::new(MessageModel::Clustering)), consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::ConsumeFromLastOffset)), @@ -75,8 +76,7 @@ impl ConsumerGroupInfo { } pub fn find_channel_by_client_id(&self, client_id: &str) -> Option { - let channel_info_table = self.channel_info_table.read(); - for (_, client_channel_info) in channel_info_table.iter() { + for client_channel_info in self.channel_info_table.iter() { if client_channel_info.client_id() == client_id { return Some(client_channel_info.clone()); } @@ -84,35 +84,37 @@ impl ConsumerGroupInfo { None } - pub fn get_subscription_table(&self) -> Arc>> { + pub fn get_subscription_table(&self) -> Arc> { Arc::clone(&self.subscription_table) } pub fn find_channel_by_channel(&self, channel: &Channel) -> Option { - let channel_info_table = self.channel_info_table.read(); - channel_info_table.get(channel).cloned() + self.channel_info_table + .get(channel) + .map(|item| item.value().clone()) } - pub fn get_channel_info_table(&self) -> Arc>> { + pub fn get_channel_info_table(&self) -> Arc> { Arc::clone(&self.channel_info_table) } pub fn get_all_channels(&self) -> Vec { - let channel_info_table = self.channel_info_table.read(); - channel_info_table.keys().cloned().collect() + self.channel_info_table + .iter() + .map(|item| item.key().clone()) + .collect::>() } pub fn get_all_client_ids(&self) -> Vec { - let channel_info_table = self.channel_info_table.read(); - channel_info_table - .values() - .map(|info| info.client_id().clone()) + self.channel_info_table + .iter() + .map(|info| info.value().client_id().clone()) .collect() } pub fn unregister_channel(&self, client_channel_info: &ClientChannelInfo) -> bool { - let mut channel_info_table = self.channel_info_table.write(); - if channel_info_table + if self + .channel_info_table .remove(client_channel_info.channel()) .is_some() { @@ -128,8 +130,7 @@ impl ConsumerGroupInfo { } pub fn handle_channel_close_event(&self, channel: &Channel) -> Option { - let mut channel_info_table = self.channel_info_table.write(); - if let Some(info) = channel_info_table.remove(channel) { + if let Some((_, info)) = self.channel_info_table.remove(channel) { warn!( "NETTY EVENT: remove not active channel [{:?}] from ConsumerGroupInfo \ groupChannelTable, consumer group: {}", @@ -166,8 +167,7 @@ impl ConsumerGroupInfo { *consume_from_where_lock = consume_from_where; } - let mut channel_info_table = self.channel_info_table.write(); - if let Some(info_old) = channel_info_table.get_mut(info_new.channel()) { + if let Some(mut info_old) = self.channel_info_table.get_mut(info_new.channel()) { if info_old.client_id() != info_new.client_id() { error!( "ConsumerGroupInfo: consumer channel exists in broker, but clientId is not \ @@ -180,7 +180,8 @@ impl ConsumerGroupInfo { } info_old.set_last_update_timestamp(get_current_millis()); } else { - channel_info_table.insert(info_new.channel().clone(), info_new.clone()); + self.channel_info_table + .insert(info_new.channel().clone(), info_new.clone()); info!( "New consumer connected, group: {} channel: {:?}", self.group_name, @@ -198,9 +199,8 @@ impl ConsumerGroupInfo { let mut updated = false; let mut topic_set = HashSet::new(); - let mut subscription_table = self.subscription_table.write(); for sub in sub_list.iter() { - if let Some(old) = subscription_table.get(sub.topic.as_str()) { + if let Some(old) = self.subscription_table.get(sub.topic.as_str()) { if sub.sub_version > old.sub_version { if *self.consume_type.read() == ConsumeType::ConsumePassively { info!( @@ -208,10 +208,12 @@ impl ConsumerGroupInfo { self.group_name, old, sub ); } - subscription_table.insert(sub.topic.clone(), sub.clone()); + self.subscription_table + .insert(sub.topic.clone(), sub.clone()); } } else { - subscription_table.insert(sub.topic.clone(), sub.clone()); + self.subscription_table + .insert(sub.topic.clone(), sub.clone()); info!( "Subscription changed, add new topic, group: {} {}", self.group_name, sub.topic @@ -221,7 +223,7 @@ impl ConsumerGroupInfo { topic_set.insert(sub.topic.clone()); } - subscription_table.retain(|old_topic, _| { + self.subscription_table.retain(|old_topic, _| { if !topic_set.contains(old_topic) { warn!( "Subscription changed, group: {} remove topic {}", @@ -240,13 +242,16 @@ impl ConsumerGroupInfo { } pub fn get_subscribe_topics(&self) -> HashSet { - let subscription_table = self.subscription_table.read(); - subscription_table.keys().cloned().collect() + self.subscription_table + .iter() + .map(|item| item.key().clone()) + .collect() } pub fn find_subscription_data(&self, topic: &CheetahString) -> Option { - let subscription_table = self.subscription_table.read(); - subscription_table.get(topic).cloned() + self.subscription_table + .get(topic) + .map(|item| item.value().clone()) } pub fn get_consume_type(&self) -> ConsumeType { diff --git a/rocketmq-broker/src/client/manager/consumer_manager.rs b/rocketmq-broker/src/client/manager/consumer_manager.rs index 1304c921..cbde5204 100644 --- a/rocketmq-broker/src/client/manager/consumer_manager.rs +++ b/rocketmq-broker/src/client/manager/consumer_manager.rs @@ -113,7 +113,7 @@ impl ConsumerManager { pub fn find_subscription_data_count(&self, group: &CheetahString) -> usize { if let Some(consumer_group_info) = self.get_consumer_group_info(group) { - return consumer_group_info.get_subscription_table().read().len(); + return consumer_group_info.get_subscription_table().len(); } 0 } @@ -150,7 +150,6 @@ impl ConsumerManager { .or_insert_with(|| ConsumerGroupInfo::with_group_name(group.clone())); consumer_group_info .get_subscription_table() - .write() .insert(topic.into(), subscription_data.clone()); } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs index 9a310262..8b220e96 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs @@ -69,18 +69,19 @@ impl ConsumerRequestHandler { body_data.set_consume_from_where(consumer_group_info.get_consume_from_where()); body_data.set_consume_type(consumer_group_info.get_consume_type()); body_data.set_message_model(consumer_group_info.get_message_model()); - let subscription_table = - consumer_group_info.get_subscription_table().read().clone(); - body_data - .get_subscription_table() - .extend(subscription_table); + let subscription_table_consumer = consumer_group_info.get_subscription_table(); + let subscription_table = body_data.get_subscription_table(); + for key_value in subscription_table_consumer.iter() { + subscription_table.insert(key_value.key().clone(), key_value.clone()); + } - for (channel, info) in consumer_group_info.get_channel_info_table().read().iter() { + for channel_info in consumer_group_info.get_channel_info_table().iter() { let mut connection = Connection::new(); - connection.set_client_id(info.client_id().clone()); - connection.set_language(info.language()); - connection.set_version(info.version()); - connection.set_client_addr(channel.remote_address().to_string().into()); + connection.set_client_id(channel_info.client_id().clone()); + connection.set_language(channel_info.language()); + connection.set_version(channel_info.version()); + connection + .set_client_addr(channel_info.key().remote_address().to_string().into()); body_data.get_connection_set().insert(connection); } let body = body_data diff --git a/rocketmq-remoting/Cargo.toml b/rocketmq-remoting/Cargo.toml index a90e436b..a85556a8 100644 --- a/rocketmq-remoting/Cargo.toml +++ b/rocketmq-remoting/Cargo.toml @@ -54,5 +54,7 @@ cheetah-string = { workspace = true } bitvec = "1.0.1" bytemuck = "1.21.0" +dashmap = { workspace = true, features = ["serde"] } + [dev-dependencies] bytes = "1.9.0" diff --git a/rocketmq-remoting/src/protocol/body/consumer_connection.rs b/rocketmq-remoting/src/protocol/body/consumer_connection.rs index 062e437d..fd9be24d 100644 --- a/rocketmq-remoting/src/protocol/body/consumer_connection.rs +++ b/rocketmq-remoting/src/protocol/body/consumer_connection.rs @@ -15,11 +15,11 @@ * limitations under the License. */ -use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; use cheetah_string::CheetahString; +use dashmap::DashMap; use parking_lot::RwLock; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; use serde::ser::SerializeStruct; @@ -34,7 +34,7 @@ use crate::protocol::heartbeat::subscription_data::SubscriptionData; #[derive(Debug, Clone, Default)] pub struct ConsumerConnection { connection_set: HashSet, - subscription_table: Arc>>, + subscription_table: Arc>, consume_type: Arc>, message_model: Arc>, consume_from_where: Arc>, @@ -44,7 +44,7 @@ impl ConsumerConnection { pub fn new() -> Self { ConsumerConnection { connection_set: HashSet::new(), - subscription_table: Arc::new(RwLock::new(HashMap::new())), + subscription_table: Arc::new(DashMap::new()), consume_type: Arc::new(RwLock::new(ConsumeType::default())), message_model: Arc::new(RwLock::new(MessageModel::default())), consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::default())), @@ -59,7 +59,7 @@ impl Serialize for ConsumerConnection { { let mut s = serializer.serialize_struct("ConsumerConnection", 5)?; s.serialize_field("connection_set", &self.connection_set)?; - s.serialize_field("subscription_table", &*self.subscription_table.read())?; + s.serialize_field("subscription_table", &*self.subscription_table)?; s.serialize_field("consume_type", &*self.consume_type.read())?; s.serialize_field("message_model", &*self.message_model.read())?; s.serialize_field("consume_from_where", &*self.consume_from_where.read())?; @@ -76,15 +76,15 @@ impl ConsumerConnection { self.connection_set = connection_set; } - pub fn get_subscription_table(&self) -> HashMap { - self.subscription_table.read().clone() + pub fn get_subscription_table(&self) -> Arc> { + self.subscription_table.clone() } pub fn set_subscription_table( &mut self, - subscription_table: HashMap, + subscription_table: DashMap, ) { - *self.subscription_table.write() = subscription_table; + self.subscription_table = Arc::new(subscription_table); } pub fn get_consume_type(&self) -> ConsumeType {