Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #512]🔥Implementing Functionality do append batch messages🚀 #513

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions rocketmq-common/src/utils/message_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,57 @@ pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
bytes_to_string(msg_id_vec.as_slice())
}

pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
match socket_addr {
SocketAddr::V4(addr) => {
let mut msg_id_vec = Vec::<u8>::with_capacity(8);
msg_id_vec.extend_from_slice(&addr.ip().octets());
msg_id_vec.put_i32(addr.port() as i32);
msg_id_vec
}
SocketAddr::V6(addr) => {
let mut msg_id_vec = Vec::<u8>::with_capacity(20);
msg_id_vec.extend_from_slice(&addr.ip().octets());
msg_id_vec.put_i32(addr.port() as i32);
msg_id_vec
}
}
}

pub fn build_batch_message_id(
socket_addr: SocketAddr,
store_host_length: i32,
batch_size: usize,
phy_pos: &[i64],
) -> String {
if batch_size == 0 {
return String::new();
}
let msg_id_len = (store_host_length + 8) as usize;
let mut msg_id_vec = vec![0u8; msg_id_len];
msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
for (index, value) in phy_pos.iter().enumerate() {
msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
if index != 0 {
message_id.push(',');
}
message_id.push_str(&bytes_to_string(&msg_id_vec));
}
message_id
}

pub fn parse_message_id(_msg_id: impl Into<String>) -> (SocketAddr, i64) {
unimplemented!()
}

#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;

use bytes::Bytes;
use bytes::BytesMut;

use super::*;
use crate::common::message::message_single::MessageExt;

Expand Down Expand Up @@ -154,4 +199,43 @@ mod tests {
let result = build_message_id(socket_addr, wrote_offset);
assert_eq!(result, "7F0000010000000C0000000000000001");
}

#[test]
fn build_batch_message_id_creates_correct_id_for_single_position() {
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
let store_host_length = 8;
let batch_size = 1;
let phy_pos = vec![12345];

let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

assert_eq!(result, "7F00000100001F900000000000003039");
}

#[test]
fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
let store_host_length = 8;
let batch_size = 2;
let phy_pos = vec![12345, 67890];

let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

assert_eq!(
result,
"7F00000100001F900000000000003039,7F00000100001F900000000000010932"
);
}

#[test]
fn build_batch_message_id_creates_empty_id_for_no_positions() {
let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
let store_host_length = 8;
let batch_size = 0;
let phy_pos = vec![];

let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

assert_eq!(result, "");
}
}
95 changes: 91 additions & 4 deletions rocketmq-store/src/base/append_message_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::message::message_single::MessageExtBrokerInner;
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
use rocketmq_common::utils::message_utils;
use rocketmq_common::MessageUtils::build_batch_message_id;
use rocketmq_common::TimeUtils::get_current_millis;

use crate::base::message_result::AppendMessageResult;
use crate::base::message_status_enum::AppendMessageStatus;
Expand Down Expand Up @@ -65,7 +67,8 @@ pub trait AppendMessageCallback {
mapped_file: &MF,
max_blank: i32,
msg: &mut MessageExtBatch,
put_message_context: &PutMessageContext,
put_message_context: &mut PutMessageContext,
enabled_append_prop_crc: bool,
) -> AppendMessageResult;
}

Expand Down Expand Up @@ -179,9 +182,93 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
file_from_offset: i64,
mapped_file: &MF,
max_blank: i32,
msg: &mut MessageExtBatch,
put_message_context: &PutMessageContext,
msg_batch: &mut MessageExtBatch,
put_message_context: &mut PutMessageContext,
enabled_append_prop_crc: bool,
) -> AppendMessageResult {
todo!()
let wrote_offset = file_from_offset + mapped_file.get_wrote_position() as i64;
let queue_offset = msg_batch.message_ext_broker_inner.queue_offset();
let begin_queue_offset = queue_offset;

let begin_time_mills = get_current_millis();

// Assuming get_encoded_buff returns Option<ByteBuffer>
let mut pre_encode_buffer = msg_batch.encoded_buff.take().unwrap();
let sys_flag = msg_batch.message_ext_broker_inner.sys_flag();
let born_host_length = if sys_flag & MessageSysFlag::BORNHOST_V6_FLAG == 0 {
4 + 4
} else {
16 + 4
};
let store_host_length = if sys_flag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG == 0 {
4 + 4
} else {
16 + 4
};
let mut total_msg_len = 0;
let mut msg_num = 0;
let mut msg_pos = 0;
let mut index = 0;
while total_msg_len < pre_encode_buffer.len() as i32 {
let msg_len = i32::from_be_bytes(
pre_encode_buffer[total_msg_len as usize..(total_msg_len + 4) as usize]
.try_into()
.unwrap(),
);
total_msg_len += msg_len;
if total_msg_len + END_FILE_MIN_BLANK_LENGTH > max_blank {
let mut bytes = BytesMut::with_capacity(END_FILE_MIN_BLANK_LENGTH as usize);
bytes.put_i32(max_blank);
bytes.put_i32(BLANK_MAGIC_CODE);
mapped_file.append_message_bytes(&bytes.freeze());
return AppendMessageResult {
status: AppendMessageStatus::EndOfFile,
wrote_offset,
wrote_bytes: max_blank,
msg_id: "".to_string(),
store_timestamp: msg_batch.message_ext_broker_inner.store_timestamp(),
logics_offset: begin_queue_offset,
..Default::default()
};
}
let mut pos = msg_pos + 20;
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_be_bytes());
pos += 8;
let phy_pos = wrote_offset + total_msg_len as i64 - msg_len as i64;
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&phy_pos.to_be_bytes());
pos += 8 + 4 + 8 + born_host_length;
pre_encode_buffer[pos..(pos + 8)].copy_from_slice(
&msg_batch
.message_ext_broker_inner
.store_timestamp()
.to_be_bytes(),
);
if enabled_append_prop_crc {
let _check_size = msg_len - self.crc32_reserved_length;
}
put_message_context.get_phy_pos_mut()[index] = phy_pos;
msg_num += 1;
msg_pos += msg_len as usize;
index += 1;
}

let bytes = pre_encode_buffer.freeze();
mapped_file.append_message_bytes(&bytes);
let msg_id = build_batch_message_id(
msg_batch.message_ext_broker_inner.store_host(),
store_host_length,
put_message_context.get_batch_size() as usize,
put_message_context.get_phy_pos(),
);
AppendMessageResult {
status: AppendMessageStatus::PutOk,
wrote_offset,
wrote_bytes: total_msg_len,
msg_id,
store_timestamp: msg_batch.message_ext_broker_inner.store_timestamp(),
logics_offset: begin_queue_offset,
msg_num,
..Default::default()
}
}
}
4 changes: 4 additions & 0 deletions rocketmq-store/src/base/put_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl PutMessageContext {
self.phy_pos = phy_pos;
}

pub fn get_phy_pos_mut(&mut self) -> &mut [i64] {
&mut self.phy_pos
}

pub fn get_batch_size(&self) -> i32 {
self.batch_size
}
Expand Down
8 changes: 5 additions & 3 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl CommitLog {
.store_timestamp = get_current_millis() as i64;
let tran_type =
MessageSysFlag::get_transaction_value(msg_batch.message_ext_broker_inner.sys_flag());
if MessageSysFlag::TRANSACTION_NOT_TYPE == tran_type {
if MessageSysFlag::TRANSACTION_NOT_TYPE != tran_type {
return PutMessageResult::new_default(PutMessageStatus::MessageIllegal);
}
if msg_batch
Expand Down Expand Up @@ -374,6 +374,7 @@ impl CommitLog {
&mut msg_batch,
self.append_message_callback.as_ref(),
&mut put_message_context,
self.enabled_append_prop_crc,
);
let put_message_result = match result.status {
AppendMessageStatus::PutOk => {
Expand Down Expand Up @@ -403,6 +404,7 @@ impl CommitLog {
&mut msg_batch,
self.append_message_callback.as_ref(),
&mut put_message_context,
self.enabled_append_prop_crc,
);
if AppendMessageStatus::PutOk == result.status {
PutMessageResult::new_append_result(PutMessageStatus::PutOk, Some(result))
Expand Down Expand Up @@ -865,7 +867,7 @@ impl CommitLog {
} else {
warn!(
"The commitlog files are deleted, and delete the consume queue
files"
files"
);
self.mapped_file_queue.set_flushed_where(0);
self.mapped_file_queue.set_committed_where(0);
Expand Down Expand Up @@ -1045,7 +1047,7 @@ impl CommitLog {
} else {
warn!(
"The commitlog files are deleted, and delete the consume queue
files"
files"
);
self.mapped_file_queue.set_flushed_where(0);
self.mapped_file_queue.set_committed_where(0);
Expand Down
1 change: 1 addition & 0 deletions rocketmq-store/src/log_file/mapped_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub trait MappedFile {
message: &mut MessageExtBatch,
message_callback: &AMC,
put_message_context: &mut PutMessageContext,
enabled_append_prop_crc: bool,
) -> AppendMessageResult;

fn append_message_compaction(
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-store/src/log_file/mapped_file/default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ impl MappedFile for DefaultMappedFile {
message: &mut MessageExtBatch,
message_callback: &AMC,
put_message_context: &mut PutMessageContext,
enabled_append_prop_crc: bool,
) -> AppendMessageResult {
let current_pos = self.wrote_position.load(Ordering::Acquire) as u64;
if current_pos >= self.file_size {
Expand All @@ -273,6 +274,7 @@ impl MappedFile for DefaultMappedFile {
(self.file_size - current_pos) as i32,
message,
put_message_context,
enabled_append_prop_crc,
);
self.store_timestamp.store(
message.message_ext_broker_inner.store_timestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl MappedFile for LocalMappedFile {
message: &mut MessageExtBatch,
message_callback: &AMC,
put_message_context: &mut PutMessageContext,
enabled_append_prop_crc: bool,
) -> AppendMessageResult {
todo!()
}
Expand Down
5 changes: 2 additions & 3 deletions rocketmq-store/src/message_encoder/message_ext_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ impl MessageExtEncoder {
self.byte_buf.clear();

let messages_byte_buff = message_ext_batch.wrap();
messages_byte_buff.as_ref()?;
let mut messages_byte_buff = messages_byte_buff.unwrap();
let mut messages_byte_buff = messages_byte_buff?;
let total_length = messages_byte_buff.len();
if total_length > self.max_message_body_size as usize {
warn!(
Expand Down Expand Up @@ -426,7 +425,7 @@ impl MessageExtEncoder {
let current = total_length - messages_byte_buff.remaining();
let need_append_last_property_separator = properties_len > 0
&& batch_prop_len > 0
&& messages_byte_buff[total_length - 1..total_length][0]
&& properties_body.as_ref()[(properties_len - 1) as usize..][0]
!= PROPERTY_SEPARATOR as u8;
let topic_data = message_ext_batch
.message_ext_broker_inner
Expand Down
Loading