From 2775a6ee2076103c017ed8678c60cad01d767c23 Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 12 Jul 2024 09:49:20 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#765]=E2=99=BB=EF=B8=8FRefactor=20Cons?= =?UTF-8?q?umeQueueStore=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-common/src/lib.rs | 108 ++++++++++++- rocketmq-store/src/log_file.rs | 8 +- .../message_store/default_message_store.rs | 36 ++--- rocketmq-store/src/queue.rs | 33 ++-- .../src/queue/batch_consume_queue.rs | 2 +- .../queue/local_file_consume_queue_store.rs | 144 +++++++----------- .../src/queue/single_consume_queue.rs | 4 +- 7 files changed, 200 insertions(+), 135 deletions(-) diff --git a/rocketmq-common/src/lib.rs b/rocketmq-common/src/lib.rs index 31b5bb96..180c5c9b 100644 --- a/rocketmq-common/src/lib.rs +++ b/rocketmq-common/src/lib.rs @@ -17,6 +17,12 @@ #![allow(dead_code)] #![allow(unused_imports)] +#![feature(sync_unsafe_cell)] + +use std::cell::SyncUnsafeCell; +use std::ops::Deref; +use std::ops::DerefMut; +use std::sync::Arc; pub use crate::common::attribute::topic_attributes as TopicAttributes; pub use crate::common::message::message_accessor as MessageAccessor; @@ -40,5 +46,105 @@ pub mod log; mod thread_pool; pub mod utils; +pub struct ArcCellWrapper { + inner: Arc>, +} + +impl ArcCellWrapper { + #[inline] + pub fn new(value: T) -> Self { + Self { + inner: Arc::new(SyncUnsafeCell::new(value)), + } + } +} + +impl Clone for ArcCellWrapper { + fn clone(&self) -> Self { + ArcCellWrapper { + inner: Arc::clone(&self.inner), + } + } +} + +impl AsRef for ArcCellWrapper { + fn as_ref(&self) -> &T { + unsafe { &*self.inner.get() } + } +} + +impl AsMut for ArcCellWrapper { + fn as_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.get() } + } +} + +impl Deref for ArcCellWrapper { + type Target = T; + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl DerefMut for ArcCellWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut() + } +} + #[cfg(test)] -mod tests {} +mod arc_cell_wrapper_tests { + use std::sync::Arc; + + use super::*; + + #[test] + fn new_creates_arc_cell_wrapper_with_provided_value() { + let wrapper = ArcCellWrapper::new(10); + assert_eq!(*wrapper.as_ref(), 10); + } + + #[test] + fn clone_creates_a_new_instance_with_same_value() { + let wrapper = ArcCellWrapper::new(20); + let cloned_wrapper = wrapper.clone(); + assert_eq!(*cloned_wrapper.as_ref(), 20); + } + + #[test] + fn as_ref_returns_immutable_reference_to_value() { + let wrapper = ArcCellWrapper::new(30); + assert_eq!(*wrapper.as_ref(), 30); + } + + #[test] + fn as_mut_returns_mutable_reference_to_value() { + let mut wrapper = ArcCellWrapper::new(40); + *wrapper.as_mut() = 50; + assert_eq!(*wrapper.as_ref(), 50); + } + + #[test] + fn deref_returns_reference_to_inner_value() { + let wrapper = ArcCellWrapper::new(60); + assert_eq!(*wrapper, 60); + } + + #[test] + fn deref_mut_allows_modification_of_inner_value() { + let mut wrapper = ArcCellWrapper::new(70); + *wrapper = 80; + assert_eq!(*wrapper, 80); + } + + #[test] + fn multiple_clones_share_the_same_underlying_data() { + let wrapper = ArcCellWrapper::new(Arc::new(90)); + let cloned_wrapper1 = wrapper.clone(); + let cloned_wrapper2 = wrapper.clone(); + + assert_eq!(Arc::strong_count(wrapper.as_ref()), 3); + assert_eq!(Arc::strong_count(cloned_wrapper1.as_ref()), 3); + assert_eq!(Arc::strong_count(cloned_wrapper2.as_ref()), 3); + } +} diff --git a/rocketmq-store/src/log_file.rs b/rocketmq-store/src/log_file.rs index ee78fb63..8284f1fa 100644 --- a/rocketmq-store/src/log_file.rs +++ b/rocketmq-store/src/log_file.rs @@ -26,7 +26,7 @@ use crate::base::get_message_result::GetMessageResult; use crate::base::message_result::PutMessageResult; use crate::filter::MessageFilter; use crate::hook::put_message_hook::BoxedPutMessageHook; -use crate::queue::ConsumeQueueTrait; +use crate::queue::ArcConsumeQueue; use crate::stats::broker_stats_manager::BrokerStatsManager; use crate::store::running_flags::RunningFlags; @@ -113,11 +113,7 @@ pub trait RocketMQMessageStore: Clone + 'static { fn notify_message_arrive_if_necessary(&self, dispatch_request: &mut DispatchRequest); - fn find_consume_queue( - &self, - topic: &str, - queue_id: i32, - ) -> Option>>>; + fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option; fn delete_topics(&self, delete_topics: Vec); } diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index 0cbdbd7f..40af679e 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -80,8 +80,8 @@ use crate::log_file::MessageStore; use crate::log_file::MAX_PULL_MSG_SIZE; use crate::queue::build_consume_queue::CommitLogDispatcherBuildConsumeQueue; use crate::queue::local_file_consume_queue_store::ConsumeQueueStore; +use crate::queue::ArcConsumeQueue; use crate::queue::ConsumeQueueStoreTrait; -use crate::queue::ConsumeQueueTrait; use crate::stats::broker_stats_manager::BrokerStatsManager; use crate::store::running_flags::RunningFlags; use crate::store_path_config_helper::get_abort_file; @@ -728,10 +728,11 @@ impl MessageStore for DefaultMessageStore { committed: bool, ) -> i64 { if committed { - self.consume_queue_store - .find_or_create_consume_queue(topic, queue_id) - .lock() - .get_max_offset_in_queue() + let queue = self + .consume_queue_store + .find_or_create_consume_queue(topic, queue_id); + + queue.get_max_offset_in_queue() } else { self.consume_queue_store .get_max_offset(topic, queue_id) @@ -785,8 +786,8 @@ impl MessageStore for DefaultMessageStore { let max_offset_py = self.commit_log.get_max_offset(); let consume_queue = self.find_consume_queue(topic, queue_id); if let Some(consume_queue) = consume_queue { - min_offset = consume_queue.lock().get_min_offset_in_queue(); - max_offset = consume_queue.lock().get_max_offset_in_queue(); + min_offset = consume_queue.get_min_offset_in_queue(); + max_offset = consume_queue.get_max_offset_in_queue(); if max_offset == 0 { status = GetMessageStatus::NoMessageInQueue; next_begin_offset = self.next_offset_correction(offset, 0); @@ -803,7 +804,7 @@ impl MessageStore for DefaultMessageStore { let max_filter_message_size = self .message_store_config .max_filter_message_size - .max(max_msg_nums * consume_queue.lock().get_unit_size()); + .max(max_msg_nums * consume_queue.get_unit_size()); let disk_fall_recorded = self.message_store_config.disk_fall_recorded; let mut max_pull_size = max_total_msg_size.max(100); if max_pull_size > MAX_PULL_MSG_SIZE { @@ -824,15 +825,14 @@ impl MessageStore for DefaultMessageStore { .travel_cq_file_num_when_get_message { cq_file_num += 1; - let buffer_consume_queue = consume_queue - .lock() - .iterate_from_inner(next_begin_offset, max_msg_nums); + let buffer_consume_queue = + consume_queue.iterate_from_inner(next_begin_offset, max_msg_nums); if buffer_consume_queue.is_none() { status = GetMessageStatus::OffsetFoundNull; next_begin_offset = self.next_offset_correction( next_begin_offset, self.consume_queue_store - .roll_next_file(&**consume_queue.lock(), next_begin_offset), + .roll_next_file(&**consume_queue, next_begin_offset), ); warn!( "consumer request topic: {}, offset: {}, minOffset: {}, maxOffset: \ @@ -856,7 +856,7 @@ impl MessageStore for DefaultMessageStore { &self.message_store_config, ); if (cq_unit.queue_offset - offset) - * consume_queue.lock().get_unit_size() as i64 + * consume_queue.get_unit_size() as i64 > max_filter_message_size as i64 { break; @@ -996,7 +996,7 @@ impl MessageStore for DefaultMessageStore { let consume_queue = self .consume_queue_store .find_or_create_consume_queue(topic, queue_id); - let first_cqitem = consume_queue.lock().get(consume_offset); + let first_cqitem = consume_queue.get(consume_offset); if first_cqitem.is_none() { return false; } @@ -1006,7 +1006,7 @@ impl MessageStore for DefaultMessageStore { let size = cq.size; return self.check_in_mem_by_commit_offset(start_offset_py, size); } - let last_cqitem = consume_queue.lock().get(consume_offset + batch_size as i64); + let last_cqitem = consume_queue.get(consume_offset + batch_size as i64); if last_cqitem.is_none() { let size = cq.size; return self.check_in_mem_by_commit_offset(start_offset_py, size); @@ -1033,11 +1033,7 @@ impl MessageStore for DefaultMessageStore { } } - fn find_consume_queue( - &self, - topic: &str, - queue_id: i32, - ) -> Option>>> { + fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option { Some( self.consume_queue_store .find_or_create_consume_queue(topic, queue_id), diff --git a/rocketmq-store/src/queue.rs b/rocketmq-store/src/queue.rs index 2ddf91ef..7ec76c23 100644 --- a/rocketmq-store/src/queue.rs +++ b/rocketmq-store/src/queue.rs @@ -14,19 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -mod batch_consume_queue; -pub mod build_consume_queue; -mod consume_queue_ext; -pub mod local_file_consume_queue_store; -mod queue_offset_operator; -pub mod single_consume_queue; - use std::collections::HashMap; -use std::sync::Arc; use rocketmq_common::common::attribute::cq_type::CQType; use rocketmq_common::common::boundary_type::BoundaryType; use rocketmq_common::common::message::message_single::MessageExtBrokerInner; +use rocketmq_common::ArcCellWrapper; use crate::base::dispatch_request::DispatchRequest; use crate::base::swappable::Swappable; @@ -35,6 +28,17 @@ use crate::filter::MessageFilter; use crate::queue::consume_queue_ext::ConsumeQueueExt; use crate::queue::queue_offset_operator::QueueOffsetOperator; +mod batch_consume_queue; +pub mod build_consume_queue; +mod consume_queue_ext; +pub mod local_file_consume_queue_store; +mod queue_offset_operator; +pub mod single_consume_queue; + +//pub type ArcConsumeQueue = Arc>>; +pub type ArcConsumeQueue = ArcCellWrapper>; +pub type ConsumeQueueTable = parking_lot::Mutex>>; + /// FileQueueLifeCycle contains life cycle methods of ConsumerQueue that is directly implemented by /// FILE. pub trait FileQueueLifeCycle: Swappable { @@ -263,19 +267,12 @@ pub trait ConsumeQueueStoreTrait: Send + Sync { /// `topic`: Topic. /// `queue_id`: Queue ID. /// Returns the consumeQueue. - fn find_or_create_consume_queue( - &self, - topic: &str, - queue_id: i32, - ) -> Arc>>; + fn find_or_create_consume_queue(&self, topic: &str, queue_id: i32) -> ArcConsumeQueue; /// Find the consumeQueueMap of topic. /// `topic`: Topic. /// Returns the consumeQueueMap of topic. - fn find_consume_queue_map( - &self, - topic: &str, - ) -> Option>>; + fn find_consume_queue_map(&self, topic: &str) -> Option>; /// Get the total size of all consumeQueue. /// Returns the total size of all consumeQueue. @@ -293,7 +290,7 @@ pub trait ConsumeQueueStoreTrait: Send + Sync { /// Trait representing ConsumeQueueInterface. pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle { /// Get the topic name. - fn get_topic(&self) -> String; + fn get_topic(&self) -> &str; /// Get queue id. fn get_queue_id(&self) -> i32; diff --git a/rocketmq-store/src/queue/batch_consume_queue.rs b/rocketmq-store/src/queue/batch_consume_queue.rs index 2a811e03..e0f37498 100644 --- a/rocketmq-store/src/queue/batch_consume_queue.rs +++ b/rocketmq-store/src/queue/batch_consume_queue.rs @@ -197,7 +197,7 @@ impl Swappable for BatchConsumeQueue { } impl ConsumeQueueTrait for BatchConsumeQueue { - fn get_topic(&self) -> String { + fn get_topic(&self) -> &str { todo!() } diff --git a/rocketmq-store/src/queue/local_file_consume_queue_store.rs b/rocketmq-store/src/queue/local_file_consume_queue_store.rs index 7382f414..3558c666 100644 --- a/rocketmq-store/src/queue/local_file_consume_queue_store.rs +++ b/rocketmq-store/src/queue/local_file_consume_queue_store.rs @@ -27,6 +27,7 @@ use rocketmq_common::common::broker::broker_config::BrokerConfig; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::message::message_single::MessageExtBrokerInner; use rocketmq_common::utils::queue_type_utils::QueueTypeUtils; +use rocketmq_common::ArcCellWrapper; use tracing::info; use crate::base::dispatch_request::DispatchRequest; @@ -35,7 +36,9 @@ use crate::config::message_store_config::MessageStoreConfig; use crate::queue::batch_consume_queue::BatchConsumeQueue; use crate::queue::queue_offset_operator::QueueOffsetOperator; use crate::queue::single_consume_queue::ConsumeQueue; +use crate::queue::ArcConsumeQueue; use crate::queue::ConsumeQueueStoreTrait; +use crate::queue::ConsumeQueueTable; use crate::queue::ConsumeQueueTrait; use crate::queue::CqUnit; use crate::store::running_flags::RunningFlags; @@ -44,17 +47,13 @@ use crate::store_path_config_helper::get_store_path_consume_queue; #[derive(Clone)] pub struct ConsumeQueueStore { - inner: Arc, + inner: Arc, running_flags: Arc, store_checkpoint: Arc, topic_config_table: Arc>>, } -type ConsumeQueueTable = parking_lot::Mutex< - HashMap>>>>, ->; - -struct ConsumeQueueStoreInner { +struct Inner { // commit_log: Arc>, pub(crate) message_store_config: Arc, pub(crate) broker_config: Arc, @@ -62,7 +61,7 @@ struct ConsumeQueueStoreInner { pub(crate) consume_queue_table: ConsumeQueueTable, } -impl ConsumeQueueStoreInner { +impl Inner { fn put_message_position_info_wrapper( &self, consume_queue: &mut dyn ConsumeQueueTrait, @@ -81,7 +80,7 @@ impl ConsumeQueueStore { store_checkpoint: Arc, ) -> Self { Self { - inner: Arc::new(ConsumeQueueStoreInner { + inner: Arc::new(Inner { //commit_log, message_store_config, broker_config, @@ -122,14 +121,11 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { fn recover(&mut self) { let mut mutex = self.inner.consume_queue_table.lock().clone(); for (_topic, consume_queue_table) in mutex.iter_mut() { - for (_queue_id, consume_queue) in consume_queue_table.iter_mut() { - //consume_queue.lock().recover(); - let lock = consume_queue.lock(); - let queue_id = lock.get_queue_id(); - let topic = lock.get_topic(); - drop(lock); - let file_queue_life_cycle = self.get_life_cycle(topic.as_str(), queue_id); - file_queue_life_cycle.lock().recover(); + for (_queue_id, consume_queue) in consume_queue_table.iter() { + let queue_id = consume_queue.get_queue_id(); + let topic = consume_queue.get_topic(); + let mut file_queue_life_cycle = self.get_life_cycle(topic, queue_id); + file_queue_life_cycle.recover(); } } } @@ -146,12 +142,10 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { let mutex = self.inner.consume_queue_table.lock().clone(); for consume_queue_table in mutex.values() { for consume_queue in consume_queue_table.values() { - let lock = consume_queue.lock(); - let queue_id = lock.get_queue_id(); - let topic = lock.get_topic(); - drop(lock); - let file_queue_life_cycle = self.get_life_cycle(topic.as_str(), queue_id); - file_queue_life_cycle.lock().destroy(); + let queue_id = consume_queue.get_queue_id(); + let topic = consume_queue.get_topic(); + let mut file_queue_life_cycle = self.get_life_cycle(topic, queue_id); + file_queue_life_cycle.destroy(); } } } @@ -196,18 +190,16 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { let cloned = self.inner.consume_queue_table.lock().clone(); for consume_queue_table in cloned.values() { for logic in consume_queue_table.values() { - let lock = logic.lock(); - let topic = lock.get_topic(); - let queue_id = lock.get_queue_id(); - drop(lock); - self.truncate_dirty_logic_files(topic.as_str(), queue_id, offset_to_truncate); + let topic = logic.get_topic(); + let queue_id = logic.get_queue_id(); + self.truncate_dirty_logic_files(topic, queue_id, offset_to_truncate); } } } fn put_message_position_info_wrapper(&self, request: &DispatchRequest) { - let cq = self.find_or_create_consume_queue(request.topic.as_str(), request.queue_id); - self.put_message_position_info_wrapper_with_cq(cq.lock().as_mut(), request); + let mut cq = self.find_or_create_consume_queue(request.topic.as_str(), request.queue_id); + self.put_message_position_info_wrapper_with_cq(&mut **cq.as_mut(), request); // println!("put_message_position_info_wrapper-----{}", request.topic) } @@ -236,18 +228,12 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { fn increase_queue_offset(&self, msg: &MessageExtBrokerInner, message_num: i16) { let consume_queue = self.find_or_create_consume_queue(msg.topic(), msg.queue_id()); - consume_queue.lock().increase_queue_offset( - &self.inner.queue_offset_operator, - msg, - message_num, - ); + consume_queue.increase_queue_offset(&self.inner.queue_offset_operator, msg, message_num); } fn assign_queue_offset(&self, msg: &mut MessageExtBrokerInner) { let consume_queue = self.find_or_create_consume_queue(msg.topic(), msg.queue_id()); - consume_queue - .lock() - .assign_queue_offset(&self.inner.queue_offset_operator, msg); + consume_queue.assign_queue_offset(&self.inner.queue_offset_operator, msg); } fn increase_lmq_offset(&mut self, queue_key: &str, message_num: i16) { @@ -262,16 +248,19 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { let mut cq_offset_table = HashMap::with_capacity(1024); let mut bcq_offset_table = HashMap::with_capacity(1024); for (topic, consume_queue_table) in self.inner.consume_queue_table.lock().iter_mut() { - for (queue_id, consume_queue) in consume_queue_table.iter_mut() { - let guard = consume_queue.lock(); - let key = format!("{}-{}", guard.get_topic(), guard.get_queue_id()); - let max_offset_in_queue = guard.get_max_offset_in_queue(); - if guard.get_cq_type() == CQType::SimpleCQ { + for (queue_id, consume_queue) in consume_queue_table.iter() { + let key = format!( + "{}-{}", + consume_queue.get_topic(), + consume_queue.get_queue_id() + ); + let max_offset_in_queue = consume_queue.get_max_offset_in_queue(); + if consume_queue.get_cq_type() == CQType::SimpleCQ { cq_offset_table.insert(key, max_offset_in_queue); } else { bcq_offset_table.insert(key, max_offset_in_queue); } - self.correct_min_offset(&**guard, min_phy_offset) + self.correct_min_offset(&***consume_queue, min_phy_offset) } } if self.inner.message_store_config.duplication_enable @@ -308,8 +297,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { let mut max_physic_offset = -1i64; for (topic, consume_queue_table) in self.inner.consume_queue_table.lock().iter() { for (queue_id, consume_queue) in consume_queue_table.iter() { - let max_physic_offset_in_consume_queue = - consume_queue.lock().get_max_physic_offset(); + let max_physic_offset_in_consume_queue = consume_queue.get_max_physic_offset(); if max_physic_offset_in_consume_queue > max_physic_offset { max_physic_offset = max_physic_offset_in_consume_queue; } @@ -326,11 +314,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { ) } - fn find_or_create_consume_queue( - &self, - topic: &str, - queue_id: i32, - ) -> Arc>> { + fn find_or_create_consume_queue(&self, topic: &str, queue_id: i32) -> ArcConsumeQueue { let mut consume_queue_table = self.inner.consume_queue_table.lock(); let topic_map = consume_queue_table.entry(topic.to_string()).or_default(); @@ -346,7 +330,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { .get(&topic.to_string()) .cloned(); match QueueTypeUtils::get_cq_type(&option) { - CQType::SimpleCQ => Arc::new(parking_lot::Mutex::new(Box::new(ConsumeQueue::new( + CQType::SimpleCQ => ArcCellWrapper::new(Box::new(ConsumeQueue::new( topic.to_string(), queue_id, get_store_path_consume_queue( @@ -358,21 +342,19 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { self.inner.message_store_config.clone(), self.running_flags.clone(), self.store_checkpoint.clone(), - )))), - CQType::BatchCQ => { - Arc::new(parking_lot::Mutex::new(Box::new(BatchConsumeQueue::new( - topic.to_string(), - queue_id, - get_store_path_batch_consume_queue( - self.inner.message_store_config.store_path_root_dir.as_str(), - ), - self.inner - .message_store_config - .mapper_file_size_batch_consume_queue, - None, - self.inner.message_store_config.clone(), - )))) - } + ))), + CQType::BatchCQ => ArcCellWrapper::new(Box::new(BatchConsumeQueue::new( + topic.to_string(), + queue_id, + get_store_path_batch_consume_queue( + self.inner.message_store_config.store_path_root_dir.as_str(), + ), + self.inner + .message_store_config + .mapper_file_size_batch_consume_queue, + None, + self.inner.message_store_config.clone(), + ))), CQType::RocksDBCQ => { unimplemented!() } @@ -381,10 +363,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { consume_queue.clone() } - fn find_consume_queue_map( - &self, - topic: &str, - ) -> Option>> { + fn find_consume_queue_map(&self, topic: &str) -> Option> { todo!() } @@ -397,9 +376,8 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { } fn get_min_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 { - self.find_or_create_consume_queue(topic, queue_id) - .lock() - .get_min_offset_in_queue() + let queue = self.find_or_create_consume_queue(topic, queue_id); + queue.get_min_offset_in_queue() } fn get_max_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 { @@ -464,9 +442,8 @@ impl ConsumeQueueStore { } fn load_logic(&mut self, topic: String, queue_id: i32) -> bool { - let file_queue_life_cycle = self.get_life_cycle(topic.as_str(), queue_id); - let result = file_queue_life_cycle.lock().load(); - result + let mut file_queue_life_cycle = self.get_life_cycle(topic.as_str(), queue_id); + file_queue_life_cycle.load() } fn put_consume_queue( @@ -477,7 +454,7 @@ impl ConsumeQueueStore { ) { let mut consume_queue_table = self.inner.consume_queue_table.lock(); let topic_table = consume_queue_table.entry(topic).or_default(); - topic_table.insert(queue_id, Arc::new(parking_lot::Mutex::new(consume_queue))); + topic_table.insert(queue_id, ArcCellWrapper::new(consume_queue)); } fn queue_type_should_be(&self, topic: &str, cq_type: CQType) { @@ -502,11 +479,8 @@ impl ConsumeQueueStore { }*/ fn truncate_dirty_logic_files(&self, topic: &str, queue_id: i32, phy_offset: i64) { - let file_queue_life_cycle = self.get_life_cycle(topic, queue_id); - file_queue_life_cycle - .as_ref() - .lock() - .truncate_dirty_logic_files(phy_offset); + let mut file_queue_life_cycle = self.get_life_cycle(topic, queue_id); + file_queue_life_cycle.truncate_dirty_logic_files(phy_offset); } } @@ -552,11 +526,7 @@ impl ConsumeQueueStore { } } - fn get_life_cycle( - &self, - topic: &str, - queue_id: i32, - ) -> Arc>> { + fn get_life_cycle(&self, topic: &str, queue_id: i32) -> ArcConsumeQueue { self.find_or_create_consume_queue(topic, queue_id) } } diff --git a/rocketmq-store/src/queue/single_consume_queue.rs b/rocketmq-store/src/queue/single_consume_queue.rs index 28127c1f..b2981df1 100644 --- a/rocketmq-store/src/queue/single_consume_queue.rs +++ b/rocketmq-store/src/queue/single_consume_queue.rs @@ -493,8 +493,8 @@ impl Swappable for ConsumeQueue { #[allow(unused_variables)] impl ConsumeQueueTrait for ConsumeQueue { - fn get_topic(&self) -> String { - self.topic.clone() + fn get_topic(&self) -> &str { + self.topic.as_str() } fn get_queue_id(&self) -> i32 {