Skip to content

Commit

Permalink
[ISSUE #2025]⚗️Implement PopBufferMergeService#add_ack🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 2, 2025
1 parent 2d08ec9 commit 737b70a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
3 changes: 2 additions & 1 deletion rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<MS> PopMessageProcessor<MS> {
consumer_offset_manager,
consumer_manager,
consumer_order_info_manager,
broker_config,
broker_config: broker_config.clone(),

Check warning on line 123 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L123

Added line #L123 was not covered by tests
message_store,
message_store_config,
topic_config_manager,
Expand All @@ -131,6 +131,7 @@ impl<MS> PopMessageProcessor<MS> {
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService::new(
revive_topic.clone(),
queue_lock_manager.clone(),
broker_config,

Check warning on line 134 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L134

Added line #L134 was not covered by tests
)),
pop_inflight_message_counter,
queue_lock_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ use std::sync::Arc;

use cheetah_string::CheetahString;
use dashmap::DashMap;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
use rocketmq_common::utils::data_converter::DataConverter;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use rocketmq_store::pop::AckMessage;
use tracing::error;
use tracing::warn;

use crate::processor::pop_message_processor::QueueLockManager;

Expand All @@ -45,10 +50,15 @@ pub(crate) struct PopBufferMergeService {
count_of_second30: u64,
batch_ack_index_list: Vec<u8>,
master: AtomicBool,
broker_config: Arc<BrokerConfig>,
}

impl PopBufferMergeService {
pub fn new(revive_topic: CheetahString, queue_lock_manager: QueueLockManager) -> Self {
pub fn new(
revive_topic: CheetahString,
queue_lock_manager: QueueLockManager,
broker_config: Arc<BrokerConfig>,
) -> Self {

Check warning on line 61 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L57-L61

Added lines #L57 - L61 were not covered by tests
let interval = 5;
Self {
buffer: DashMap::with_capacity(1024 * 16),
Expand All @@ -65,13 +75,96 @@ impl PopBufferMergeService {
count_of_second30: 30 * 1000 / interval,
batch_ack_index_list: Vec::with_capacity(32),
master: AtomicBool::new(false),
broker_config,

Check warning on line 78 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L78

Added line #L78 was not covered by tests
}
}
}

impl PopBufferMergeService {
pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &dyn AckMessage) -> bool {
unimplemented!("Not implemented yet");
pub fn add_ack(&mut self, revive_qid: i32, ack_msg: &dyn AckMessage) -> bool {
if !self.broker_config.enable_pop_buffer_merge {
return false;
}
if !self.serving.load(Ordering::Acquire) {
return false;
}
let point_wrapper = match self.buffer.get(&CheetahString::from_string(format!(
"{}{}{}{}{}{}",
ack_msg.topic(),
ack_msg.consumer_group(),
ack_msg.queue_id(),
ack_msg.start_offset(),
ack_msg.pop_time(),
ack_msg.broker_name()
))) {
Some(wrapper) => wrapper,

Check warning on line 100 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L84-L100

Added lines #L84 - L100 were not covered by tests
None => {
if self.broker_config.enable_pop_log {
warn!(
"[PopBuffer]add ack fail, rqId={}, no ck, {}",

Check warning on line 104 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L102-L104

Added lines #L102 - L104 were not covered by tests
revive_qid, ack_msg
);
}
return false;

Check warning on line 108 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L107-L108

Added lines #L107 - L108 were not covered by tests
}
};
if point_wrapper.is_just_offset() {
return false;
}
let point = point_wrapper.get_ck();
let now = get_current_millis();
if (point.get_revive_time() as u64 - now)
< self.broker_config.pop_ck_stay_buffer_time_out + 1500

Check warning on line 117 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L111-L117

Added lines #L111 - L117 were not covered by tests
{
if self.broker_config.enable_pop_log {
warn!(
"[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}",
revive_qid,
point_wrapper.value(),

Check warning on line 123 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L119-L123

Added lines #L119 - L123 were not covered by tests
ack_msg,
now
);
}
return false;
}
if now - point.pop_time as u64 > self.broker_config.pop_ck_stay_buffer_time_out - 1500 {
if self.broker_config.enable_pop_log {
warn!(
"[PopBuffer]add ack fail, rqId={}, timeout for revive, {}, {}, {}",
revive_qid,
point_wrapper.value(),

Check warning on line 135 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L127-L135

Added lines #L127 - L135 were not covered by tests
ack_msg,
now
);
}
return false;
}

Check warning on line 141 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L139-L141

Added lines #L139 - L141 were not covered by tests

if let Some(batch_ack_msg) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
for ack_offset in &batch_ack_msg.ack_offset_list {
let index_of_ack = point.index_of_ack(*ack_offset);
if index_of_ack > -1 {
Self::mark_bit_cas(point_wrapper.get_bits(), index_of_ack as usize);
} else {
error!(
"[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}",

Check warning on line 150 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L143-L150

Added lines #L143 - L150 were not covered by tests
revive_qid, ack_msg, point
);
}
}
} else {
let index_of_ack = point.index_of_ack(ack_msg.ack_offset());
if index_of_ack > -1 {
Self::mark_bit_cas(point_wrapper.get_bits(), index_of_ack as usize);
} else {
error!(
"[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}",

Check warning on line 161 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L156-L161

Added lines #L156 - L161 were not covered by tests
revive_qid, ack_msg, point
);
return true;

Check warning on line 164 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L164

Added line #L164 was not covered by tests
}
}
true

Check warning on line 167 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L167

Added line #L167 was not covered by tests
}

pub fn get_latest_offset(&self, _lock_key: &str) -> i64 {
Expand All @@ -81,6 +174,23 @@ impl PopBufferMergeService {
pub fn clear_offset_queue(&self, _lock_key: &str) {
unimplemented!("Not implemented yet");
}

fn mark_bit_cas(set_bits: &AtomicI32, index: usize) {

Check warning on line 178 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L178

Added line #L178 was not covered by tests
loop {
let bits = set_bits.load(Ordering::Relaxed);
if DataConverter::get_bit(bits, index) {
break;
}
let new_bits = DataConverter::set_bit(bits, index, true);
if let Ok(value) =
set_bits.compare_exchange(bits, new_bits, Ordering::Acquire, Ordering::Relaxed)

Check warning on line 186 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L180-L186

Added lines #L180 - L186 were not covered by tests
{
if value == bits {
break;
}
}

Check warning on line 191 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L188-L191

Added lines #L188 - L191 were not covered by tests
}
}

Check warning on line 193 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L193

Added line #L193 was not covered by tests
}

pub struct QueueWithTime<T> {
Expand Down Expand Up @@ -128,7 +238,6 @@ pub struct PopCheckPointWrapper {
just_offset: bool,
ck_stored: AtomicBool,
}

impl PopCheckPointWrapper {
pub fn new(
revive_queue_id: i32,
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ pub struct BrokerConfig {
pub pop_from_retry_probability: i32,
pub pop_response_return_actual_retry_topic: bool,
pub init_pop_offset_by_check_msg_in_mem: bool,
pub enable_pop_buffer_merge: bool,
pub pop_ck_stay_buffer_time_out: u64,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -280,6 +282,8 @@ impl Default for BrokerConfig {
pop_from_retry_probability: 20,
pop_response_return_actual_retry_topic: false,
init_pop_offset_by_check_msg_in_mem: true,
enable_pop_buffer_merge: false,
pop_ck_stay_buffer_time_out: 3_000,
}
}
}
Expand Down

0 comments on commit 737b70a

Please sign in to comment.