Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1949]♻️PopMessageProcessor supports process_request handle-5🚀 #1981

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@
let pop_time = get_current_millis();

let message_filter = message_filter.map(Arc::new);
let mut rest_num = 0;

Check warning on line 437 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L437

Added line #L437 was not covered by tests
if need_retry && !request_header.order.unwrap_or(false) {
if need_retry_v1 {
rest_num = if need_retry_v1 {

Check warning on line 439 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L439

Added line #L439 was not covered by tests
let retry_topic = CheetahString::from_string(KeyBuilder::build_pop_retry_topic_v1(
&request_header.topic,
&request_header.consumer_group,
Expand All @@ -454,7 +455,7 @@
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,

Check warning on line 458 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L458

Added line #L458 was not covered by tests
)
.await
} else {
Expand All @@ -476,12 +477,13 @@
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,

Check warning on line 480 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L480

Added line #L480 was not covered by tests
)
.await
};
}
let mut rest_num = if request_header.queue_id < 0 {
rest_num = if request_header.queue_id < 0 {

Check warning on line 485 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L485

Added line #L485 was not covered by tests
// read all queue
self.pop_msg_from_topic(
&topic_config,
false,
Expand All @@ -495,7 +497,7 @@
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,

Check warning on line 500 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L500

Added line #L500 was not covered by tests
)
.await
} else {
Expand All @@ -512,10 +514,11 @@
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,

Check warning on line 517 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L517

Added line #L517 was not covered by tests
)
.await
};
// if not full , fetch retry again
if !need_retry
&& get_message_result.message_mapped_list().len() < request_header.max_msg_nums as usize
&& !request_header.order.unwrap_or(false)
Expand Down Expand Up @@ -567,10 +570,10 @@
};
}
let mut final_response = RemotingCommand::create_response_command();
if get_message_result.message_mapped_list().is_empty() {
if !get_message_result.message_mapped_list().is_empty() {

Check warning on line 573 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L573

Added line #L573 was not covered by tests
get_message_result.set_status(Some(GetMessageStatus::Found));
if rest_num > 0 {
//
// all queue pop can not notify specified queue pop, and vice versa

Check warning on line 576 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L576

Added line #L576 was not covered by tests
self.pop_long_polling_service.notify_message_arriving(
&request_header.topic,
request_header.queue_id,
Expand Down Expand Up @@ -607,10 +610,9 @@
PollingResult::PollingFull => {
final_response.set_code_ref(ResponseCode::PollingFull);
}
PollingResult::PollingTimeout => {
_ => {

Check warning on line 613 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L613

Added line #L613 was not covered by tests
final_response.set_code_ref(ResponseCode::PollingTimeout);
}
PollingResult::NotPolling => {}
}
get_message_result.set_status(Some(GetMessageStatus::NoMessageInQueue));
}
Expand Down
Loading