[ISSUE #2052]💫Implement PopBufferMergeService#putAckToStore🚀 #2053
Conversation
Walkthrough
The pull request modifies rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs to implement `put_ack_to_store`: it builds an `AckMsg` from a pop checkpoint, wraps it in a `MessageExtBrokerInner` addressed to the revive topic, and persists it through the escape bridge.
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥!

Note: 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.
Codecov Report
Attention: Patch coverage is 0% — judging by the diff below, none of the 41 added lines are hit by tests.
Additional details and impacted files

```diff
@@ Coverage Diff @@
##             main    #2053      +/-   ##
==========================================
- Coverage   28.42%   28.40%   -0.02%
==========================================
  Files         488      488
  Lines       69129    69170      +41
==========================================
  Hits        19651    19651
- Misses      49478    49519      +41
```

☔ View full report in Codecov by Sentry.
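The drop follows directly from the table: coverage is Hits / Lines, and since all 41 new lines land in Misses, Hits stays at 19651 while Lines grows from 69129 to 69170, shaving roughly 0.02 percentage points off the ratio.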
Actionable comments posted: 1
🧹 Nitpick comments (1)
rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs (1)
331-344: Consider parallelizing or adding error handling for single ACK storage.
Currently, the loop calls `put_ack_to_store` sequentially and does not log or retry on failure. For large numbers of acks this may be inefficient, and failures are silently ignored.
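A minimal sketch of the concurrent variant with failure logging (illustrative only: `indexes` and `wrapper` are hypothetical names for the loop's inputs, and the `futures` crate is assumed to be available):

```rust
use futures::future::join_all;

// Launch all single-ack puts concurrently instead of awaiting each in turn.
let results = join_all(
    indexes
        .iter()
        .map(|&i| self.put_ack_to_store(wrapper, i)),
)
.await;
// Surface failures instead of dropping them silently.
for (&i, ok) in indexes.iter().zip(results) {
    if !ok {
        error!("put_ack_to_store failed for ack index {}", i);
    }
}
```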
📒 Files selected for processing (1)
rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs (3 hunks)
🔇 Additional comments (7)
rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs (7)

27-27: Import of `bytes::Bytes` looks good.
No issues found. This import is properly utilized at line 511 for setting the message body.

33-33: Import of `message_decoder` is appropriate.
This import is used later to serialize message properties in `message_properties_to_string`.

34-34: Import of `MessageExtBrokerInner` is correct.
It is crucial for constructing broker-specific message structures, as utilized in `put_ack_to_store`.

35-35: Import of `MessageConst` is used correctly for setting message properties.
No issues observed; it helps to identify the unique client message ID key.

36-36: Import of `MessageTrait` is consistent with the overall message-building logic.
Looks good.

40-40: Import of `RemotingSerializable` is well-aligned with message serialization.
Used in `ack_msg.to_json()`, enabling JSON serialization for ack messages.

44-44: Import of `AckMsg` is correct for creating single acknowledgment messages.
This is essential for acknowledgment logic in `put_ack_to_store`.
```rust
async fn put_ack_to_store(&self, point_wrapper: &PopCheckPointWrapper, index: u8) -> bool {
    let point = point_wrapper.get_ck();
    // Build the single-message ack payload from the checkpoint.
    let ack_msg = AckMsg {
        ack_offset: point.ack_offset_by_index(index),
        start_offset: point.start_offset,
        consumer_group: point.cid.clone(),
        topic: point.topic.clone(),
        queue_id: point.queue_id,
        pop_time: point.pop_time,
        ..Default::default()
    };
    // Wrap the ack in a broker-internal message bound for the revive topic.
    let mut msg = MessageExtBrokerInner::default();
    msg.set_topic(self.revive_topic.clone());
    msg.set_body(Bytes::from(ack_msg.to_json().unwrap()));
    msg.message_ext_inner.queue_id = point_wrapper.revive_queue_id;
    msg.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
    msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
    msg.message_ext_inner.born_host = self.store_host;
    msg.message_ext_inner.store_host = self.store_host;
    msg.set_delay_time_ms(point.get_revive_time() as u64);
    msg.put_property(
        CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
        CheetahString::from_string(PopMessageProcessor::<MS>::gen_ack_unique_id(&ack_msg)),
    );
    msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties());

    // Write the ack message to the revive queue via the escape bridge.
    let put_message_result = self
        .escape_bridge
        .mut_from_ref()
        .put_message_to_specific_queue(msg)
        .await;
    // Statuses other than PutOk listed here are still counted as success.
    matches!(
        put_message_result.put_message_status(),
        PutMessageStatus::PutOk
            | PutMessageStatus::FlushDiskTimeout
            | PutMessageStatus::FlushSlaveTimeout
            | PutMessageStatus::SlaveNotAvailable
    )
}
```
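One design note on the return value: FlushDiskTimeout, FlushSlaveTimeout, and SlaveNotAvailable are treated as success alongside PutOk, presumably because in those cases the ack message has already been accepted by the store and only flushing or replication lagged; this mirrors the equivalent check in Apache RocketMQ's Java broker.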
🛠️ Refactor suggestion

Remove the risky `.unwrap()` and handle failures when putting ACK messages.
Relying on `ack_msg.to_json().unwrap()` can cause a panic if serialization fails, and no error is logged if the store operation fails, which makes troubleshooting harder. Consider logging or applying retry logic for improved reliability.
Below is an example refactor applying safer JSON handling and error logging:
```diff
-    msg.set_body(Bytes::from(ack_msg.to_json().unwrap()));
+    let ack_msg_json = match ack_msg.to_json() {
+        Ok(json) => json,
+        Err(e) => {
+            error!("AckMsg to_json error: {:?}", e);
+            return false;
+        }
+    };
+    msg.set_body(Bytes::from(ack_msg_json));

-    matches!(
-        put_message_result.put_message_status(),
-        PutMessageStatus::PutOk
-            | PutMessageStatus::FlushDiskTimeout
-            | PutMessageStatus::FlushSlaveTimeout
-            | PutMessageStatus::SlaveNotAvailable
-    )
+    let success = matches!(
+        put_message_result.put_message_status(),
+        PutMessageStatus::PutOk
+            | PutMessageStatus::FlushDiskTimeout
+            | PutMessageStatus::FlushSlaveTimeout
+            | PutMessageStatus::SlaveNotAvailable
+    );
+    if !success {
+        error!(
+            "put_message_to_specific_queue failed with status: {:?}",
+            put_message_result.put_message_status()
+        );
+    }
+    success
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
async fn put_ack_to_store(&self, point_wrapper: &PopCheckPointWrapper, index: u8) -> bool {
    let point = point_wrapper.get_ck();
    let ack_msg = AckMsg {
        ack_offset: point.ack_offset_by_index(index),
        start_offset: point.start_offset,
        consumer_group: point.cid.clone(),
        topic: point.topic.clone(),
        queue_id: point.queue_id,
        pop_time: point.pop_time,
        ..Default::default()
    };
    let mut msg = MessageExtBrokerInner::default();
    msg.set_topic(self.revive_topic.clone());
    // Avoid panicking on serialization failure: log and bail out with `false`.
    let ack_msg_json = match ack_msg.to_json() {
        Ok(json) => json,
        Err(e) => {
            error!("AckMsg to_json error: {:?}", e);
            return false;
        }
    };
    msg.set_body(Bytes::from(ack_msg_json));
    msg.message_ext_inner.queue_id = point_wrapper.revive_queue_id;
    msg.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
    msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
    msg.message_ext_inner.born_host = self.store_host;
    msg.message_ext_inner.store_host = self.store_host;
    msg.set_delay_time_ms(point.get_revive_time() as u64);
    msg.put_property(
        CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
        CheetahString::from_string(PopMessageProcessor::<MS>::gen_ack_unique_id(&ack_msg)),
    );
    msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties());
    let put_message_result = self
        .escape_bridge
        .mut_from_ref()
        .put_message_to_specific_queue(msg)
        .await;
    let success = matches!(
        put_message_result.put_message_status(),
        PutMessageStatus::PutOk
            | PutMessageStatus::FlushDiskTimeout
            | PutMessageStatus::FlushSlaveTimeout
            | PutMessageStatus::SlaveNotAvailable
    );
    // Log the concrete failure status so store-level problems stay visible.
    if !success {
        error!(
            "put_message_to_specific_queue failed with status: {:?}",
            put_message_result.put_message_status()
        );
    }
    success
}
```
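For the retry idea raised in the review, a minimal generic sketch (a hypothetical helper, not part of this PR; assumes a tokio runtime and illustrative names throughout):

```rust
use std::time::Duration;

/// Retry an async fallible operation up to `max_attempts` times with a fixed
/// backoff between attempts. Hypothetical helper; names are not from the PR.
async fn retry_async<F, Fut>(max_attempts: u32, backoff: Duration, mut op: F) -> bool
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    for attempt in 1..=max_attempts {
        if op().await {
            return true;
        }
        if attempt < max_attempts {
            tokio::time::sleep(backoff).await;
        }
    }
    false
}
```

A caller could then wrap the store call as `retry_async(3, Duration::from_millis(100), || self.put_ack_to_store(point_wrapper, index)).await`, keeping the bool contract of `put_ack_to_store` intact.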
Which Issue(s) This PR Fixes (Closes)
Fixes #2052
Brief Description
How Did You Test This Change?
Summary by CodeRabbit