Skip to content

Commit

Permalink
[ISSUE #2058]💫Implement PopBufferMergeService scan_garbage and get_of…
Browse files Browse the repository at this point in the history
…fset_total_size📸 (#2059)
  • Loading branch information
mxsm authored Jan 3, 2025
1 parent b264077 commit 3dda72b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
6 changes: 4 additions & 2 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ impl<MS> PopMessageProcessor<MS> {
broker_config: broker_config.clone(),
message_store,
message_store_config,
topic_config_manager,
subscription_group_manager,
topic_config_manager: topic_config_manager.clone(),
subscription_group_manager: subscription_group_manager.clone(),
consumer_filter_manager,
ck_message_number: Default::default(),
pop_long_polling_service: ArcMut::new(PopLongPollingService),
Expand All @@ -143,6 +143,8 @@ impl<MS> PopMessageProcessor<MS> {
broker_config,
store_host,
escape_bridge,
topic_config_manager,
subscription_group_manager,
)),
pop_inflight_message_counter,
queue_lock_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use tracing::warn;
use crate::failover::escape_bridge::EscapeBridge;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::pop_message_processor::QueueLockManager;
use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager;
use crate::topic::manager::topic_config_manager::TopicConfigManager;

pub(crate) struct PopBufferMergeService<MS> {
buffer: DashMap<CheetahString /* mergeKey */, PopCheckPointWrapper>,
Expand All @@ -75,6 +77,8 @@ pub(crate) struct PopBufferMergeService<MS> {
shutdown: Arc<Notify>,
store_host: SocketAddr,
escape_bridge: ArcMut<EscapeBridge<MS>>,
topic_config_manager: Arc<TopicConfigManager>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
}

impl<MS> PopBufferMergeService<MS> {
Expand All @@ -84,6 +88,8 @@ impl<MS> PopBufferMergeService<MS> {
broker_config: Arc<BrokerConfig>,
store_host: SocketAddr,
escape_bridge: ArcMut<EscapeBridge<MS>>,
topic_config_manager: Arc<TopicConfigManager>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
) -> Self {
let interval = 5;
Self {
Expand All @@ -105,6 +111,8 @@ impl<MS> PopBufferMergeService<MS> {
shutdown: Arc::new(Notify::new()),
store_host,
escape_bridge,
topic_config_manager,
subscription_group_manager,
}
}
}
Expand Down Expand Up @@ -386,12 +394,42 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
self.scan_times = 0;
}
}
fn scan_garbage(&self) {
unimplemented!("scan_garbage")
fn scan_garbage(&mut self) {
self.commit_offsets.retain(|key, value| {
let key_array: Vec<&str> = key.split(PopAckConstants::SPLIT).collect();
if key_array.is_empty() || key_array.len() != 3 {
return true;
}
let topic = key_array[0];
let cid = key_array[1];
if self
.topic_config_manager
.select_topic_config(&topic.into())
.is_none()
{
return false;
}

if self
.subscription_group_manager
.contains_subscription_group(&cid.into())
{
return false;
}
if get_current_millis() - value.get_time() > self.minute5 {
return false;
}
true
});
}

pub fn get_offset_total_size(&self) -> usize {
unimplemented!("getOffsetTotalSize")
let mut count = 0;
for entry in self.commit_offsets.iter() {
let queue = entry.value();
count += queue.get_queue().len();
}
count
}

pub fn start(this: ArcMut<Self>) {
Expand All @@ -413,7 +451,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
}
this.mut_from_ref().scan().await;
if this.scan_times % this.count_of_second30 == 0 {
this.scan_garbage();
this.mut_from_ref().scan_garbage();
}
tokio::time::sleep(tokio::time::Duration::from_millis(this.interval)).await;
if !this.serving.load(Ordering::Acquire) && this.buffer.is_empty() && this.get_offset_total_size() == 0 {
Expand Down

0 comments on commit 3dda72b

Please sign in to comment.