Skip to content

Commit

Permalink
[ISSUE #1996]🍻Implement PopMessageProcessor#reset_pop_offset method l…
Browse files Browse the repository at this point in the history
…ogic🚀
  • Loading branch information
mxsm committed Jan 1, 2025
1 parent 59fd6b7 commit c618f82
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl<MS> ConfigManager for ConsumerOrderInfoManager<MS> {
}

impl<MS> ConsumerOrderInfoManager<MS> {
pub fn clear_block(&self, topic: &CheetahString, group: &CheetahString, queue_id: i32) {
unimplemented!()

Check warning on line 110 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L109-L110

Added lines #L109 - L110 were not covered by tests
}

pub fn auto_clean(&self) {
let mut consumer_order_info_wrapper = self.consumer_order_info_wrapper.lock();
let table = &mut consumer_order_info_wrapper.table;
Expand Down
35 changes: 32 additions & 3 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,13 +1028,42 @@ where
topic,
queue_id,
offset,
)
);

Check warning on line 1031 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1031

Added line #L1031 was not covered by tests
}
offset
}

fn reset_pop_offset(&self, topic: &str, group: &str, queue_id: i32) -> Option<i64> {
unimplemented!("PopMessageProcessor reset_pop_offset")
fn reset_pop_offset(
&self,
topic: &CheetahString,
group: &CheetahString,
queue_id: i32,
) -> Option<i64> {
let lock_key = format!(
"{}{}{}{}{}",
topic,
PopAckConstants::SPLIT,
group,
PopAckConstants::SPLIT,
queue_id
);
let reset_offset = self
.consumer_offset_manager
.query_then_erase_reset_offset(group, topic, queue_id);
if reset_offset.is_some() {
self.consumer_order_info_manager
.clear_block(topic, group, queue_id);
self.pop_buffer_merge_service
.clear_offset_queue(lock_key.as_ref());
self.consumer_offset_manager.commit_offset(
"ResetPopOffset".into(),
group,
topic,
queue_id,
*reset_offset.as_ref().unwrap(),
)
}
reset_offset

Check warning on line 1066 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1036-L1066

Added lines #L1036 - L1066 were not covered by tests
}

pub fn queue_lock_manager(&self) -> &QueueLockManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ impl PopBufferMergeService {
pub fn get_latest_offset(&self, _lock_key: &str) -> i64 {
unimplemented!("Not implemented yet");
}

pub fn clear_offset_queue(&self, _lock_key: &str) {
unimplemented!("Not implemented yet");

Check warning on line 32 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L31-L32

Added lines #L31 - L32 were not covered by tests
}
}

0 comments on commit c618f82

Please sign in to comment.