diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index 6e3e3b3c..3c3e652a 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -434,8 +434,9 @@ where let pop_time = get_current_millis(); let message_filter = message_filter.map(Arc::new); + let mut rest_num = 0; if need_retry && !request_header.order.unwrap_or(false) { - if need_retry_v1 { + rest_num = if need_retry_v1 { let retry_topic = CheetahString::from_string(KeyBuilder::build_pop_retry_topic_v1( &request_header.topic, &request_header.consumer_group, @@ -454,7 +455,7 @@ where &mut msg_offset_info, &mut order_count_info, randomq, - 0, + rest_num, ) .await } else { @@ -476,12 +477,13 @@ where &mut msg_offset_info, &mut order_count_info, randomq, - 0, + rest_num, ) .await }; } - let mut rest_num = if request_header.queue_id < 0 { + rest_num = if request_header.queue_id < 0 { + // read all queue self.pop_msg_from_topic( &topic_config, false, @@ -495,7 +497,7 @@ where &mut msg_offset_info, &mut order_count_info, randomq, - 0, + rest_num, ) .await } else { @@ -512,10 +514,11 @@ where &mut msg_offset_info, &mut order_count_info, randomq, - 0, + rest_num, ) .await }; + // if not full , fetch retry again if !need_retry && get_message_result.message_mapped_list().len() < request_header.max_msg_nums as usize && !request_header.order.unwrap_or(false) @@ -567,10 +570,10 @@ where }; } let mut final_response = RemotingCommand::create_response_command(); - if get_message_result.message_mapped_list().is_empty() { + if !get_message_result.message_mapped_list().is_empty() { get_message_result.set_status(Some(GetMessageStatus::Found)); if rest_num > 0 { - // + // all queue pop can not notify specified queue pop, and vice versa self.pop_long_polling_service.notify_message_arriving( &request_header.topic, request_header.queue_id, @@ -607,10 +610,9 @@ where PollingResult::PollingFull => { final_response.set_code_ref(ResponseCode::PollingFull); } - PollingResult::PollingTimeout => { + _ => { final_response.set_code_ref(ResponseCode::PollingTimeout); } - PollingResult::NotPolling => {} } get_message_result.set_status(Some(GetMessageStatus::NoMessageInQueue)); }