Skip to content

Commit

Permalink
[ISSUE #2052]💫Implement PopBufferMergeService#putAckToStore🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 3, 2025
1 parent b7ac028 commit 7ef1d97
Showing 1 changed file with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use cheetah_string::CheetahString;
use dashmap::DashMap;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::broker::broker_role::BrokerRole;
use rocketmq_common::common::key_builder::KeyBuilder;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
use rocketmq_common::utils::data_converter::DataConverter;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::log_file::MessageStore;
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use rocketmq_store::pop::AckMessage;
Expand Down Expand Up @@ -321,6 +328,20 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
}

self.batch_ack_index_list.clear();
} else {
for i in 0..point.num {
if DataConverter::get_bit(
point_wrapper.get_bits().load(Ordering::Relaxed),
i as usize,
) && !DataConverter::get_bit(
point_wrapper.get_to_store_bits().load(Ordering::Relaxed),
i as usize,
) && self.put_ack_to_store(point_wrapper, i).await
{
count += 1;
Self::mark_bit_cas(point_wrapper.get_to_store_bits(), i as usize);
}

Check warning on line 343 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L332-L343

Added lines #L332 - L343 were not covered by tests
}
}
} else if point_wrapper.get_revive_queue_offset() < 0 {
self.put_ck_to_store(point_wrapper, false).await;
Expand Down Expand Up @@ -474,9 +495,44 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
unimplemented!()
}

fn put_ack_to_store(&self, point_wrapper: &PopCheckPointWrapper, index: u8) -> bool {
// Implement the logic to put ack to store
unimplemented!()
async fn put_ack_to_store(&self, point_wrapper: &PopCheckPointWrapper, index: u8) -> bool {
let point = point_wrapper.get_ck();
let ack_msg = AckMsg {
ack_offset: point.ack_offset_by_index(index),
start_offset: point.start_offset,
consumer_group: point.cid.clone(),
topic: point.topic.clone(),
queue_id: point.queue_id,
pop_time: point.pop_time,
..Default::default()
};
let mut msg = MessageExtBrokerInner::default();
msg.set_topic(self.revive_topic.clone());
msg.set_body(Bytes::from(ack_msg.to_json().unwrap()));
msg.message_ext_inner.queue_id = point_wrapper.revive_queue_id;
msg.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
msg.message_ext_inner.born_host = self.store_host;
msg.message_ext_inner.store_host = self.store_host;
msg.set_delay_time_ms(point.get_revive_time() as u64);
msg.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
CheetahString::from_string(PopMessageProcessor::<MS>::gen_ack_unique_id(&ack_msg)),
);
msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties());

Check warning on line 522 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L498-L522

Added lines #L498 - L522 were not covered by tests

let put_message_result = self
.escape_bridge
.mut_from_ref()
.put_message_to_specific_queue(msg)
.await;
matches!(
put_message_result.put_message_status(),

Check warning on line 530 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L524-L530

Added lines #L524 - L530 were not covered by tests
PutMessageStatus::PutOk
| PutMessageStatus::FlushDiskTimeout
| PutMessageStatus::FlushSlaveTimeout
| PutMessageStatus::SlaveNotAvailable
)
}
}

Expand Down

0 comments on commit 7ef1d97

Please sign in to comment.