-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1996]🍻Implement PopMessageProcessor#reset_pop_offset method logic🚀 #1997
Conversation
Caution Review failedThe pull request is closed. WalkthroughThis pull request introduces modifications to the RocketMQ broker's message processing components, focusing on enhancing offset management and message processing capabilities. The changes include adding new methods to Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: .coderabbit.yaml 📒 Files selected for processing (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1997 +/- ##
==========================================
- Coverage 28.29% 28.27% -0.02%
==========================================
Files 487 487
Lines 68438 68472 +34
==========================================
Hits 19363 19363
- Misses 49075 49109 +34 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs
(1 hunks)rocketmq-broker/src/processor/pop_message_processor.rs
(1 hunks)rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs
(1 hunks)
); | ||
} | ||
offset | ||
} | ||
|
||
fn reset_pop_offset(&self, topic: &str, group: &str, queue_id: i32) -> Option<i64> { | ||
unimplemented!("PopMessageProcessor reset_pop_offset") | ||
fn reset_pop_offset( | ||
&self, | ||
topic: &CheetahString, | ||
group: &CheetahString, | ||
queue_id: i32, | ||
) -> Option<i64> { | ||
let lock_key = format!( | ||
"{}{}{}{}{}", | ||
topic, | ||
PopAckConstants::SPLIT, | ||
group, | ||
PopAckConstants::SPLIT, | ||
queue_id | ||
); | ||
let reset_offset = self | ||
.consumer_offset_manager | ||
.query_then_erase_reset_offset(group, topic, queue_id); | ||
if reset_offset.is_some() { | ||
self.consumer_order_info_manager | ||
.clear_block(topic, group, queue_id); | ||
self.pop_buffer_merge_service | ||
.clear_offset_queue(lock_key.as_ref()); | ||
self.consumer_offset_manager.commit_offset( | ||
"ResetPopOffset".into(), | ||
group, | ||
topic, | ||
queue_id, | ||
*reset_offset.as_ref().unwrap(), | ||
) | ||
} | ||
reset_offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling unimplemented calls within reset_pop_offset
to avoid runtime panics
This method unconditionally calls clear_block
and clear_offset_queue
, which are both unimplemented and will cause a panic!
if invoked at runtime. Until these methods are implemented, consider returning an error or logging a warning instead, to avoid unexpected crashes in production.
Here's a sample diff to gate these calls with a TODO warning:
fn reset_pop_offset(
&self,
topic: &CheetahString,
group: &CheetahString,
queue_id: i32,
) -> Option<i64> {
let lock_key = format!(
"{}{}{}{}{}",
topic,
PopAckConstants::SPLIT,
group,
PopAckConstants::SPLIT,
queue_id
);
let reset_offset = self
.consumer_offset_manager
.query_then_erase_reset_offset(group, topic, queue_id);
if reset_offset.is_some() {
- self.consumer_order_info_manager.clear_block(topic, group, queue_id);
- self.pop_buffer_merge_service.clear_offset_queue(lock_key.as_ref());
+ // TODO: Implement these calls or handle them gracefully to avoid runtime panic.
+ // self.consumer_order_info_manager.clear_block(topic, group, queue_id);
+ // self.pop_buffer_merge_service.clear_offset_queue(lock_key.as_ref());
+ tracing::warn!(
+ "clear_block() and clear_offset_queue() are not yet implemented; skipping to prevent panic"
+ );
self.consumer_offset_manager.commit_offset(
"ResetPopOffset".into(),
group,
topic,
queue_id,
*reset_offset.as_ref().unwrap(),
)
}
reset_offset
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
); | |
} | |
offset | |
} | |
fn reset_pop_offset(&self, topic: &str, group: &str, queue_id: i32) -> Option<i64> { | |
unimplemented!("PopMessageProcessor reset_pop_offset") | |
fn reset_pop_offset( | |
&self, | |
topic: &CheetahString, | |
group: &CheetahString, | |
queue_id: i32, | |
) -> Option<i64> { | |
let lock_key = format!( | |
"{}{}{}{}{}", | |
topic, | |
PopAckConstants::SPLIT, | |
group, | |
PopAckConstants::SPLIT, | |
queue_id | |
); | |
let reset_offset = self | |
.consumer_offset_manager | |
.query_then_erase_reset_offset(group, topic, queue_id); | |
if reset_offset.is_some() { | |
self.consumer_order_info_manager | |
.clear_block(topic, group, queue_id); | |
self.pop_buffer_merge_service | |
.clear_offset_queue(lock_key.as_ref()); | |
self.consumer_offset_manager.commit_offset( | |
"ResetPopOffset".into(), | |
group, | |
topic, | |
queue_id, | |
*reset_offset.as_ref().unwrap(), | |
) | |
} | |
reset_offset | |
fn reset_pop_offset( | |
&self, | |
topic: &CheetahString, | |
group: &CheetahString, | |
queue_id: i32, | |
) -> Option<i64> { | |
let lock_key = format!( | |
"{}{}{}{}{}", | |
topic, | |
PopAckConstants::SPLIT, | |
group, | |
PopAckConstants::SPLIT, | |
queue_id | |
); | |
let reset_offset = self | |
.consumer_offset_manager | |
.query_then_erase_reset_offset(group, topic, queue_id); | |
if reset_offset.is_some() { | |
// TODO: Implement these calls or handle them gracefully to avoid runtime panic. | |
// self.consumer_order_info_manager.clear_block(topic, group, queue_id); | |
// self.pop_buffer_merge_service.clear_offset_queue(lock_key.as_ref()); | |
tracing::warn!( | |
"clear_block() and clear_offset_queue() are not yet implemented; skipping to prevent panic" | |
); | |
self.consumer_offset_manager.commit_offset( | |
"ResetPopOffset".into(), | |
group, | |
topic, | |
queue_id, | |
*reset_offset.as_ref().unwrap(), | |
) | |
} | |
reset_offset |
pub fn clear_offset_queue(&self, _lock_key: &str) { | ||
unimplemented!("Not implemented yet"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Provide an implementation or safe fallback for clear_offset_queue
Currently, this function is unimplemented, which will lead to a panic!
if called. Consider providing an actual implementation or handling it gracefully until the logic is available, so that callers do not trigger a runtime error.
pub fn clear_offset_queue(&self, _lock_key: &str) {
- unimplemented!("Not implemented yet");
+ // TODO: Implement the cleanup logic for offset queues or handle unimplemented case gracefully.
+ tracing::warn!("clear_offset_queue() is not implemented yet");
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn clear_offset_queue(&self, _lock_key: &str) { | |
unimplemented!("Not implemented yet"); | |
} | |
pub fn clear_offset_queue(&self, _lock_key: &str) { | |
// TODO: Implement the cleanup logic for offset queues or handle unimplemented case gracefully. | |
tracing::warn!("clear_offset_queue() is not implemented yet"); | |
} |
pub fn clear_block(&self, topic: &CheetahString, group: &CheetahString, queue_id: i32) { | ||
unimplemented!() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Implement or remove the clear_block
method to prevent panics
Since clear_block
is unimplemented, any invocation will panic. If you intend to keep this method signature, consider adding a real implementation or returning a meaningful error until the feature is fully supported.
pub fn clear_block(&self, topic: &CheetahString, group: &CheetahString, queue_id: i32) {
- unimplemented!()
+ // TODO: Add logic to clear block state from the consumer order info, or remove this method
+ tracing::warn!("clear_block() is not implemented yet");
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn clear_block(&self, topic: &CheetahString, group: &CheetahString, queue_id: i32) { | |
unimplemented!() | |
} | |
pub fn clear_block(&self, topic: &CheetahString, group: &CheetahString, queue_id: i32) { | |
// TODO: Add logic to clear block state from the consumer order info, or remove this method | |
tracing::warn!("clear_block() is not implemented yet"); | |
} |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1996
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
clear_block
method toConsumerOrderInfoManager
clear_offset_queue
method inPopBufferMergeService
Improvements
reset_pop_offset
andget_init_offset
method signatures inPopMessageProcessor