Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #696]🚀Support pull message consume-5 #697

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use crate::protocol::command_custom_header::FromMap;
use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
use crate::rpc::rpc_request_header::RpcRequestHeader;

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -239,6 +240,11 @@
self.proxy_forward_client_id = Some(str.clone());
}

self.topic_request = Some(TopicRequestHeader {
rpc: Some(RpcRequestHeader::default()),
..TopicRequestHeader::default()
});

Check warning on line 246 in rocketmq-remoting/src/protocol/header/pull_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/pull_message_request_header.rs#L243-L246

Added lines #L243 - L246 were not covered by tests

if let Some(str) = fields.get("lo") {
self.topic_request.as_mut().unwrap().lo = Some(str.parse::<bool>().unwrap());
}
Expand Down
11 changes: 11 additions & 0 deletions rocketmq-store/src/base/select_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::sync::Arc;

use crate::log_file::mapped_file::default_impl::DefaultMappedFile;
use crate::log_file::mapped_file::MappedFile;

/// Represents the result of selecting a mapped buffer.
pub struct SelectMappedBufferResult {
Expand All @@ -38,4 +39,14 @@
[self.start_offset as usize..(self.start_offset + self.size as u64) as usize]
.as_ref()
}

pub fn is_in_mem(&self) -> bool {
match self.mapped_file.as_ref() {
None => true,
Some(inner) => {
let pos = self.start_offset - inner.get_file_from_offset();
inner.is_loaded(pos as i64, self.size as usize)

Check warning on line 48 in rocketmq-store/src/base/select_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/base/select_result.rs#L43-L48

Added lines #L43 - L48 were not covered by tests
}
}
}

Check warning on line 51 in rocketmq-store/src/base/select_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/base/select_result.rs#L51

Added line #L51 was not covered by tests
Comment on lines +43 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review the logic of is_in_mem method.

The method is_in_mem uses a match statement to determine if the mapped file is in memory based on the start_offset. The logic seems correct, but the return value when mapped_file is None is set to true, which might be incorrect as it assumes the buffer is in memory when there's no associated file. This could lead to potential bugs.

- None => true,
+ None => false,
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn is_in_mem(&self) -> bool {
match self.mapped_file.as_ref() {
None => true,
Some(inner) => {
let pos = self.start_offset - inner.get_file_from_offset();
inner.is_loaded(pos as i64, self.size as usize)
}
}
}
pub fn is_in_mem(&self) -> bool {
match self.mapped_file.as_ref() {
None => false,
Some(inner) => {
let pos = self.start_offset - inner.get_file_from_offset();
inner.is_loaded(pos as i64, self.size as usize)
}
}
}

}
16 changes: 16 additions & 0 deletions rocketmq-store/src/consume_queue/consume_queue_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,20 @@
filter_bit_map,
}
}

pub fn size(&self) -> i16 {
self.size
}
pub fn tags_code(&self) -> i64 {
self.tags_code
}
pub fn msg_store_time(&self) -> i64 {
self.msg_store_time
}
pub fn bit_map_size(&self) -> i16 {
self.bit_map_size
}
pub fn filter_bit_map(&self) -> &Option<Vec<u8>> {

Check warning on line 61 in rocketmq-store/src/consume_queue/consume_queue_ext.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/consume_queue_ext.rs#L49-L61

Added lines #L49 - L61 were not covered by tests
&self.filter_bit_map
}

Check warning on line 63 in rocketmq-store/src/consume_queue/consume_queue_ext.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/consume_queue/consume_queue_ext.rs#L63

Added line #L63 was not covered by tests
}
6 changes: 3 additions & 3 deletions rocketmq-store/src/log_file/mapped_file/default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl MappedFile for DefaultMappedFile {
start_offset: self.file_from_offset + pos as u64,
size,
mapped_file: Some(self),
is_in_cache: false,
is_in_cache: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Property is_in_cache Set to Always true

The property is_in_cache has been set to always return true. This change could potentially improve the performance by reducing disk access times, but it may also lead to data consistency issues if not handled correctly across different scenarios.

Consider adding checks or conditions that ensure this property does not lead to stale data being served, especially in environments with high concurrency or frequent data updates.

- is_in_cache: true,
+ is_in_cache: self.check_cache_validity(),

Also applies to: 391-391

})
} else {
None
Expand All @@ -388,7 +388,7 @@ impl MappedFile for DefaultMappedFile {
start_offset: self.get_file_from_offset() + pos as u64,
size: read_position - pos,
mapped_file: Some(self),
is_in_cache: false,
is_in_cache: true,
})
} else {
None
Expand Down Expand Up @@ -588,7 +588,7 @@ impl MappedFile for DefaultMappedFile {
}*/

fn is_loaded(&self, position: i64, size: usize) -> bool {
todo!()
true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method is_loaded Now Always Returns true

Setting is_loaded to always return true simplifies the interface but risks masking issues where data may not be properly loaded. This could lead to serious application errors.

It's recommended to implement a more robust checking mechanism that verifies the actual load status of the data.

- true
+ self.verify_data_integrity()

Committable suggestion was skipped due to low confidence.

}
}

Expand Down
31 changes: 30 additions & 1 deletion rocketmq-store/src/message_store/default_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@
}
next_offset
}

fn check_in_mem_by_commit_offset(&self, offset_py: i64, size: i32) -> bool {
let message = self.commit_log.get_message(offset_py, size);
match message {
None => false,
Some(msg) => msg.is_in_mem(),

Check warning on line 445 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L441-L445

Added lines #L441 - L445 were not covered by tests
}
Comment on lines +441 to +446
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Potential issue with get_message method implementation

The get_message method in commit_log.rs currently returns None, which will cause the check_in_mem_by_commit_offset method to always return false. Ensure that get_message is fully implemented and correctly retrieves messages from the commit log.

  • rocketmq-store/src/log_file/commit_log.rs: get_message method needs implementation.
Analysis chain

Method check_in_mem_by_commit_offset looks good but verify dependent methods.

The implementation uses Rust's pattern matching effectively. Ensure that the get_message and is_in_mem methods of CommitLog and Message respectively are optimized and correctly handle edge cases.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the implementations of `get_message` and `is_in_mem`.

# Test: Search for the implementations of these methods. Expect: Efficient and correct implementations.
ast-grep --lang rust --pattern $'impl CommitLog {
  $$$
  get_message($_, $_) {
    $$$
  }
  $$$  
}'
ast-grep --lang rust --pattern $'impl Message {
  $$$
  is_in_mem() {
    $$$
  }
  $$$  
}'

Length of output: 1169



Script:

#!/bin/bash
# Description: Verify the implementations of `get_message` and `is_in_mem`.

# Corrected pattern for `get_message` method in `CommitLog` implementation.
ast-grep --lang rust --pattern 'impl CommitLog {
  fn get_message($_, $_) -> $_ {
    $$$
  }
}'

# Corrected pattern for `is_in_mem` method in `Message` implementation.
ast-grep --lang rust --pattern 'impl Message {
  fn is_in_mem(&self) -> $_ {
    $$$
  }
}'

Length of output: 192



Script:

#!/bin/bash
# Description: Search for the implementations of `get_message` and `is_in_mem` methods across the codebase.

# Search for `get_message` method in the codebase.
rg 'fn get_message'

# Search for `is_in_mem` method in the codebase.
rg 'fn is_in_mem'

Length of output: 1278



Script:

#!/bin/bash
# Extract and review the implementation of `get_message` method from commit_log.rs.
rg -A 10 -B 2 'fn get_message' rocketmq-store/src/log_file/commit_log.rs

# Extract and review the implementation of `is_in_mem` method from select_result.rs.
rg -A 10 -B 2 'fn is_in_mem' rocketmq-store/src/base/select_result.rs

Length of output: 1322

}

Check warning on line 447 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L447

Added line #L447 was not covered by tests
}

fn estimate_in_mem_by_commit_offset(
Expand Down Expand Up @@ -926,7 +934,28 @@
consume_offset: i64,
batch_size: i32,
) -> bool {
todo!()
let consume_queue = self

Check warning on line 937 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L937

Added line #L937 was not covered by tests
.consume_queue_store
.find_or_create_consume_queue(topic, queue_id);
let first_cqitem = consume_queue.lock().get(consume_offset);
if first_cqitem.is_none() {
return false;

Check warning on line 942 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L940-L942

Added lines #L940 - L942 were not covered by tests
}
let cq = first_cqitem.as_ref().unwrap();
let start_offset_py = cq.pos;
if batch_size <= 1 {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);

Check warning on line 948 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L944-L948

Added lines #L944 - L948 were not covered by tests
}
let last_cqitem = consume_queue.lock().get(consume_offset + batch_size as i64);
if last_cqitem.is_none() {
let size = cq.size;
return self.check_in_mem_by_commit_offset(start_offset_py, size);

Check warning on line 953 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L950-L953

Added lines #L950 - L953 were not covered by tests
}
let last_cqitem = last_cqitem.as_ref().unwrap();
let end_offset_py = last_cqitem.pos;
let size = (end_offset_py - start_offset_py) + last_cqitem.size as i64;
self.check_in_mem_by_commit_offset(start_offset_py, size as i32)

Check warning on line 958 in rocketmq-store/src/message_store/default_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/default_message_store.rs#L955-L958

Added lines #L955 - L958 were not covered by tests
}
}

Expand Down
17 changes: 16 additions & 1 deletion rocketmq-store/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@
pub compacted_offset: i32,
}

impl Default for CqUnit {
fn default() -> Self {
CqUnit {

Check warning on line 95 in rocketmq-store/src/queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue.rs#L94-L95

Added lines #L94 - L95 were not covered by tests
queue_offset: 0,
size: 0,
pos: 0,
batch_num: 1,
tags_code: 0,
cq_ext_unit: None,
native_buffer: vec![],

Check warning on line 102 in rocketmq-store/src/queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue.rs#L101-L102

Added lines #L101 - L102 were not covered by tests
compacted_offset: 0,
}
}

Check warning on line 105 in rocketmq-store/src/queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue.rs#L104-L105

Added lines #L104 - L105 were not covered by tests
}

impl CqUnit {
pub fn get_valid_tags_code_as_long(&self) -> Option<i64> {
if !self.is_tags_code_valid() {
Expand Down Expand Up @@ -291,7 +306,7 @@
) -> Result<Box<dyn Iterator<Item = CqUnit>>, RocksDBException>;*/

/// Get cq unit at specified index.
fn get(&self, index: i64) -> CqUnit;
fn get(&self, index: i64) -> Option<CqUnit>;

fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)>;

Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/queue/batch_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
todo!()
}

fn get(&self, index: i64) -> CqUnit {
fn get(&self, index: i64) -> Option<CqUnit> {

Check warning on line 208 in rocketmq-store/src/queue/batch_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/batch_consume_queue.rs#L208

Added line #L208 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Placeholder method get in BatchConsumeQueue.

The method get is currently marked with unimplemented!. This is a typical placeholder in Rust for functionality that is yet to be implemented. Ensure that this method is implemented before merging the PR or handling it properly to avoid runtime panics.

todo!()
}

Expand Down
4 changes: 4 additions & 0 deletions rocketmq-store/src/queue/consume_queue_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@
pub fn destroy(&mut self) {
self.mapped_file_queue.destroy();
}

pub fn get(&self, address: i64, cq_ext_unit: &CqExtUnit) -> bool {
unimplemented!()

Check warning on line 97 in rocketmq-store/src/queue/consume_queue_ext.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/consume_queue_ext.rs#L96-L97

Added lines #L96 - L97 were not covered by tests
}
Comment on lines +96 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Placeholder method get in ConsumeQueueExt.

The method get is currently marked with unimplemented!. This is a typical placeholder in Rust for functionality that is yet to be implemented. Ensure that this method is implemented before merging the PR or handling it properly to avoid runtime panics.

}
98 changes: 94 additions & 4 deletions rocketmq-store/src/queue/single_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use rocketmq_common::common::attribute::cq_type::CQType;
use rocketmq_common::common::boundary_type::BoundaryType;
Expand All @@ -31,6 +32,7 @@
use tracing::warn;

use crate::base::dispatch_request::DispatchRequest;
use crate::base::select_result::SelectMappedBufferResult;
use crate::base::store_checkpoint::StoreCheckpoint;
use crate::base::swappable::Swappable;
use crate::config::broker_role::BrokerRole;
Expand Down Expand Up @@ -315,6 +317,20 @@
mapped_file.append_message_bytes(&bytes);
}
}

pub fn get_index_buffer(&self, start_index: i64) -> Option<SelectMappedBufferResult> {
let mapped_file_size = self.mapped_file_size;
let offset = start_index * CQ_STORE_UNIT_SIZE as i64;
if offset >= self.get_min_logic_offset() {
if let Some(mapped_file) = self

Check warning on line 325 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L321-L325

Added lines #L321 - L325 were not covered by tests
.mapped_file_queue
.find_mapped_file_by_offset(offset, false)
{
return mapped_file.select_mapped_buffer((offset % mapped_file_size as i64) as i32);

Check warning on line 329 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L329

Added line #L329 was not covered by tests
}
}
None
}

Check warning on line 333 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L331-L333

Added lines #L331 - L333 were not covered by tests
Comment on lines +321 to +333
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add error handling in get_index_buffer method.

Consider adding more robust error handling and validation for out-of-bounds or invalid index values to prevent potential runtime errors.

if offset < self.get_min_logic_offset() {
    return None; // Early return if offset is out of bounds
}

}

impl FileQueueLifeCycle for ConsumeQueue {
Expand Down Expand Up @@ -485,8 +501,11 @@
self.queue_id
}

fn get(&self, index: i64) -> CqUnit {
todo!()
fn get(&self, index: i64) -> Option<CqUnit> {
match self.iterate_from(index) {
None => None,
Some(value) => None,

Check warning on line 507 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L504-L507

Added lines #L504 - L507 were not covered by tests
}
}

fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> {
Expand Down Expand Up @@ -538,7 +557,7 @@
}

fn get_min_logic_offset(&self) -> i64 {
todo!()
self.min_logic_offset.load(Ordering::Relaxed)

Check warning on line 560 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L560

Added line #L560 was not covered by tests
}

fn get_cq_type(&self) -> CQType {
Expand Down Expand Up @@ -776,7 +795,15 @@
}

fn iterate_from(&self, start_index: i64) -> Option<Box<dyn Iterator<Item = CqUnit>>> {
todo!()
match self.get_index_buffer(start_index) {
None => None,
Some(value) => Some(Box::new(ConsumeQueueIterator {
smbr: Some(value),

Check warning on line 801 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L798-L801

Added lines #L798 - L801 were not covered by tests
relative_pos: 0,
counter: 0,
consume_queue_ext: self.consume_queue_ext.clone(),
})),

Check warning on line 805 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L804-L805

Added lines #L804 - L805 were not covered by tests
}
}

fn iterate_from_inner(
Expand All @@ -787,3 +814,66 @@
todo!()
}
}

struct ConsumeQueueIterator {
smbr: Option<SelectMappedBufferResult>,
relative_pos: i32,
counter: i32,
consume_queue_ext: Option<ConsumeQueueExt>,
}

impl ConsumeQueueIterator {
fn get_ext(&self, offset: i64, cq_ext_unit: &CqExtUnit) -> bool {
match self.consume_queue_ext.as_ref() {
None => false,
Some(value) => value.get(offset, cq_ext_unit),

Check warning on line 829 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L826-L829

Added lines #L826 - L829 were not covered by tests
}
}

Check warning on line 831 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L831

Added line #L831 was not covered by tests
}

impl Iterator for ConsumeQueueIterator {
type Item = CqUnit;

fn next(&mut self) -> Option<Self::Item> {
match self.smbr.as_ref() {
None => None,
Some(value) => {
if self.counter * CQ_STORE_UNIT_SIZE >= value.size {
return None;

Check warning on line 842 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L837-L842

Added lines #L837 - L842 were not covered by tests
}
let mmp = value.mapped_file.as_ref().unwrap().get_mapped_file();

Check warning on line 844 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L844

Added line #L844 was not covered by tests
let start =
value.start_offset as usize + (self.counter * CQ_STORE_UNIT_SIZE) as usize;
self.counter += 1;
let end = start + CQ_STORE_UNIT_SIZE as usize;
let mut bytes = Bytes::copy_from_slice(&mmp[start..end]);
let pos = bytes.get_i64();
let size = bytes.get_i32();
let tags_code = bytes.get_i64();
let mut cq_unit = CqUnit {
queue_offset: start as i64 / CQ_STORE_UNIT_SIZE as i64,

Check warning on line 854 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L846-L854

Added lines #L846 - L854 were not covered by tests
size,
pos,
tags_code,
..CqUnit::default()

Check warning on line 858 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L858

Added line #L858 was not covered by tests
};

if ConsumeQueueExt::is_ext_addr(cq_unit.tags_code) {
let cq_ext_unit = CqExtUnit::default();
let ext_ret = self.get_ext(cq_unit.tags_code, &cq_ext_unit);
if ext_ret {
cq_unit.tags_code = cq_ext_unit.tags_code();
cq_unit.cq_ext_unit = Some(cq_ext_unit);

Check warning on line 866 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L861-L866

Added lines #L861 - L866 were not covered by tests
} else {
error!(

Check warning on line 868 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L868

Added line #L868 was not covered by tests
"[BUG] can't find consume queue extend file content! addr={}, \
offsetPy={}, sizePy={}",
cq_unit.tags_code, cq_unit.pos, cq_unit.pos,
);
}
}
Some(cq_unit)
}

Check warning on line 876 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L874-L876

Added lines #L874 - L876 were not covered by tests
}
}

Check warning on line 878 in rocketmq-store/src/queue/single_consume_queue.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/queue/single_consume_queue.rs#L878

Added line #L878 was not covered by tests
}
Comment on lines +818 to +879
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor ConsumeQueueIterator for better error handling and state management.

The iterator should robustly handle cases where the underlying data might be modified during iteration, preventing inconsistent states or crashes.

if let Some(mmp) = value.mapped_file.as_ref() {
    // Proceed with existing logic
} else {
    error!("Mapped file missing during iteration.");
    return None;
}

Loading