Skip to content

Commit

Permalink
[ISSUE #1979]♻️Refactor ConsumerGroupInfo with dashmap🚀 (#1980)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 31, 2024
1 parent 298cf9b commit dcff411
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 55 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ futures = "0.3"
cheetah-string = { version = "0.1.6", features = ["serde", "bytes"] }

flate2 = "1.0.35"
dashmap = "6.1.0"
1 change: 1 addition & 0 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ sysinfo = { workspace = true }
thiserror = { workspace = true }
trait-variant = { workspace = true }
cheetah-string = { workspace = true }
dashmap = { workspace = true }
[dev-dependencies]
mockall = "0.13.1"
static_assertions = { version = "1" }
Expand Down
75 changes: 40 additions & 35 deletions rocketmq-broker/src/client/consumer_group_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use std::collections::HashSet;
use std::sync::Arc;

use cheetah_string::CheetahString;
use dashmap::DashMap;
use parking_lot::Mutex;
use parking_lot::RwLock;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
Expand All @@ -36,8 +37,8 @@ use crate::client::client_channel_info::ClientChannelInfo;
#[derive(Debug, Clone)]
pub struct ConsumerGroupInfo {
group_name: CheetahString,
subscription_table: Arc<RwLock<HashMap<CheetahString, SubscriptionData>>>,
channel_info_table: Arc<RwLock<HashMap<Channel, ClientChannelInfo>>>,
subscription_table: Arc<DashMap<CheetahString, SubscriptionData>>,
channel_info_table: Arc<DashMap<Channel, ClientChannelInfo>>,
consume_type: Arc<RwLock<ConsumeType>>,
message_model: Arc<RwLock<MessageModel>>,
consume_from_where: Arc<RwLock<ConsumeFromWhere>>,
Expand All @@ -53,8 +54,8 @@ impl ConsumerGroupInfo {
) -> Self {
ConsumerGroupInfo {
group_name: group_name.into(),
subscription_table: Arc::new(RwLock::new(HashMap::new())),
channel_info_table: Arc::new(RwLock::new(HashMap::new())),
subscription_table: Arc::new(DashMap::new()),
channel_info_table: Arc::new(DashMap::new()),
consume_type: Arc::new(RwLock::new(consume_type)),
message_model: Arc::new(RwLock::new(message_model)),
consume_from_where: Arc::new(RwLock::new(consume_from_where)),
Expand All @@ -65,8 +66,8 @@ impl ConsumerGroupInfo {
pub fn with_group_name(group_name: impl Into<CheetahString>) -> Self {
ConsumerGroupInfo {
group_name: group_name.into(),
subscription_table: Arc::new(RwLock::new(HashMap::new())),
channel_info_table: Arc::new(RwLock::new(HashMap::new())),
subscription_table: Arc::new(DashMap::new()),
channel_info_table: Arc::new(DashMap::new()),
consume_type: Arc::new(RwLock::new(ConsumeType::ConsumePassively)),
message_model: Arc::new(RwLock::new(MessageModel::Clustering)),
consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::ConsumeFromLastOffset)),
Expand All @@ -75,44 +76,45 @@ impl ConsumerGroupInfo {
}

pub fn find_channel_by_client_id(&self, client_id: &str) -> Option<ClientChannelInfo> {
let channel_info_table = self.channel_info_table.read();
for (_, client_channel_info) in channel_info_table.iter() {
for client_channel_info in self.channel_info_table.iter() {
if client_channel_info.client_id() == client_id {
return Some(client_channel_info.clone());
}
}
None
}

pub fn get_subscription_table(&self) -> Arc<RwLock<HashMap<CheetahString, SubscriptionData>>> {
pub fn get_subscription_table(&self) -> Arc<DashMap<CheetahString, SubscriptionData>> {
Arc::clone(&self.subscription_table)
}

pub fn find_channel_by_channel(&self, channel: &Channel) -> Option<ClientChannelInfo> {
let channel_info_table = self.channel_info_table.read();
channel_info_table.get(channel).cloned()
self.channel_info_table
.get(channel)
.map(|item| item.value().clone())
}

pub fn get_channel_info_table(&self) -> Arc<RwLock<HashMap<Channel, ClientChannelInfo>>> {
pub fn get_channel_info_table(&self) -> Arc<DashMap<Channel, ClientChannelInfo>> {
Arc::clone(&self.channel_info_table)
}

pub fn get_all_channels(&self) -> Vec<Channel> {
let channel_info_table = self.channel_info_table.read();
channel_info_table.keys().cloned().collect()
self.channel_info_table
.iter()
.map(|item| item.key().clone())
.collect::<Vec<Channel>>()
}

pub fn get_all_client_ids(&self) -> Vec<CheetahString> {
let channel_info_table = self.channel_info_table.read();
channel_info_table
.values()
.map(|info| info.client_id().clone())
self.channel_info_table
.iter()
.map(|info| info.value().client_id().clone())
.collect()
}

pub fn unregister_channel(&self, client_channel_info: &ClientChannelInfo) -> bool {
let mut channel_info_table = self.channel_info_table.write();
if channel_info_table
if self
.channel_info_table
.remove(client_channel_info.channel())
.is_some()
{
Expand All @@ -128,8 +130,7 @@ impl ConsumerGroupInfo {
}

pub fn handle_channel_close_event(&self, channel: &Channel) -> Option<ClientChannelInfo> {
let mut channel_info_table = self.channel_info_table.write();
if let Some(info) = channel_info_table.remove(channel) {
if let Some((_, info)) = self.channel_info_table.remove(channel) {
warn!(
"NETTY EVENT: remove not active channel [{:?}] from ConsumerGroupInfo \
groupChannelTable, consumer group: {}",
Expand Down Expand Up @@ -166,8 +167,7 @@ impl ConsumerGroupInfo {
*consume_from_where_lock = consume_from_where;
}

let mut channel_info_table = self.channel_info_table.write();
if let Some(info_old) = channel_info_table.get_mut(info_new.channel()) {
if let Some(mut info_old) = self.channel_info_table.get_mut(info_new.channel()) {
if info_old.client_id() != info_new.client_id() {
error!(
"ConsumerGroupInfo: consumer channel exists in broker, but clientId is not \
Expand All @@ -180,7 +180,8 @@ impl ConsumerGroupInfo {
}
info_old.set_last_update_timestamp(get_current_millis());
} else {
channel_info_table.insert(info_new.channel().clone(), info_new.clone());
self.channel_info_table
.insert(info_new.channel().clone(), info_new.clone());
info!(
"New consumer connected, group: {} channel: {:?}",
self.group_name,
Expand All @@ -198,20 +199,21 @@ impl ConsumerGroupInfo {
let mut updated = false;
let mut topic_set = HashSet::new();

let mut subscription_table = self.subscription_table.write();
for sub in sub_list.iter() {
if let Some(old) = subscription_table.get(sub.topic.as_str()) {
if let Some(old) = self.subscription_table.get(sub.topic.as_str()) {
if sub.sub_version > old.sub_version {
if *self.consume_type.read() == ConsumeType::ConsumePassively {
info!(
"Subscription changed, group: {} OLD: {:?} NEW: {:?}",
self.group_name, old, sub
);
}
subscription_table.insert(sub.topic.clone(), sub.clone());
self.subscription_table
.insert(sub.topic.clone(), sub.clone());
}
} else {
subscription_table.insert(sub.topic.clone(), sub.clone());
self.subscription_table
.insert(sub.topic.clone(), sub.clone());
info!(
"Subscription changed, add new topic, group: {} {}",
self.group_name, sub.topic
Expand All @@ -221,7 +223,7 @@ impl ConsumerGroupInfo {
topic_set.insert(sub.topic.clone());
}

subscription_table.retain(|old_topic, _| {
self.subscription_table.retain(|old_topic, _| {
if !topic_set.contains(old_topic) {
warn!(
"Subscription changed, group: {} remove topic {}",
Expand All @@ -240,13 +242,16 @@ impl ConsumerGroupInfo {
}

pub fn get_subscribe_topics(&self) -> HashSet<CheetahString> {
let subscription_table = self.subscription_table.read();
subscription_table.keys().cloned().collect()
self.subscription_table
.iter()
.map(|item| item.key().clone())
.collect()
}

pub fn find_subscription_data(&self, topic: &CheetahString) -> Option<SubscriptionData> {
let subscription_table = self.subscription_table.read();
subscription_table.get(topic).cloned()
self.subscription_table
.get(topic)
.map(|item| item.value().clone())
}

pub fn get_consume_type(&self) -> ConsumeType {
Expand Down
3 changes: 1 addition & 2 deletions rocketmq-broker/src/client/manager/consumer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ConsumerManager {

pub fn find_subscription_data_count(&self, group: &CheetahString) -> usize {
if let Some(consumer_group_info) = self.get_consumer_group_info(group) {
return consumer_group_info.get_subscription_table().read().len();
return consumer_group_info.get_subscription_table().len();
}
0
}
Expand Down Expand Up @@ -150,7 +150,6 @@ impl ConsumerManager {
.or_insert_with(|| ConsumerGroupInfo::with_group_name(group.clone()));
consumer_group_info
.get_subscription_table()
.write()
.insert(topic.into(), subscription_data.clone());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ impl ConsumerRequestHandler {
body_data.set_consume_from_where(consumer_group_info.get_consume_from_where());
body_data.set_consume_type(consumer_group_info.get_consume_type());
body_data.set_message_model(consumer_group_info.get_message_model());
let subscription_table =
consumer_group_info.get_subscription_table().read().clone();
body_data
.get_subscription_table()
.extend(subscription_table);
let subscription_table_consumer = consumer_group_info.get_subscription_table();
let subscription_table = body_data.get_subscription_table();
for key_value in subscription_table_consumer.iter() {
subscription_table.insert(key_value.key().clone(), key_value.clone());
}

for (channel, info) in consumer_group_info.get_channel_info_table().read().iter() {
for channel_info in consumer_group_info.get_channel_info_table().iter() {
let mut connection = Connection::new();
connection.set_client_id(info.client_id().clone());
connection.set_language(info.language());
connection.set_version(info.version());
connection.set_client_addr(channel.remote_address().to_string().into());
connection.set_client_id(channel_info.client_id().clone());
connection.set_language(channel_info.language());
connection.set_version(channel_info.version());
connection
.set_client_addr(channel_info.key().remote_address().to_string().into());
body_data.get_connection_set().insert(connection);
}
let body = body_data
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-remoting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ cheetah-string = { workspace = true }
bitvec = "1.0.1"
bytemuck = "1.21.0"

dashmap = { workspace = true, features = ["serde"] }

[dev-dependencies]
bytes = "1.9.0"
16 changes: 8 additions & 8 deletions rocketmq-remoting/src/protocol/body/consumer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use cheetah_string::CheetahString;
use dashmap::DashMap;
use parking_lot::RwLock;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use serde::ser::SerializeStruct;
Expand All @@ -34,7 +34,7 @@ use crate::protocol::heartbeat::subscription_data::SubscriptionData;
#[derive(Debug, Clone, Default)]
pub struct ConsumerConnection {
connection_set: HashSet<Connection>,
subscription_table: Arc<RwLock<HashMap<CheetahString, SubscriptionData>>>,
subscription_table: Arc<DashMap<CheetahString, SubscriptionData>>,
consume_type: Arc<RwLock<ConsumeType>>,
message_model: Arc<RwLock<MessageModel>>,
consume_from_where: Arc<RwLock<ConsumeFromWhere>>,
Expand All @@ -44,7 +44,7 @@ impl ConsumerConnection {
pub fn new() -> Self {
ConsumerConnection {
connection_set: HashSet::new(),
subscription_table: Arc::new(RwLock::new(HashMap::new())),
subscription_table: Arc::new(DashMap::new()),
consume_type: Arc::new(RwLock::new(ConsumeType::default())),
message_model: Arc::new(RwLock::new(MessageModel::default())),
consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::default())),
Expand All @@ -59,7 +59,7 @@ impl Serialize for ConsumerConnection {
{
let mut s = serializer.serialize_struct("ConsumerConnection", 5)?;
s.serialize_field("connection_set", &self.connection_set)?;
s.serialize_field("subscription_table", &*self.subscription_table.read())?;
s.serialize_field("subscription_table", &*self.subscription_table)?;
s.serialize_field("consume_type", &*self.consume_type.read())?;
s.serialize_field("message_model", &*self.message_model.read())?;
s.serialize_field("consume_from_where", &*self.consume_from_where.read())?;
Expand All @@ -76,15 +76,15 @@ impl ConsumerConnection {
self.connection_set = connection_set;
}

pub fn get_subscription_table(&self) -> HashMap<CheetahString, SubscriptionData> {
self.subscription_table.read().clone()
pub fn get_subscription_table(&self) -> Arc<DashMap<CheetahString, SubscriptionData>> {
self.subscription_table.clone()
}

pub fn set_subscription_table(
&mut self,
subscription_table: HashMap<CheetahString, SubscriptionData>,
subscription_table: DashMap<CheetahString, SubscriptionData>,
) {
*self.subscription_table.write() = subscription_table;
self.subscription_table = Arc::new(subscription_table);
}

pub fn get_consume_type(&self) -> ConsumeType {
Expand Down

0 comments on commit dcff411

Please sign in to comment.