Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #2060]Adding #[inline] for ConsumeQueueStore methods #2061

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion rocketmq-store/src/queue/local_file_consume_queue_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl Inner {
}

impl ConsumeQueueStore {
#[inline]
pub fn new(
message_store_config: Arc<MessageStoreConfig>,
broker_config: Arc<BrokerConfig>,
Expand All @@ -97,10 +98,12 @@ impl ConsumeQueueStore {

#[allow(unused_variables)]
impl ConsumeQueueStoreTrait for ConsumeQueueStore {
#[inline]
fn start(&self) {
todo!()
}

#[inline]
fn load(&mut self) -> bool {
self.load_consume_queues(
CheetahString::from_string(get_store_path_consume_queue(
Expand All @@ -115,10 +118,12 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
)
}

#[inline]
fn load_after_destroy(&self) -> bool {
true
}

#[inline]
fn recover(&mut self) {
let mut mutex = self.inner.consume_queue_table.lock().clone();
for (_topic, consume_queue_table) in mutex.iter_mut() {
Expand All @@ -131,14 +136,17 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
}
}

#[inline]
fn recover_concurrently(&mut self) -> bool {
todo!()
}

#[inline]
fn shutdown(&self) -> bool {
todo!()
}

#[inline]
fn destroy(&self) {
let mutex = self.inner.consume_queue_table.lock().clone();
for consume_queue_table in mutex.values() {
Expand All @@ -151,24 +159,29 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
}
}

#[inline]
fn destroy_consume_queue(&self, consume_queue: &dyn ConsumeQueueTrait) {
let mut file_queue_life_cycle =
self.get_life_cycle(consume_queue.get_topic(), consume_queue.get_queue_id());
file_queue_life_cycle.destroy();
}

#[inline]
fn flush(&self, consume_queue: &dyn ConsumeQueueTrait, flush_least_pages: i32) -> bool {
todo!()
}

#[inline]
fn clean_expired(&self, min_phy_offset: i64) {
todo!()
}

#[inline]
fn check_self(&self) {
println!("ConsumeQueueStore::check_self unimplemented");
}

#[inline]
fn delete_expired_file(
&self,
consume_queue: &dyn ConsumeQueueTrait,
Expand All @@ -177,18 +190,22 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
todo!()
}

#[inline]
fn is_first_file_available(&self, consume_queue: &dyn ConsumeQueueTrait) -> bool {
todo!()
}

#[inline]
fn is_first_file_exist(&self, consume_queue: &dyn ConsumeQueueTrait) -> bool {
todo!()
}

#[inline]
fn roll_next_file(&self, consume_queue: &dyn ConsumeQueueTrait, offset: i64) -> i64 {
todo!()
}

#[inline]
fn truncate_dirty(&self, offset_to_truncate: i64) {
let cloned = self.inner.consume_queue_table.lock().clone();
for consume_queue_table in cloned.values() {
Expand All @@ -200,12 +217,14 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
}
}

#[inline]
fn put_message_position_info_wrapper(&self, request: &DispatchRequest) {
let mut cq = self.find_or_create_consume_queue(request.topic.as_ref(), request.queue_id);
self.put_message_position_info_wrapper_with_cq(&mut **cq.as_mut(), request);
// println!("put_message_position_info_wrapper-----{}", request.topic)
}

#[inline]
fn put_message_position_info_wrapper_with_cq(
&self,
consume_queue: &mut dyn ConsumeQueueTrait,
Expand All @@ -215,6 +234,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
.put_message_position_info_wrapper(consume_queue, request);
}

#[inline]
fn range_query(
&self,
topic: &CheetahString,
Expand All @@ -225,28 +245,34 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
todo!()
}

#[inline]
fn get_signal(&self, topic: &CheetahString, queue_id: i32, start_index: i64) -> Option<Bytes> {
todo!()
}

#[inline]
fn increase_queue_offset(&self, msg: &MessageExtBrokerInner, message_num: i16) {
let consume_queue = self.find_or_create_consume_queue(msg.get_topic(), msg.queue_id());
consume_queue.increase_queue_offset(&self.inner.queue_offset_operator, msg, message_num);
}

#[inline]
fn assign_queue_offset(&self, msg: &mut MessageExtBrokerInner) {
let consume_queue = self.find_or_create_consume_queue(msg.get_topic(), msg.queue_id());
consume_queue.assign_queue_offset(&self.inner.queue_offset_operator, msg);
}

#[inline]
fn increase_lmq_offset(&mut self, queue_key: &CheetahString, message_num: i16) {
todo!()
}

#[inline]
fn get_lmq_queue_offset(&self, queue_key: &CheetahString) -> i64 {
todo!()
}

#[inline]
fn recover_offset_table(&mut self, min_phy_offset: i64) {
let mut cq_offset_table = HashMap::with_capacity(1024);
let mut bcq_offset_table = HashMap::with_capacity(1024);
Expand Down Expand Up @@ -275,6 +301,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
self.set_batch_topic_queue_table(bcq_offset_table);
}

#[inline]
fn set_topic_queue_table(&mut self, topic_queue_table: HashMap<CheetahString, i64>) {
self.inner
.queue_offset_operator
Expand All @@ -284,18 +311,22 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
.set_lmq_topic_queue_table(topic_queue_table);
}

#[inline]
fn remove_topic_queue_table(&mut self, topic: &CheetahString, queue_id: i32) {
self.inner.queue_offset_operator.remove(topic, queue_id);
}

#[inline]
fn get_topic_queue_table(&self) -> HashMap<CheetahString, i64> {
todo!()
}

#[inline]
fn get_max_phy_offset_in_consume_queue_id(&self, topic: &CheetahString, queue_id: i32) -> i64 {
todo!()
}

#[inline]
fn get_max_phy_offset_in_consume_queue(&self) -> i64 {
let mut max_physic_offset = -1i64;
for (topic, consume_queue_table) in self.inner.consume_queue_table.lock().iter() {
Expand All @@ -309,6 +340,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
max_physic_offset
}

#[inline]
fn get_max_offset(&self, topic: &CheetahString, queue_id: i32) -> Option<i64> {
Some(
self.inner
Expand All @@ -317,6 +349,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
)
}

#[inline]
fn find_or_create_consume_queue(
&self,
topic: &CheetahString,
Expand Down Expand Up @@ -366,36 +399,43 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
consume_queue.clone()
}

#[inline]
fn find_consume_queue_map(
&self,
topic: &CheetahString,
) -> Option<HashMap<i32, ArcConsumeQueue>> {
todo!()
}

#[inline]
fn get_total_size(&self) -> i64 {
todo!()
}

#[inline]
fn get_store_time(&self, cq_unit: CqUnit) -> i64 {
todo!()
}

#[inline]
fn get_min_offset_in_queue(&self, topic: &CheetahString, queue_id: i32) -> i64 {
let queue = self.find_or_create_consume_queue(topic, queue_id);
queue.get_min_offset_in_queue()
}

#[inline]
fn get_max_offset_in_queue(&self, topic: &CheetahString, queue_id: i32) -> i64 {
todo!()
}

#[inline]
fn get_consume_queue_table(&self) -> Arc<ConsumeQueueTable> {
self.inner.consume_queue_table.clone()
}
}

impl ConsumeQueueStore {
#[inline]
pub fn correct_min_offset(
&self,
consume_queue: &dyn ConsumeQueueTrait,
Expand All @@ -404,6 +444,7 @@ impl ConsumeQueueStore {
consume_queue.correct_min_offset(min_commit_log_offset)
}

#[inline]
pub fn set_batch_topic_queue_table(
&self,
batch_topic_queue_table: HashMap<CheetahString, i64>,
Expand All @@ -413,6 +454,7 @@ impl ConsumeQueueStore {
.set_batch_topic_queue_table(batch_topic_queue_table)
}

#[inline]
fn load_consume_queues(&mut self, store_path: CheetahString, cq_type: CQType) -> bool {
let dir = Path::new(store_path.as_str());
if let Ok(ls) = fs::read_dir(dir) {
Expand Down Expand Up @@ -456,11 +498,13 @@ impl ConsumeQueueStore {
true
}

#[inline]
fn load_logic(&mut self, topic: &CheetahString, queue_id: i32) -> bool {
let mut file_queue_life_cycle = self.get_life_cycle(topic, queue_id);
file_queue_life_cycle.load()
}

#[inline]
fn put_consume_queue(
&self,
topic: CheetahString,
Expand All @@ -472,6 +516,7 @@ impl ConsumeQueueStore {
topic_table.insert(queue_id, ArcMut::new(consume_queue));
}

#[inline]
fn queue_type_should_be(&self, topic: &str, cq_type: CQType) {
let option = self.topic_config_table.lock().get(topic).cloned();
let act = QueueTypeUtils::get_cq_type(&option);
Expand All @@ -483,23 +528,29 @@ impl ConsumeQueueStore {
}
}

/*fn truncate_dirty_logic_files(&self, consume_queue: &dyn ConsumeQueueTrait, phy_offset: i64) {
/*

#[inline]
fn truncate_dirty_logic_files(&self, consume_queue: &dyn ConsumeQueueTrait, phy_offset: i64) {
let file_queue_life_cycle = self.get_life_cycle(
consume_queue.get_topic().as_str(),
consume_queue.get_queue_id(),
);
file_queue_life_cycle
.lock()
.truncate_dirty_logic_files(phy_offset);

}*/

#[inline]
fn truncate_dirty_logic_files(&self, topic: &CheetahString, queue_id: i32, phy_offset: i64) {
let mut file_queue_life_cycle = self.get_life_cycle(topic, queue_id);
file_queue_life_cycle.truncate_dirty_logic_files(phy_offset);
}
}

impl ConsumeQueueStore {
#[inline]
fn create_consume_queue_by_type(
&self,
topic: &CheetahString,
Expand Down Expand Up @@ -541,6 +592,7 @@ impl ConsumeQueueStore {
}
}

#[inline]
fn get_life_cycle(&self, topic: &CheetahString, queue_id: i32) -> ArcConsumeQueue {
self.find_or_create_consume_queue(topic, queue_id)
}
Expand Down
Loading