diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index df1f323b..d4b59c4f 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -74,6 +74,7 @@ use crate::processor::client_manage_processor::ClientManageProcessor; use crate::processor::consumer_manage_processor::ConsumerManageProcessor; use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler; use crate::processor::end_transaction_processor::EndTransactionProcessor; +use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter; use crate::processor::pop_message_processor::PopMessageProcessor; use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService; use crate::processor::pull_message_processor::PullMessageProcessor; @@ -144,6 +145,7 @@ pub(crate) struct BrokerRuntime { topic_route_info_manager: Arc, #[cfg(feature = "local_file_store")] escape_bridge: ArcMut>, + pop_inflight_message_counter: Arc, } impl Clone for BrokerRuntime { @@ -185,6 +187,7 @@ impl Clone for BrokerRuntime { transaction_metrics_flush_service: None, topic_route_info_manager: self.topic_route_info_manager.clone(), escape_bridge: self.escape_bridge.clone(), + pop_inflight_message_counter: self.pop_inflight_message_counter.clone(), } } } @@ -255,6 +258,9 @@ impl BrokerRuntime { Arc::new(topic_config_manager.clone()), subscription_group_manager.clone(), )); + let should_start_time = Arc::new(AtomicU64::new(0)); + let pop_inflight_message_counter = + Arc::new(PopInflightMessageCounter::new(should_start_time.clone())); Self { store_host, broker_config: broker_config.clone(), @@ -281,7 +287,7 @@ impl BrokerRuntime { broker_stats_manager, topic_queue_mapping_clean_service: None, update_master_haserver_addr_periodically: false, - should_start_time: Arc::new(AtomicU64::new(0)), + should_start_time, is_isolated: Arc::new(AtomicBool::new(false)), pull_request_hold_service: None, rebalance_lock_manager: Arc::new(Default::default()), @@ -292,6 +298,7 @@ impl BrokerRuntime { transaction_metrics_flush_service: None, topic_route_info_manager, escape_bridge, + pop_inflight_message_counter, } } @@ -550,6 +557,7 @@ impl BrokerRuntime { self.broker_stats_manager.clone(), self.rebalance_lock_manager.clone(), self.broker_member_group.clone(), + self.pop_inflight_message_counter.clone(), ); let pop_message_processor = ArcMut::new(PopMessageProcessor::default()); let ack_message_processor = ArcMut::new(AckMessageProcessor::new( @@ -557,6 +565,7 @@ impl BrokerRuntime { self.message_store.as_ref().unwrap().clone(), self.escape_bridge.clone(), self.broker_config.clone(), + self.pop_inflight_message_counter.clone(), )); BrokerRequestProcessor { send_message_processor: ArcMut::new(send_message_processor), diff --git a/rocketmq-broker/src/processor/ack_message_processor.rs b/rocketmq-broker/src/processor/ack_message_processor.rs index db9fc8cf..6d75037f 100644 --- a/rocketmq-broker/src/processor/ack_message_processor.rs +++ b/rocketmq-broker/src/processor/ack_message_processor.rs @@ -52,6 +52,7 @@ use tracing::error; use crate::broker_error::BrokerError::BrokerCommonError; use crate::broker_error::BrokerError::BrokerRemotingError; use crate::failover::escape_bridge::EscapeBridge; +use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter; use crate::processor::pop_message_processor::PopMessageProcessor; use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService; use crate::topic::manager::topic_config_manager::TopicConfigManager; @@ -62,6 +63,7 @@ pub struct AckMessageProcessor { pop_buffer_merge_service: ArcMut, escape_bridge: ArcMut>, store_host: SocketAddr, + pop_inflight_message_counter: Arc, } impl AckMessageProcessor @@ -73,6 +75,7 @@ where message_store: ArcMut, escape_bridge: ArcMut>, broker_config: Arc, + pop_inflight_message_counter: Arc, ) -> AckMessageProcessor { let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) .parse::() @@ -84,6 +87,7 @@ where pop_buffer_merge_service: ArcMut::new(PopBufferMergeService), escape_bridge, store_host, + pop_inflight_message_counter, } } @@ -353,7 +357,7 @@ where //this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount); //this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic, // ackCount); - ack_msg.set_consumer_group(consume_group); + ack_msg.set_consumer_group(consume_group.clone()); ack_msg.set_topic(topic.clone()); ack_msg.set_queue_id(qid); ack_msg.set_start_offset(start_offset); @@ -367,7 +371,7 @@ where return; } let mut inner = MessageExtBrokerInner::default(); - inner.set_topic(topic); + inner.set_topic(topic.clone()); inner.message_ext_inner.queue_id = qid; if let Some(batch_ack) = ack_msg.as_any().downcast_ref::() { inner.set_body(Bytes::from(batch_ack.encode().unwrap())); @@ -417,6 +421,14 @@ where ); } } + self.pop_inflight_message_counter + .decrement_in_flight_message_num( + &topic, + &consume_group, + pop_time, + qid, + ack_count as i64, + ); } fn ack_orderly( diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index 775add19..637a0ed5 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -75,6 +75,7 @@ impl AdminBrokerProcessor { broker_stats_manager: Arc, rebalance_lock_manager: Arc, broker_member_group: Arc, + pop_inflight_message_counter: Arc, ) -> Self { let inner = Inner { broker_config, @@ -84,7 +85,7 @@ impl AdminBrokerProcessor { consumer_offset_manager, topic_queue_mapping_manager, default_message_store, - pop_inflight_message_counter: Arc::new(PopInflightMessageCounter), + pop_inflight_message_counter, schedule_message_service, broker_stats, consume_manager, diff --git a/rocketmq-broker/src/processor/pop_inflight_message_counter.rs b/rocketmq-broker/src/processor/pop_inflight_message_counter.rs index 36e92e6b..ac81bdeb 100644 --- a/rocketmq-broker/src/processor/pop_inflight_message_counter.rs +++ b/rocketmq-broker/src/processor/pop_inflight_message_counter.rs @@ -14,12 +14,291 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashMap; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; + use cheetah_string::CheetahString; +use parking_lot::Mutex; +use rocketmq_store::pop::pop_check_point::PopCheckPoint; +use tracing::info; + +type PopInflightMessageCounterMap = + Arc>>>>; -pub struct PopInflightMessageCounter; +pub(crate) struct PopInflightMessageCounter { + should_start_time: Arc, + topic_in_flight_message_num: PopInflightMessageCounterMap, +} impl PopInflightMessageCounter { - pub fn clear_in_flight_message_num_by_topic_name(&self, _topic: &CheetahString) { - // TODO + const TOPIC_GROUP_SEPARATOR: &'static str = "@"; + + pub fn new(should_start_time: Arc) -> Self { + PopInflightMessageCounter { + should_start_time, + topic_in_flight_message_num: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn increment_in_flight_message_num( + &self, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + num: i64, + ) { + if num <= 0 { + return; + } + let key = Self::build_key(topic, group); + let mut map = self.topic_in_flight_message_num.lock(); + map.entry(key) + .or_default() + .entry(queue_id) + .or_insert_with(|| Arc::new(AtomicI64::new(0))) + .fetch_add(num, Ordering::SeqCst); + } + + pub fn decrement_in_flight_message_num( + &self, + topic: &CheetahString, + group: &CheetahString, + pop_time: i64, + queue_id: i32, + delta: i64, + ) { + if pop_time < self.should_start_time.load(Ordering::SeqCst) as i64 { + return; + } + self.decrement_in_flight_message_num_internal(topic, group, queue_id, delta); + } + + pub fn decrement_in_flight_message_num_checkpoint(&self, check_point: &PopCheckPoint) { + if check_point.pop_time < self.should_start_time.load(Ordering::SeqCst) as i64 { + return; + } + self.decrement_in_flight_message_num_internal( + &check_point.topic, + &check_point.cid, + check_point.queue_id, + 1, + ); + } + + fn decrement_in_flight_message_num_internal( + &self, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + delta: i64, + ) { + let key = Self::build_key(topic, group); + let mut map = self.topic_in_flight_message_num.lock(); + if let Some(queue_num) = map.get_mut(&key) { + if let Some(counter) = queue_num.get(&queue_id) { + if counter.fetch_add(-delta, Ordering::SeqCst) <= 0 { + queue_num.remove(&queue_id); + } + } + if queue_num.is_empty() { + map.remove(&key); + } + } + } + + pub fn clear_in_flight_message_num_by_group_name(&self, group: &CheetahString) { + let keys: Vec = { + let map = self.topic_in_flight_message_num.lock(); + map.keys().cloned().collect() + }; + for key in keys { + if key.contains(group.as_str()) { + if let Some((topic, group_name)) = Self::split_key(&key) { + if &group_name == group { + let mut map = self.topic_in_flight_message_num.lock(); + map.remove(&key); + info!( + "PopInflightMessageCounter#clearInFlightMessageNumByGroupName: clean \ + by group, topic={}, group={}", + topic, group_name + ); + } + } + } + } + } + + pub fn clear_in_flight_message_num_by_topic_name(&self, topic: &CheetahString) { + let keys: Vec = { + let map = self.topic_in_flight_message_num.lock(); + map.keys().cloned().collect() + }; + for key in keys { + if key.contains(topic.as_str()) { + if let Some((topic_name, group)) = Self::split_key(&key) { + if topic_name.as_str() == topic { + let mut map = self.topic_in_flight_message_num.lock(); + map.remove(&key); + info!( + "PopInflightMessageCounter#clearInFlightMessageNumByTopicName: clean \ + by topic, topic={}, group={}", + topic_name, group + ); + } + } + } + } + } + + pub fn clear_in_flight_message_num( + &self, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + ) { + let key = Self::build_key(topic, group); + let mut map = self.topic_in_flight_message_num.lock(); + if let Some(queue_num) = map.get_mut(&key) { + queue_num.remove(&queue_id); + if queue_num.is_empty() { + map.remove(&key); + } + } + } + + pub fn get_group_pop_in_flight_message_num( + &self, + topic: &CheetahString, + group: &CheetahString, + queue_id: i32, + ) -> i64 { + let key = Self::build_key(topic, group); + let map = self.topic_in_flight_message_num.lock(); + if let Some(queue_counter) = map.get(&key) { + if let Some(counter) = queue_counter.get(&queue_id) { + return counter.load(Ordering::SeqCst).max(0); + } + } + 0 + } + + fn split_key(key: &CheetahString) -> Option<(CheetahString, CheetahString)> { + let parts: Vec<&str> = key.split(Self::TOPIC_GROUP_SEPARATOR).collect(); + if parts.len() == 2 { + Some((parts[0].into(), parts[1].into())) + } else { + None + } + } + + fn build_key(topic: &CheetahString, group: &CheetahString) -> CheetahString { + format!("{}{}{}", topic, Self::TOPIC_GROUP_SEPARATOR, group).into() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicU64; + + use cheetah_string::CheetahString; + + use super::*; + + fn setup_counter() -> PopInflightMessageCounter { + PopInflightMessageCounter::new(Arc::new(AtomicU64::new(0))) + } + + #[test] + fn increment_in_flight_message_num_increments_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 5 + ); + } + + #[test] + fn decrement_in_flight_message_num_decrements_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + counter.decrement_in_flight_message_num(&topic, &group, 0, 1, 3); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 2 + ); + } + + #[test] + fn clear_in_flight_message_num_by_group_name_clears_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + counter.clear_in_flight_message_num_by_group_name(&group); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 0 + ); + } + + #[test] + fn clear_in_flight_message_num_by_topic_name_clears_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + counter.clear_in_flight_message_num_by_topic_name(&topic); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 0 + ); + } + + #[test] + fn clear_in_flight_message_num_clears_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + counter.clear_in_flight_message_num(&topic, &group, 1); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 0 + ); + } + + #[test] + fn decrement_in_flight_message_num_checkpoint_decrements_correctly() { + let counter = setup_counter(); + let topic = CheetahString::from("test_topic"); + let group = CheetahString::from("test_group"); + let checkpoint = PopCheckPoint { + start_offset: 0, + topic: topic.clone(), + cid: group.clone(), + revive_offset: 0, + queue_offset_diff: vec![], + broker_name: None, + queue_id: 1, + pop_time: 0, + invisible_time: 0, + bit_map: 0, + num: 0, + re_put_times: None, + }; + counter.increment_in_flight_message_num(&topic, &group, 1, 5); + counter.decrement_in_flight_message_num_checkpoint(&checkpoint); + assert_eq!( + counter.get_group_pop_in_flight_message_num(&topic, &group, 1), + 4 + ); } }