diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 902802a4..f957a56d 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -569,6 +569,9 @@ impl BrokerRuntime { self.broker_config.clone(), self.pop_inflight_message_counter.clone(), self.store_host, + Arc::new(self.consumer_offset_manager.clone()), + pop_message_processor.clone(), + self.consumer_order_info_manager.clone(), )); BrokerRequestProcessor { send_message_processor: ArcMut::new(send_message_processor), diff --git a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs index bc34b281..3840ab27 100644 --- a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs +++ b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#![allow(unused_variables)] use std::collections::HashMap; use std::collections::HashSet; @@ -241,6 +242,28 @@ impl ConsumerOrderInfoManager { ) { unimplemented!("") } + + pub fn commit_and_next( + &self, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + queue_offset: u64, + pop_time: u64, + ) -> i64 { + unimplemented!() + } + + pub fn check_block( + &self, + attempt_id: &CheetahString, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + invisible_time: u64, + ) -> bool { + unimplemented!() + } } #[inline] diff --git a/rocketmq-broker/src/processor/ack_message_processor.rs b/rocketmq-broker/src/processor/ack_message_processor.rs index 6d3b65fb..0c08ddd3 100644 --- a/rocketmq-broker/src/processor/ack_message_processor.rs +++ b/rocketmq-broker/src/processor/ack_message_processor.rs @@ -16,6 +16,7 @@ */ #![allow(unused_variables)] +use std::cmp::Ordering; use std::net::SocketAddr; use std::sync::Arc; @@ -52,6 +53,8 @@ use tracing::error; use crate::broker_error::BrokerError::BrokerCommonError; use crate::broker_error::BrokerError::BrokerRemotingError; use crate::failover::escape_bridge::EscapeBridge; +use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; +use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager; use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter; use crate::processor::pop_message_processor::PopMessageProcessor; use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService; @@ -64,6 +67,9 @@ pub struct AckMessageProcessor { escape_bridge: ArcMut>, store_host: SocketAddr, pop_inflight_message_counter: Arc, + consumer_offset_manager: Arc, + consumer_order_info_manager: Arc>, + pop_message_processor: ArcMut, } impl AckMessageProcessor @@ -77,6 +83,9 @@ where broker_config: Arc, pop_inflight_message_counter: Arc, store_host: SocketAddr, + consumer_offset_manager: Arc, + pop_message_processor: ArcMut, + consumer_order_info_manager: Arc>, ) -> AckMessageProcessor { AckMessageProcessor { topic_config_manager, @@ -86,6 +95,9 @@ where escape_bridge, store_host, pop_inflight_message_counter, + consumer_offset_manager, + pop_message_processor, + consumer_order_info_manager, } } @@ -260,7 +272,8 @@ where invisible_time, channel, response, - ); + ) + .await; return; } let ack = AckMsg::default(); @@ -324,7 +337,8 @@ where invisible_time, channel, response, - ); + ) + .await; } else { batch_ack_msg.ack_offset_list.push(offset); } @@ -429,7 +443,7 @@ where ); } - fn ack_orderly( + async fn ack_orderly( &mut self, topic: CheetahString, consume_group: CheetahString, @@ -440,6 +454,93 @@ where channel: &Channel, response: &mut RemotingCommand, ) { - unimplemented!("ack_orderly") + let lock_key = CheetahString::from_string(format!( + "{}{}{}{}{}", + &topic, + PopAckConstants::SPLIT, + &consume_group, + PopAckConstants::SPLIT, + q_id + )); + let old_offset = self + .consumer_offset_manager + .query_offset(&consume_group, &topic, q_id); + if old_offset > ack_offset { + return; + } + while !self + .pop_message_processor + .queue_lock_manager() + .try_lock_with_key(lock_key.clone()) + .await + {} + let old_offset = self + .consumer_offset_manager + .query_offset(&consume_group, &topic, q_id); + if old_offset > ack_offset { + return; + } + let next_offset = self.consumer_order_info_manager.commit_and_next( + &consume_group, + &topic, + q_id, + ack_offset as u64, + pop_time as u64, + ); + match next_offset.cmp(&-1) { + Ordering::Less => {} + Ordering::Equal => { + let error_info = format!( + "offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}", + lock_key, + old_offset, + ack_offset, + next_offset, + channel.remote_address() + ); + response.set_code_ref(ResponseCode::MessageIllegal); + response.set_remark_mut(error_info); + self.pop_message_processor + .queue_lock_manager() + .unlock_with_key(lock_key) + .await; + return; + } + Ordering::Greater => { + if !self.consumer_offset_manager.has_offset_reset( + consume_group.as_str(), + topic.as_str(), + q_id, + ) { + self.consumer_offset_manager.commit_offset( + channel.remote_address(), + &consume_group, + &topic, + q_id, + next_offset, + ); + } + + if !self.consumer_order_info_manager.check_block( + &CheetahString::empty(), + &consume_group, + &topic, + q_id, + invisible_time as u64, + ) { + self.pop_message_processor.notify_message_arriving( + &topic, + q_id, + &consume_group, + ); + } + } + } + self.pop_message_processor + .queue_lock_manager() + .unlock_with_key(lock_key) + .await; + self.pop_inflight_message_counter + .decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1); } } diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index 109b2ce2..b1c5c9f1 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -50,6 +50,19 @@ impl PopMessageProcessor { pub fn queue_lock_manager(&self) -> &QueueLockManager { unimplemented!("PopMessageProcessor QueueLockManager") } + + pub fn notify_message_arriving( + &self, + topic: &CheetahString, + queue_id: i32, + cid: &CheetahString, + ) { + info!( + "notifyMessageArriving topic={} queueId={} cid={}", + topic, queue_id, cid + ); + unimplemented!("PopMessageProcessor notify_message_arriving") + } } impl PopMessageProcessor {