Skip to content

Commit

Permalink
Adding #[inline] for ConsumeQueue methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nakul-py committed Jan 3, 2025
1 parent 3dda72b commit f6d671f
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions rocketmq-store/src/queue/single_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct ConsumeQueue {
}

impl ConsumeQueue {
#[inline]
pub fn new(
topic: CheetahString,
queue_id: i32,
Expand Down Expand Up @@ -129,11 +130,13 @@ impl ConsumeQueue {
}

impl ConsumeQueue {
#[inline]
pub fn set_max_physic_offset(&self, max_physic_offset: i64) {
self.max_physic_offset
.store(max_physic_offset, std::sync::atomic::Ordering::SeqCst);
}

#[inline]
pub fn truncate_dirty_logic_files_handler(&mut self, phy_offset: i64, delete_file: bool) {
self.set_max_physic_offset(phy_offset);
let mut max_ext_addr = 1i64;
Expand Down Expand Up @@ -213,18 +216,22 @@ impl ConsumeQueue {
}
}

#[inline]
pub fn is_ext_read_enable(&self) -> bool {
self.consume_queue_ext.is_some()
}

#[inline]
pub fn is_ext_addr(tags_code: i64) -> bool {
ConsumeQueueExt::is_ext_addr(tags_code)
}

#[inline]
pub fn is_ext_write_enable(&self) -> bool {
self.consume_queue_ext.is_some() && self.message_store_config.enable_consume_queue_ext
}

#[inline]
pub fn put_message_position_info(
&mut self,
offset: i64,
Expand Down Expand Up @@ -306,6 +313,7 @@ impl ConsumeQueue {
}
}

#[inline]
fn fill_pre_blank(&self, mapped_file: &Arc<DefaultMappedFile>, until_where: i64) {
let mut bytes_mut = BytesMut::with_capacity(CQ_STORE_UNIT_SIZE as usize);

Expand All @@ -320,6 +328,7 @@ impl ConsumeQueue {
}
}

#[inline]
pub fn get_index_buffer(&self, start_index: i64) -> Option<SelectMappedBufferResult> {
let mapped_file_size = self.mapped_file_size;
let offset = start_index * CQ_STORE_UNIT_SIZE as i64;
Expand All @@ -336,6 +345,7 @@ impl ConsumeQueue {
}

impl FileQueueLifeCycle for ConsumeQueue {
#[inline]
fn load(&mut self) -> bool {
let mut result = self.mapped_file_queue.load();
info!(
Expand All @@ -350,6 +360,7 @@ impl FileQueueLifeCycle for ConsumeQueue {
result
}

#[inline]
fn recover(&mut self) {
let binding = self.mapped_file_queue.get_mapped_files();
let mapped_files = binding.read();
Expand Down Expand Up @@ -440,14 +451,17 @@ impl FileQueueLifeCycle for ConsumeQueue {
}
}

#[inline]
fn check_self(&self) {
todo!()
}

#[inline]
fn flush(&self, flush_least_pages: i32) -> bool {
todo!()
}

#[inline]
fn destroy(&mut self) {
self.set_max_physic_offset(-1);
self.min_logic_offset.store(0, Ordering::SeqCst);
Expand All @@ -457,28 +471,34 @@ impl FileQueueLifeCycle for ConsumeQueue {
}
}

#[inline]
fn truncate_dirty_logic_files(&mut self, max_commit_log_pos: i64) {
self.truncate_dirty_logic_files_handler(max_commit_log_pos, true);
}

#[inline]
fn delete_expired_file(&self, min_commit_log_pos: i64) -> i32 {
todo!()
}

#[inline]
fn roll_next_file(&self, next_begin_offset: i64) -> i64 {
todo!()
}

#[inline]
fn is_first_file_available(&self) -> bool {
todo!()
}

#[inline]
fn is_first_file_exist(&self) -> bool {
todo!()
}
}

impl Swappable for ConsumeQueue {
#[inline]
fn swap_map(
&self,
reserve_num: i32,
Expand All @@ -488,64 +508,78 @@ impl Swappable for ConsumeQueue {
todo!()
}

#[inline]
fn clean_swapped_map(&self, _force_clean_swap_interval_ms: i64) {
todo!()
}
}

#[allow(unused_variables)]
impl ConsumeQueueTrait for ConsumeQueue {
#[inline]
fn get_topic(&self) -> &CheetahString {
&self.topic
}

#[inline]
fn get_queue_id(&self) -> i32 {
self.queue_id
}

#[inline]
fn get(&self, index: i64) -> Option<CqUnit> {
match self.iterate_from(index) {
None => None,
Some(value) => None,
}
}

#[inline]
fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> {
todo!()
}

#[inline]
fn get_earliest_unit_and_store_time(&self) -> Option<(CqUnit, i64)> {
todo!()
}

#[inline]
fn get_earliest_unit(&self) -> CqUnit {
todo!()
}

#[inline]
fn get_latest_unit(&self) -> CqUnit {
todo!()
}

#[inline]
fn get_last_offset(&self) -> i64 {
todo!()
}

#[inline]
fn get_min_offset_in_queue(&self) -> i64 {
self.min_logic_offset.load(Ordering::Acquire) / CQ_STORE_UNIT_SIZE as i64
}

#[inline]
fn get_max_offset_in_queue(&self) -> i64 {
self.mapped_file_queue.get_max_offset() / CQ_STORE_UNIT_SIZE as i64
}

#[inline]
fn get_message_total_in_queue(&self) -> i64 {
todo!()
}

#[inline]
fn get_offset_in_queue_by_time(&self, timestamp: i64) -> i64 {
todo!()
}

#[inline]
fn get_offset_in_queue_by_time_boundary(
&self,
timestamp: i64,
Expand All @@ -554,26 +588,32 @@ impl ConsumeQueueTrait for ConsumeQueue {
todo!()
}

#[inline]
fn get_max_physic_offset(&self) -> i64 {
self.max_physic_offset.load(Ordering::SeqCst)
}

#[inline]
fn get_min_logic_offset(&self) -> i64 {
self.min_logic_offset.load(Ordering::Relaxed)
}

#[inline]
fn get_cq_type(&self) -> CQType {
CQType::SimpleCQ
}

#[inline]
fn get_total_size(&self) -> i64 {
todo!()
}

#[inline]
fn get_unit_size(&self) -> i32 {
CQ_STORE_UNIT_SIZE
}

#[inline]
fn correct_min_offset(&self, min_commit_log_offset: i64) {
if min_commit_log_offset >= self.mapped_file_queue.get_max_offset() {
info!(
Expand Down Expand Up @@ -715,6 +755,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
}
}

#[inline]
fn put_message_position_info_wrapper(&mut self, request: &DispatchRequest) {
let max_retries = 30i32;
let can_write = self.running_flags.is_cq_writeable();
Expand Down Expand Up @@ -770,6 +811,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
self.running_flags.make_logics_queue_error();
}

#[inline]
fn increase_queue_offset(
&self,
queue_offset_assigner: &QueueOffsetOperator,
Expand All @@ -782,6 +824,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
);
}

#[inline]
fn assign_queue_offset(
&self,
queue_offset_operator: &QueueOffsetOperator,
Expand All @@ -793,10 +836,12 @@ impl ConsumeQueueTrait for ConsumeQueue {
msg.message_ext_inner.queue_offset = queue_offset;
}

#[inline]
fn estimate_message_count(&self, from: i64, to: i64, filter: &dyn MessageFilter) -> i64 {
todo!()
}

#[inline]
fn iterate_from(&self, start_index: i64) -> Option<Box<dyn Iterator<Item = CqUnit>>> {
match self.get_index_buffer(start_index) {
None => None,
Expand All @@ -809,6 +854,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
}
}

#[inline]
fn iterate_from_inner(
&self,
start_index: i64,
Expand Down

0 comments on commit f6d671f

Please sign in to comment.