[ISSUE #696]🚀Support pull message consume-5 #697
Walkthrough
This update introduces enhancements and new features to various components of RocketMQ, with a primary focus on supporting pull message consumption. Significant changes include adding methods for checking if messages are in memory or cache, updating buffer selection logic, and enhancing access methods for struct properties.
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥
Actionable comments posted: 8
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (9)
- rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (2 hunks)
- rocketmq-store/src/base/select_result.rs (2 hunks)
- rocketmq-store/src/consume_queue/consume_queue_ext.rs (1 hunks)
- rocketmq-store/src/log_file/mapped_file/default_impl.rs (3 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (2 hunks)
- rocketmq-store/src/queue.rs (2 hunks)
- rocketmq-store/src/queue/batch_consume_queue.rs (1 hunks)
- rocketmq-store/src/queue/consume_queue_ext.rs (1 hunks)
- rocketmq-store/src/queue/single_consume_queue.rs (7 hunks)
Additional comments not posted (6)
rocketmq-store/src/consume_queue/consume_queue_ext.rs (1)
49-63: Public access methods added for CqExtUnit fields.
The methods added for accessing the fields of CqExtUnit are correctly implemented and follow Rust's conventions for getter methods. These changes enhance encapsulation by allowing controlled access to the fields.
rocketmq-store/src/queue.rs (1)
93-105: Default implementation for CqUnit.
The default implementation for CqUnit is provided, setting reasonable default values for its fields. This implementation follows Rust's conventions and makes it easy to create instances of CqUnit with default values.
rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (2)
28-28: Import Statement Added: RpcRequestHeader
The addition of the RpcRequestHeader import is appropriate given the initialization of topic_request with a RpcRequestHeader instance. This change aligns with the new feature requirements.
243-246: Initialization of topic_request with RpcRequestHeader
The initialization of topic_request with a default RpcRequestHeader is crucial for supporting the new pull message consume-5 feature. This change is well-implemented.
Ensure that this new initialization is compatible and does not introduce inconsistencies with other parts of the application.
Verification successful
Initialization of topic_request with RpcRequestHeader
The initialization of topic_request with a default RpcRequestHeader is crucial for supporting the new pull message consume-5 feature. This change appears well-implemented and consistent across the application.
rocketmq-remoting/src/rpc/rpc_request.rs
rocketmq-remoting/src/rpc/rpc_request_header.rs
rocketmq-remoting/src/rpc/topic_request_header.rs
rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs
rocketmq-remoting/src/protocol/header/get_consumer_listby_group_request_header.rs
rocketmq-remoting/src/protocol/header/namesrv/topic_operation_header.rs
rocketmq-remoting/src/protocol/header/query_consumer_offset_request_header.rs
Ensure that this new initialization is compatible and does not introduce inconsistencies with other parts of the application.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if the new initialization of `topic_request` is consistent across the application.
# Test: Search for usage of `RpcRequestHeader` and `topic_request` across all Rust files to ensure consistency.
rg --type rust $'RpcRequestHeader|topic_request'
Length of output: 7531
rocketmq-store/src/queue/single_consume_queue.rs (2)
24-24: Approved the addition of new imports for byte manipulation.
These imports are essential for handling byte-level data manipulation, which is typical in message queuing systems.
504-508: Approved the modification in get method to return Option<CqUnit>.
This change enhances safety by properly handling cases where the queue item is not found, avoiding potential panics.
pub fn get_index_buffer(&self, start_index: i64) -> Option<SelectMappedBufferResult> {
    let mapped_file_size = self.mapped_file_size;
    let offset = start_index * CQ_STORE_UNIT_SIZE as i64;
    if offset >= self.get_min_logic_offset() {
        if let Some(mapped_file) = self
            .mapped_file_queue
            .find_mapped_file_by_offset(offset, false)
        {
            return mapped_file.select_mapped_buffer((offset % mapped_file_size as i64) as i32);
        }
    }
    None
}
Add error handling in get_index_buffer method.
Consider adding more robust error handling and validation for out-of-bounds or invalid index values to prevent potential runtime errors.
if offset < self.get_min_logic_offset() {
    return None; // Early return if offset is out of bounds
}
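The validation suggested above could be factored into a small pure function, which makes the edge cases easy to test. The following is only a sketch: checked_unit_offset is a hypothetical helper that does not exist in the PR, and CQ_STORE_UNIT_SIZE is assumed to be 20 bytes, matching the i64 + i32 + i64 fields that the iterator in this PR reads from each entry.

```rust
/// Assumed size in bytes of one consume-queue entry:
/// 8 (commit log position) + 4 (message size) + 8 (tags code).
const CQ_STORE_UNIT_SIZE: i64 = 20;

/// Hypothetical helper: converts a logical index into a byte offset.
/// Returns None for negative indexes, for multiplication overflow, and for
/// offsets that fall below the queue's minimum logical offset.
pub fn checked_unit_offset(start_index: i64, min_logic_offset: i64) -> Option<i64> {
    if start_index < 0 {
        return None;
    }
    // checked_mul yields None instead of wrapping or panicking on overflow.
    let offset = start_index.checked_mul(CQ_STORE_UNIT_SIZE)?;
    if offset < min_logic_offset {
        return None;
    }
    Some(offset)
}
```

With a helper like this, get_index_buffer would only look up the mapped file when the helper returns Some(offset).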
struct ConsumeQueueIterator {
    smbr: Option<SelectMappedBufferResult>,
    relative_pos: i32,
    counter: i32,
    consume_queue_ext: Option<ConsumeQueueExt>,
}

impl ConsumeQueueIterator {
    fn get_ext(&self, offset: i64, cq_ext_unit: &CqExtUnit) -> bool {
        match self.consume_queue_ext.as_ref() {
            None => false,
            Some(value) => value.get(offset, cq_ext_unit),
        }
    }
}

impl Iterator for ConsumeQueueIterator {
    type Item = CqUnit;

    fn next(&mut self) -> Option<Self::Item> {
        match self.smbr.as_ref() {
            None => None,
            Some(value) => {
                if self.counter * CQ_STORE_UNIT_SIZE >= value.size {
                    return None;
                }
                let mmp = value.mapped_file.as_ref().unwrap().get_mapped_file();
                let start =
                    value.start_offset as usize + (self.counter * CQ_STORE_UNIT_SIZE) as usize;
                self.counter += 1;
                let end = start + CQ_STORE_UNIT_SIZE as usize;
                let mut bytes = Bytes::copy_from_slice(&mmp[start..end]);
                let pos = bytes.get_i64();
                let size = bytes.get_i32();
                let tags_code = bytes.get_i64();
                let mut cq_unit = CqUnit {
                    queue_offset: start as i64 / CQ_STORE_UNIT_SIZE as i64,
                    size,
                    pos,
                    tags_code,
                    ..CqUnit::default()
                };

                if ConsumeQueueExt::is_ext_addr(cq_unit.tags_code) {
                    let cq_ext_unit = CqExtUnit::default();
                    let ext_ret = self.get_ext(cq_unit.tags_code, &cq_ext_unit);
                    if ext_ret {
                        cq_unit.tags_code = cq_ext_unit.tags_code();
                        cq_unit.cq_ext_unit = Some(cq_ext_unit);
                    } else {
                        error!(
                            "[BUG] can't find consume queue extend file content! addr={}, \
                             offsetPy={}, sizePy={}",
                            cq_unit.tags_code, cq_unit.pos, cq_unit.pos,
                        );
                    }
                }
                Some(cq_unit)
            }
        }
    }
}
Refactor ConsumeQueueIterator for better error handling and state management.
The iterator should robustly handle cases where the underlying data might be modified during iteration, preventing inconsistent states or crashes.
if let Some(mmp) = value.mapped_file.as_ref() {
    // Proceed with existing logic
} else {
    error!("Mapped file missing during iteration.");
    return None;
}
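To illustrate the non-panicking style this comment asks for, here is a minimal sketch of an iterator over fixed-size entries. It is not the PR's implementation: UnitIter and Unit are hypothetical stand-ins that walk a plain byte slice instead of a mapped file, the entry size of 20 bytes is an assumption, and the fields are decoded as big-endian because that is the byte order used by Bytes::get_i64 and Bytes::get_i32.

```rust
use std::convert::TryInto;

/// Assumed entry size: i64 position + i32 size + i64 tags code.
const UNIT_SIZE: usize = 20;

/// Hypothetical stand-in for CqUnit.
#[derive(Debug, PartialEq)]
pub struct Unit {
    pub queue_index: usize,
    pub pos: i64,
    pub size: i32,
    pub tags_code: i64,
}

/// Hypothetical stand-in for ConsumeQueueIterator over a byte slice.
pub struct UnitIter<'a> {
    data: &'a [u8],
    counter: usize,
}

impl<'a> UnitIter<'a> {
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, counter: 0 }
    }
}

impl<'a> Iterator for UnitIter<'a> {
    type Item = Unit;

    fn next(&mut self) -> Option<Unit> {
        let start = self.counter.checked_mul(UNIT_SIZE)?;
        let end = start.checked_add(UNIT_SIZE)?;
        // slice::get returns None for a short or missing range, so a
        // truncated trailing entry ends iteration instead of panicking.
        let raw = self.data.get(start..end)?;
        let unit = Unit {
            queue_index: self.counter,
            pos: i64::from_be_bytes(raw[0..8].try_into().ok()?),
            size: i32::from_be_bytes(raw[8..12].try_into().ok()?),
            tags_code: i64::from_be_bytes(raw[12..20].try_into().ok()?),
        };
        self.counter += 1;
        Some(unit)
    }
}
```

The same idea applies to the reviewed code: replacing the unwrap() on mapped_file and the direct slice index with fallible lookups turns a missing file or a short buffer into the end of iteration.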
fn check_in_mem_by_commit_offset(&self, offset_py: i64, size: i32) -> bool {
    let message = self.commit_log.get_message(offset_py, size);
    match message {
        None => false,
        Some(msg) => msg.is_in_mem(),
    }
Tip
Codebase Verification
Potential issue with get_message method implementation
The get_message method in commit_log.rs currently returns None, which will cause the check_in_mem_by_commit_offset method to always return false. Ensure that get_message is fully implemented and correctly retrieves messages from the commit log.
- rocketmq-store/src/log_file/commit_log.rs: get_message method needs implementation.
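The effect described above can be reproduced in isolation. In this sketch every type is a hypothetical stand-in (Buffer for the buffer result, StubLog for a commit log whose get_message is still a stub, FakeLog for one that returns data); only the shape of check_in_mem mirrors the reviewed method.

```rust
/// Hypothetical stand-in for the buffer returned by the commit log.
pub struct Buffer {
    pub loaded: bool,
}

impl Buffer {
    pub fn is_in_mem(&self) -> bool {
        self.loaded
    }
}

/// Hypothetical abstraction over "something that can fetch a message".
pub trait MessageSource {
    fn get_message(&self, offset: i64, size: i32) -> Option<Buffer>;
}

/// Models a get_message that is not implemented yet and always returns None.
pub struct StubLog;

impl MessageSource for StubLog {
    fn get_message(&self, _offset: i64, _size: i32) -> Option<Buffer> {
        None
    }
}

/// Models a log holding `len` bytes, all of them resident in memory.
pub struct FakeLog {
    pub len: i64,
}

impl MessageSource for FakeLog {
    fn get_message(&self, offset: i64, size: i32) -> Option<Buffer> {
        let end = offset.checked_add(size as i64)?;
        if offset >= 0 && size > 0 && end <= self.len {
            Some(Buffer { loaded: true })
        } else {
            None
        }
    }
}

/// Same shape as check_in_mem_by_commit_offset: None maps to false.
pub fn check_in_mem(log: &impl MessageSource, offset: i64, size: i32) -> bool {
    log.get_message(offset, size)
        .map_or(false, |msg| msg.is_in_mem())
}
```

With StubLog the check is false for every input, which is the behavior the review warns about; with FakeLog it depends on the requested range.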
Analysis chain
Method check_in_mem_by_commit_offset looks good but verify dependent methods.
The implementation uses Rust's pattern matching effectively. Ensure that the get_message and is_in_mem methods of CommitLog and Message respectively are optimized and correctly handle edge cases.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the implementations of `get_message` and `is_in_mem`.
# Test: Search for the implementations of these methods. Expect: Efficient and correct implementations.
ast-grep --lang rust --pattern $'impl CommitLog {
$$$
get_message($_, $_) {
$$$
}
$$$
}'
ast-grep --lang rust --pattern $'impl Message {
$$$
is_in_mem() {
$$$
}
$$$
}'
Length of output: 1169
Script:
#!/bin/bash
# Description: Verify the implementations of `get_message` and `is_in_mem`.
# Corrected pattern for `get_message` method in `CommitLog` implementation.
ast-grep --lang rust --pattern 'impl CommitLog {
fn get_message($_, $_) -> $_ {
$$$
}
}'
# Corrected pattern for `is_in_mem` method in `Message` implementation.
ast-grep --lang rust --pattern 'impl Message {
fn is_in_mem(&self) -> $_ {
$$$
}
}'
Length of output: 192
Script:
#!/bin/bash
# Description: Search for the implementations of `get_message` and `is_in_mem` methods across the codebase.
# Search for `get_message` method in the codebase.
rg 'fn get_message'
# Search for `is_in_mem` method in the codebase.
rg 'fn is_in_mem'
Length of output: 1278
Script:
#!/bin/bash
# Extract and review the implementation of `get_message` method from commit_log.rs.
rg -A 10 -B 2 'fn get_message' rocketmq-store/src/log_file/commit_log.rs
# Extract and review the implementation of `is_in_mem` method from select_result.rs.
rg -A 10 -B 2 'fn is_in_mem' rocketmq-store/src/base/select_result.rs
Length of output: 1322
@@ -366,7 +366,7 @@ impl MappedFile for DefaultMappedFile {
     start_offset: self.file_from_offset + pos as u64,
     size,
     mapped_file: Some(self),
-    is_in_cache: false,
+    is_in_cache: true,
Property is_in_cache Set to Always true
The property is_in_cache has been set to always return true. This change could potentially improve performance by reducing disk access times, but it may also lead to data consistency issues if not handled correctly across different scenarios.
Consider adding checks or conditions that ensure this property does not lead to stale data being served, especially in environments with high concurrency or frequent data updates.
- is_in_cache: true,
+ is_in_cache: self.check_cache_validity(),
Also applies to: 391-391
@@ -588,7 +588,7 @@
 }*/

 fn is_loaded(&self, position: i64, size: usize) -> bool {
-    todo!()
+    true
Method is_loaded Now Always Returns true
Setting is_loaded to always return true simplifies the interface but risks masking issues where data may not be properly loaded. This could lead to serious application errors.
It's recommended to implement a more robust checking mechanism that verifies the actual load status of the data.
- true
+ self.verify_data_integrity()
Committable suggestion was skipped due to low confidence.
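One possible shape for a real check, offered only as a sketch: the review's verify_data_integrity() is not an existing method, and neither is anything below. MappedRegion and its loaded_upto watermark are assumptions made for illustration; the watermark stands for "bytes known to be resident", however the store chooses to track that.

```rust
use std::convert::TryFrom;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical model of a mapped file that tracks how much of it is loaded.
pub struct MappedRegion {
    file_size: usize,
    /// Assumed watermark: bytes in [0, loaded_upto) are considered resident.
    loaded_upto: AtomicUsize,
}

impl MappedRegion {
    pub fn new(file_size: usize) -> Self {
        Self {
            file_size,
            loaded_upto: AtomicUsize::new(0),
        }
    }

    /// Raises the watermark, never past the end of the file.
    pub fn mark_loaded(&self, upto: usize) {
        self.loaded_upto
            .fetch_max(upto.min(self.file_size), Ordering::Release);
    }

    /// True only if the whole range [position, position + size) lies below
    /// the watermark. Negative positions and overflowing ranges are rejected.
    pub fn is_loaded(&self, position: i64, size: usize) -> bool {
        let start = match usize::try_from(position) {
            Ok(start) => start,
            Err(_) => return false,
        };
        match start.checked_add(size) {
            Some(end) => end <= self.loaded_upto.load(Ordering::Acquire),
            None => false,
        }
    }
}
```

Even if the real residency information comes from elsewhere, the bounds checks on position and size are cheap and keep an invalid range from being reported as loaded.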
pub fn is_in_mem(&self) -> bool {
    match self.mapped_file.as_ref() {
        None => true,
        Some(inner) => {
            let pos = self.start_offset - inner.get_file_from_offset();
            inner.is_loaded(pos as i64, self.size as usize)
        }
    }
}
Review the logic of is_in_mem method.
The method is_in_mem uses a match statement to determine if the mapped file is in memory based on the start_offset. The logic seems correct, but the return value when mapped_file is None is set to true, which might be incorrect as it assumes the buffer is in memory when there's no associated file. This could lead to potential bugs.
- None => true,
+ None => false,
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-pub fn is_in_mem(&self) -> bool {
-    match self.mapped_file.as_ref() {
-        None => true,
-        Some(inner) => {
-            let pos = self.start_offset - inner.get_file_from_offset();
-            inner.is_loaded(pos as i64, self.size as usize)
-        }
-    }
-}
+pub fn is_in_mem(&self) -> bool {
+    match self.mapped_file.as_ref() {
+        None => false,
+        Some(inner) => {
+            let pos = self.start_offset - inner.get_file_from_offset();
+            inner.is_loaded(pos as i64, self.size as usize)
+        }
+    }
+}
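A self-contained sketch of the suggested variant follows, so the behavior can be exercised in a test. It adopts the reviewer's None => false purely for illustration; whether a buffer without a mapped file should count as in memory is a decision for the maintainers. It also guards the unsigned subtraction with checked_sub, which goes beyond the suggestion. LoadCheck, BufferResult and FakeFile are hypothetical stand-ins, not types from the PR.

```rust
/// Hypothetical subset of the mapped-file interface used by is_in_mem.
pub trait LoadCheck {
    fn file_from_offset(&self) -> u64;
    fn is_loaded(&self, position: i64, size: usize) -> bool;
}

/// Hypothetical stand-in for SelectMappedBufferResult.
pub struct BufferResult<'a, F: LoadCheck> {
    pub start_offset: u64,
    pub size: i32,
    pub mapped_file: Option<&'a F>,
}

impl<'a, F: LoadCheck> BufferResult<'a, F> {
    pub fn is_in_mem(&self) -> bool {
        match self.mapped_file {
            // Reviewer's suggestion: no file means "not in memory".
            None => false,
            Some(file) => {
                // start_offset and file_from_offset are both unsigned, so a
                // plain subtraction could underflow; treat that as not loaded.
                let pos = match self.start_offset.checked_sub(file.file_from_offset()) {
                    Some(pos) => pos,
                    None => return false,
                };
                if self.size < 0 {
                    return false;
                }
                file.is_loaded(pos as i64, self.size as usize)
            }
        }
    }
}

/// Test double that reports a fixed load state.
pub struct FakeFile {
    pub from: u64,
    pub loaded: bool,
}

impl LoadCheck for FakeFile {
    fn file_from_offset(&self) -> u64 {
        self.from
    }

    fn is_loaded(&self, _position: i64, _size: usize) -> bool {
        self.loaded
    }
}
```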
pub fn get(&self, address: i64, cq_ext_unit: &CqExtUnit) -> bool {
    unimplemented!()
}
Placeholder method get in ConsumeQueueExt.
The method get is currently marked with unimplemented!. This is a typical placeholder in Rust for functionality that is yet to be implemented. Ensure that this method is implemented before merging the PR, or that callers handle it properly, to avoid runtime panics.
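One thing to keep in mind when implementing it: the reviewed signature takes cq_ext_unit by shared reference, so the method cannot fill in the caller's value without interior mutability. A possible alternative, shown here only as a sketch, is to return the unit as an Option. ExtStore and ExtUnit are hypothetical stand-ins backed by a HashMap, not the PR's ConsumeQueueExt and CqExtUnit.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for CqExtUnit.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExtUnit {
    tags_code: i64,
}

impl ExtUnit {
    pub fn new(tags_code: i64) -> Self {
        Self { tags_code }
    }

    pub fn tags_code(&self) -> i64 {
        self.tags_code
    }
}

/// Hypothetical stand-in for ConsumeQueueExt, keyed by extension address.
#[derive(Default)]
pub struct ExtStore {
    units: HashMap<i64, ExtUnit>,
}

impl ExtStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn put(&mut self, address: i64, unit: ExtUnit) {
        self.units.insert(address, unit);
    }

    /// Returns the unit stored at `address`, or None if there is none.
    /// A missing entry is an ordinary result here, not a panic.
    pub fn get(&self, address: i64) -> Option<ExtUnit> {
        self.units.get(&address).cloned()
    }
}
```

A caller such as the iterator's get_ext could then match on the returned Option and log the "can't find consume queue extend file content" error in the None branch.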
@@ -205,7 +205,7 @@ impl ConsumeQueueTrait for BatchConsumeQueue {
     todo!()
 }

-fn get(&self, index: i64) -> CqUnit {
+fn get(&self, index: i64) -> Option<CqUnit> {
Placeholder method get in BatchConsumeQueue.
The method get is currently marked with unimplemented!. This is a typical placeholder in Rust for functionality that is yet to be implemented. Ensure that this method is implemented before merging the PR, or that callers handle it properly, to avoid runtime panics.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #697      +/-   ##
==========================================
- Coverage   27.42%   27.26%   -0.16%
==========================================
  Files         255      255
  Lines       19760    19868     +108
==========================================
- Hits         5419     5417       -2
- Misses      14341    14451     +110
☔ View full report in Codecov by Sentry.
Which Issue(s) This PR Fixes(Closes)
Fixes #696
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Improvements
- ConsumeQueueExt with a new get method.
- CqExtUnit with public access methods.