Skip to content

Commit

Permalink
[ISSUE #2056]💫Implement PopBufferMergeService#putBatchAckToStore for …
Browse files Browse the repository at this point in the history
…rust (#2057)
  • Loading branch information
mxsm authored Jan 3, 2025
1 parent 0545a8b commit b264077
Showing 1 changed file with 53 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
}
}
if !self.batch_ack_index_list.is_empty()
&& self.put_batch_ack_to_store(point_wrapper, &self.batch_ack_index_list)
&& self
.put_batch_ack_to_store(point_wrapper, &self.batch_ack_index_list)
.await
{
count += self.batch_ack_index_list.len();
for index in &self.batch_ack_index_list {
Expand Down Expand Up @@ -486,15 +488,61 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
}
}

fn put_batch_ack_to_store(
async fn put_batch_ack_to_store(
&self,
point_wrapper: &PopCheckPointWrapper,
index_list: &[u8],
) -> bool {
// Implement the logic to put batch ack to store
unimplemented!()
}
let point = point_wrapper.get_ck();
let ack_msg = AckMsg {
start_offset: point.start_offset,
consumer_group: point.cid.clone(),
topic: point.topic.clone(),
queue_id: point.queue_id,
pop_time: point.pop_time,
..Default::default()
};
let mut batch_ack_msg = BatchAckMsg {
ack_offset_list: Vec::with_capacity(index_list.len()),
ack_msg,
};
for index in index_list {
batch_ack_msg
.ack_offset_list
.push(point.ack_offset_by_index(*index));
}
let mut msg = MessageExtBrokerInner::default();
msg.set_topic(self.revive_topic.clone());
msg.set_body(Bytes::from(batch_ack_msg.to_json().unwrap()));
msg.message_ext_inner.queue_id = point_wrapper.revive_queue_id;
msg.set_tags(CheetahString::from_static_str(
PopAckConstants::BATCH_ACK_TAG,
));
msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
msg.message_ext_inner.born_host = self.store_host;
msg.message_ext_inner.store_host = self.store_host;
msg.set_delay_time_ms(point.get_revive_time() as u64);
msg.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
CheetahString::from_string(PopMessageProcessor::<MS>::gen_batch_ack_unique_id(
&batch_ack_msg,
)),
);
msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties());

let put_message_result = self
.escape_bridge
.mut_from_ref()
.put_message_to_specific_queue(msg)
.await;
matches!(
put_message_result.put_message_status(),
PutMessageStatus::PutOk
| PutMessageStatus::FlushDiskTimeout
| PutMessageStatus::FlushSlaveTimeout
| PutMessageStatus::SlaveNotAvailable
)
}
async fn put_ack_to_store(&self, point_wrapper: &PopCheckPointWrapper, index: u8) -> bool {
let point = point_wrapper.get_ck();
let ack_msg = AckMsg {
Expand Down

0 comments on commit b264077

Please sign in to comment.