diff --git a/rocketmq-store/src/queue/single_consume_queue.rs b/rocketmq-store/src/queue/single_consume_queue.rs index a88d7faa..017c708c 100644 --- a/rocketmq-store/src/queue/single_consume_queue.rs +++ b/rocketmq-store/src/queue/single_consume_queue.rs @@ -82,6 +82,7 @@ pub struct ConsumeQueue { } impl ConsumeQueue { + #[inline] pub fn new( topic: CheetahString, queue_id: i32, @@ -129,11 +130,13 @@ impl ConsumeQueue { } impl ConsumeQueue { + #[inline] pub fn set_max_physic_offset(&self, max_physic_offset: i64) { self.max_physic_offset .store(max_physic_offset, std::sync::atomic::Ordering::SeqCst); } + #[inline] pub fn truncate_dirty_logic_files_handler(&mut self, phy_offset: i64, delete_file: bool) { self.set_max_physic_offset(phy_offset); let mut max_ext_addr = 1i64; @@ -213,18 +216,22 @@ impl ConsumeQueue { } } + #[inline] pub fn is_ext_read_enable(&self) -> bool { self.consume_queue_ext.is_some() } + #[inline] pub fn is_ext_addr(tags_code: i64) -> bool { ConsumeQueueExt::is_ext_addr(tags_code) } + #[inline] pub fn is_ext_write_enable(&self) -> bool { self.consume_queue_ext.is_some() && self.message_store_config.enable_consume_queue_ext } + #[inline] pub fn put_message_position_info( &mut self, offset: i64, @@ -306,6 +313,7 @@ impl ConsumeQueue { } } + #[inline] fn fill_pre_blank(&self, mapped_file: &Arc, until_where: i64) { let mut bytes_mut = BytesMut::with_capacity(CQ_STORE_UNIT_SIZE as usize); @@ -320,6 +328,7 @@ impl ConsumeQueue { } } + #[inline] pub fn get_index_buffer(&self, start_index: i64) -> Option { let mapped_file_size = self.mapped_file_size; let offset = start_index * CQ_STORE_UNIT_SIZE as i64; @@ -336,6 +345,7 @@ impl ConsumeQueue { } impl FileQueueLifeCycle for ConsumeQueue { + #[inline] fn load(&mut self) -> bool { let mut result = self.mapped_file_queue.load(); info!( @@ -350,6 +360,7 @@ impl FileQueueLifeCycle for ConsumeQueue { result } + #[inline] fn recover(&mut self) { let binding = self.mapped_file_queue.get_mapped_files(); let mapped_files = binding.read(); @@ -440,14 +451,17 @@ impl FileQueueLifeCycle for ConsumeQueue { } } + #[inline] fn check_self(&self) { todo!() } + #[inline] fn flush(&self, flush_least_pages: i32) -> bool { todo!() } + #[inline] fn destroy(&mut self) { self.set_max_physic_offset(-1); self.min_logic_offset.store(0, Ordering::SeqCst); @@ -457,28 +471,34 @@ impl FileQueueLifeCycle for ConsumeQueue { } } + #[inline] fn truncate_dirty_logic_files(&mut self, max_commit_log_pos: i64) { self.truncate_dirty_logic_files_handler(max_commit_log_pos, true); } + #[inline] fn delete_expired_file(&self, min_commit_log_pos: i64) -> i32 { todo!() } + #[inline] fn roll_next_file(&self, next_begin_offset: i64) -> i64 { todo!() } + #[inline] fn is_first_file_available(&self) -> bool { todo!() } + #[inline] fn is_first_file_exist(&self) -> bool { todo!() } } impl Swappable for ConsumeQueue { + #[inline] fn swap_map( &self, reserve_num: i32, @@ -488,6 +508,7 @@ impl Swappable for ConsumeQueue { todo!() } + #[inline] fn clean_swapped_map(&self, _force_clean_swap_interval_ms: i64) { todo!() } @@ -495,14 +516,17 @@ impl Swappable for ConsumeQueue { #[allow(unused_variables)] impl ConsumeQueueTrait for ConsumeQueue { + #[inline] fn get_topic(&self) -> &CheetahString { &self.topic } + #[inline] fn get_queue_id(&self) -> i32 { self.queue_id } + #[inline] fn get(&self, index: i64) -> Option { match self.iterate_from(index) { None => None, @@ -510,42 +534,52 @@ impl ConsumeQueueTrait for ConsumeQueue { } } + #[inline] fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> { todo!() } + #[inline] fn get_earliest_unit_and_store_time(&self) -> Option<(CqUnit, i64)> { todo!() } + #[inline] fn get_earliest_unit(&self) -> CqUnit { todo!() } + #[inline] fn get_latest_unit(&self) -> CqUnit { todo!() } + #[inline] fn get_last_offset(&self) -> i64 { todo!() } + #[inline] fn get_min_offset_in_queue(&self) -> i64 { self.min_logic_offset.load(Ordering::Acquire) / CQ_STORE_UNIT_SIZE as i64 } + #[inline] fn get_max_offset_in_queue(&self) -> i64 { self.mapped_file_queue.get_max_offset() / CQ_STORE_UNIT_SIZE as i64 } + #[inline] fn get_message_total_in_queue(&self) -> i64 { todo!() } + #[inline] fn get_offset_in_queue_by_time(&self, timestamp: i64) -> i64 { todo!() } + #[inline] fn get_offset_in_queue_by_time_boundary( &self, timestamp: i64, @@ -554,26 +588,32 @@ impl ConsumeQueueTrait for ConsumeQueue { todo!() } + #[inline] fn get_max_physic_offset(&self) -> i64 { self.max_physic_offset.load(Ordering::SeqCst) } + #[inline] fn get_min_logic_offset(&self) -> i64 { self.min_logic_offset.load(Ordering::Relaxed) } + #[inline] fn get_cq_type(&self) -> CQType { CQType::SimpleCQ } + #[inline] fn get_total_size(&self) -> i64 { todo!() } + #[inline] fn get_unit_size(&self) -> i32 { CQ_STORE_UNIT_SIZE } + #[inline] fn correct_min_offset(&self, min_commit_log_offset: i64) { if min_commit_log_offset >= self.mapped_file_queue.get_max_offset() { info!( @@ -715,6 +755,7 @@ impl ConsumeQueueTrait for ConsumeQueue { } } + #[inline] fn put_message_position_info_wrapper(&mut self, request: &DispatchRequest) { let max_retries = 30i32; let can_write = self.running_flags.is_cq_writeable(); @@ -770,6 +811,7 @@ impl ConsumeQueueTrait for ConsumeQueue { self.running_flags.make_logics_queue_error(); } + #[inline] fn increase_queue_offset( &self, queue_offset_assigner: &QueueOffsetOperator, @@ -782,6 +824,7 @@ impl ConsumeQueueTrait for ConsumeQueue { ); } + #[inline] fn assign_queue_offset( &self, queue_offset_operator: &QueueOffsetOperator, @@ -793,10 +836,12 @@ impl ConsumeQueueTrait for ConsumeQueue { msg.message_ext_inner.queue_offset = queue_offset; } + #[inline] fn estimate_message_count(&self, from: i64, to: i64, filter: &dyn MessageFilter) -> i64 { todo!() } + #[inline] fn iterate_from(&self, start_index: i64) -> Option>> { match self.get_index_buffer(start_index) { None => None, @@ -809,6 +854,7 @@ impl ConsumeQueueTrait for ConsumeQueue { } } + #[inline] fn iterate_from_inner( &self, start_index: i64,