Skip to content

Commit

Permalink
[ISSUE #1862]🍻Implment PopInflightMessageCounter java for rust struct💡 (
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 19, 2024
1 parent d18deb1 commit 4f6ec9c
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 7 deletions.
11 changes: 10 additions & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use crate::processor::client_manage_processor::ClientManageProcessor;
use crate::processor::consumer_manage_processor::ConsumerManageProcessor;
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
use crate::processor::end_transaction_processor::EndTransactionProcessor;
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
use crate::processor::pull_message_processor::PullMessageProcessor;
Expand Down Expand Up @@ -144,6 +145,7 @@ pub(crate) struct BrokerRuntime {
topic_route_info_manager: Arc<TopicRouteInfoManager>,
#[cfg(feature = "local_file_store")]
escape_bridge: ArcMut<EscapeBridge<DefaultMessageStore>>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
}

impl Clone for BrokerRuntime {
Expand Down Expand Up @@ -185,6 +187,7 @@ impl Clone for BrokerRuntime {
transaction_metrics_flush_service: None,
topic_route_info_manager: self.topic_route_info_manager.clone(),
escape_bridge: self.escape_bridge.clone(),
pop_inflight_message_counter: self.pop_inflight_message_counter.clone(),
}
}
}
Expand Down Expand Up @@ -255,6 +258,9 @@ impl BrokerRuntime {
Arc::new(topic_config_manager.clone()),
subscription_group_manager.clone(),
));
let should_start_time = Arc::new(AtomicU64::new(0));
let pop_inflight_message_counter =
Arc::new(PopInflightMessageCounter::new(should_start_time.clone()));
Self {
store_host,
broker_config: broker_config.clone(),
Expand All @@ -281,7 +287,7 @@ impl BrokerRuntime {
broker_stats_manager,
topic_queue_mapping_clean_service: None,
update_master_haserver_addr_periodically: false,
should_start_time: Arc::new(AtomicU64::new(0)),
should_start_time,
is_isolated: Arc::new(AtomicBool::new(false)),
pull_request_hold_service: None,
rebalance_lock_manager: Arc::new(Default::default()),
Expand All @@ -292,6 +298,7 @@ impl BrokerRuntime {
transaction_metrics_flush_service: None,
topic_route_info_manager,
escape_bridge,
pop_inflight_message_counter,
}
}

Expand Down Expand Up @@ -550,13 +557,15 @@ impl BrokerRuntime {
self.broker_stats_manager.clone(),
self.rebalance_lock_manager.clone(),
self.broker_member_group.clone(),
self.pop_inflight_message_counter.clone(),
);
let pop_message_processor = ArcMut::new(PopMessageProcessor::default());
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
self.topic_config_manager.clone(),
self.message_store.as_ref().unwrap().clone(),
self.escape_bridge.clone(),
self.broker_config.clone(),
self.pop_inflight_message_counter.clone(),
));
BrokerRequestProcessor {
send_message_processor: ArcMut::new(send_message_processor),
Expand Down
16 changes: 14 additions & 2 deletions rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use tracing::error;
use crate::broker_error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::failover::escape_bridge::EscapeBridge;
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
use crate::topic::manager::topic_config_manager::TopicConfigManager;
Expand All @@ -62,6 +63,7 @@ pub struct AckMessageProcessor<MS> {
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
escape_bridge: ArcMut<EscapeBridge<MS>>,
store_host: SocketAddr,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
}

impl<MS> AckMessageProcessor<MS>
Expand All @@ -73,6 +75,7 @@ where
message_store: ArcMut<MS>,
escape_bridge: ArcMut<EscapeBridge<MS>>,
broker_config: Arc<BrokerConfig>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
) -> AckMessageProcessor<MS> {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
Expand All @@ -84,6 +87,7 @@ where
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService),
escape_bridge,
store_host,
pop_inflight_message_counter,
}
}

Expand Down Expand Up @@ -353,7 +357,7 @@ where
//this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
//this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic,
// ackCount);
ack_msg.set_consumer_group(consume_group);
ack_msg.set_consumer_group(consume_group.clone());
ack_msg.set_topic(topic.clone());
ack_msg.set_queue_id(qid);
ack_msg.set_start_offset(start_offset);
Expand All @@ -367,7 +371,7 @@ where
return;
}
let mut inner = MessageExtBrokerInner::default();
inner.set_topic(topic);
inner.set_topic(topic.clone());
inner.message_ext_inner.queue_id = qid;
if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
inner.set_body(Bytes::from(batch_ack.encode().unwrap()));
Expand Down Expand Up @@ -417,6 +421,14 @@ where
);
}
}
self.pop_inflight_message_counter
.decrement_in_flight_message_num(
&topic,
&consume_group,
pop_time,
qid,
ack_count as i64,
);
}

fn ack_orderly(
Expand Down
3 changes: 2 additions & 1 deletion rocketmq-broker/src/processor/admin_broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl AdminBrokerProcessor {
broker_stats_manager: Arc<BrokerStatsManager>,
rebalance_lock_manager: Arc<RebalanceLockManager>,
broker_member_group: Arc<BrokerMemberGroup>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
) -> Self {
let inner = Inner {
broker_config,
Expand All @@ -84,7 +85,7 @@ impl AdminBrokerProcessor {
consumer_offset_manager,
topic_queue_mapping_manager,
default_message_store,
pop_inflight_message_counter: Arc::new(PopInflightMessageCounter),
pop_inflight_message_counter,
schedule_message_service,
broker_stats,
consume_manager,
Expand Down
Loading

0 comments on commit 4f6ec9c

Please sign in to comment.