Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1812] ⚡️ Optimize PullResult msg_found_list with Option #1861

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,13 @@
next_begin_offset: get_message_result.next_begin_offset() as u64,
min_offset: get_message_result.min_offset() as u64,
max_offset: get_message_result.max_offset() as u64,
msg_found_list: msg_found_list
.unwrap_or_default()
.into_iter()
.map(ArcMut::new)
.collect(),
msg_found_list: Some(
msg_found_list
.unwrap_or_default()
.into_iter()
.map(ArcMut::new)
.collect(),
),

Check warning on line 221 in rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs#L215-L221

Added lines #L215 - L221 were not covered by tests
})
} else {
error!(
Expand Down
11 changes: 6 additions & 5 deletions rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@
msg.message_ext_inner.queue_offset += offset_delta;
}
}

pull_result_ext.pull_result.msg_found_list = msg_list_filter_again
.into_iter()
.map(ArcMut::new)
.collect::<Vec<_>>();
pull_result_ext.pull_result.msg_found_list = Some(
msg_list_filter_again
.into_iter()
.map(ArcMut::new)
.collect::<Vec<_>>(),
);

Check warning on line 208 in rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs#L203-L208

Added lines #L203 - L208 were not covered by tests
}
}

Expand Down
21 changes: 17 additions & 4 deletions rocketmq-client/src/consumer/pull_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,39 @@
/*let pull_rt = get_current_millis() - begin_timestamp.elapsed().as_millis() as u64;
self.client_instance.as_mut().unwrap().*/
let mut first_msg_offset = i64::MAX;
if pull_result_ext.pull_result.msg_found_list.is_empty() {
if pull_result_ext
.pull_result
.msg_found_list
.as_ref()
.map_or(true, |v| v.is_empty())

Check warning on line 84 in rocketmq-client/src/consumer/pull_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_callback.rs#L80-L84

Added lines #L80 - L84 were not covered by tests
{
push_consumer_impl
.execute_pull_request_immediately(pull_request)
.await;
} else {
first_msg_offset = pull_result_ext
.pull_result
.msg_found_list()
.msg_found_list
.as_ref()
.unwrap()

Check warning on line 94 in rocketmq-client/src/consumer/pull_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_callback.rs#L92-L94

Added lines #L92 - L94 were not covered by tests
Comment on lines +92 to +94
Copy link
Contributor

@coderabbitai coderabbitai bot Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add unwrap_or_default to handle None case safely

The unwrap call could panic if msg_found_list is None. Consider using unwrap_or_default:

-        .as_ref()
-        .unwrap()
+        .as_ref()
+        .unwrap_or_default()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.msg_found_list
.as_ref()
.unwrap()
.msg_found_list
.as_ref()
.unwrap_or_default()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not correct

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XmchxUp, understood. I'll dismiss that suggestion. If you need any further assistance or refinements, please let me know!

.first()
.unwrap()
.message_ext_inner
.queue_offset;
let vec = pull_result_ext.pull_result.msg_found_list.clone();
let dispatch_to_consume = pull_request.process_queue.put_message(vec).await;
let dispatch_to_consume = pull_request
.process_queue
.put_message(vec.unwrap_or_default())
.await;

Check warning on line 103 in rocketmq-client/src/consumer/pull_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_callback.rs#L100-L103

Added lines #L100 - L103 were not covered by tests
push_consumer_impl
.consume_message_service
.as_mut()
.unwrap()
.submit_consume_request(
pull_result_ext.pull_result.msg_found_list,
pull_result_ext
.pull_result
.msg_found_list
.unwrap_or_default(),

Check warning on line 112 in rocketmq-client/src/consumer/pull_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_callback.rs#L109-L112

Added lines #L109 - L112 were not covered by tests
pull_request.get_process_queue().clone(),
pull_request.get_message_queue().clone(),
dispatch_to_consume,
Expand Down
10 changes: 5 additions & 5 deletions rocketmq-client/src/consumer/pull_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
pub next_begin_offset: u64,
pub min_offset: u64,
pub max_offset: u64,
pub msg_found_list: Vec<ArcMut<MessageClientExt>>,
pub msg_found_list: Option<Vec<ArcMut<MessageClientExt>>>,
}

impl PullResult {
Expand All @@ -33,7 +33,7 @@
next_begin_offset: u64,
min_offset: u64,
max_offset: u64,
msg_found_list: Vec<ArcMut<MessageClientExt>>,
msg_found_list: Option<Vec<ArcMut<MessageClientExt>>>,

Check warning on line 36 in rocketmq-client/src/consumer/pull_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_result.rs#L36

Added line #L36 was not covered by tests
) -> Self {
Self {
pull_status,
Expand All @@ -60,11 +60,11 @@
self.max_offset
}

pub fn msg_found_list(&self) -> &Vec<ArcMut<MessageClientExt>> {
pub fn msg_found_list(&self) -> &Option<Vec<ArcMut<MessageClientExt>>> {

Check warning on line 63 in rocketmq-client/src/consumer/pull_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_result.rs#L63

Added line #L63 was not covered by tests
&self.msg_found_list
}

pub fn set_msg_found_list(&mut self, msg_found_list: Vec<ArcMut<MessageClientExt>>) {
pub fn set_msg_found_list(&mut self, msg_found_list: Option<Vec<ArcMut<MessageClientExt>>>) {

Check warning on line 67 in rocketmq-client/src/consumer/pull_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_result.rs#L67

Added line #L67 was not covered by tests
self.msg_found_list = msg_found_list;
}
}
Expand All @@ -79,7 +79,7 @@
self.next_begin_offset,
self.min_offset,
self.max_offset,
self.msg_found_list.len()
self.msg_found_list.as_ref().map_or(0, |v| v.len()),

Check warning on line 82 in rocketmq-client/src/consumer/pull_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pull_result.rs#L82

Added line #L82 was not covered by tests
)
}
}
2 changes: 1 addition & 1 deletion rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@
next_begin_offset: response_header.next_begin_offset as u64,
min_offset: response_header.min_offset as u64,
max_offset: response_header.max_offset as u64,
msg_found_list: vec![],
msg_found_list: Some(vec![]),

Check warning on line 1006 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1006

Added line #L1006 was not covered by tests
},
suggest_which_broker_id: response_header.suggest_which_broker_id,
message_binary: response.take_body(),
Expand Down
Loading