Skip to content

Commit

Permalink
[ISSUE #350]📌Optimize AppendMessageCallback trait and implemention
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed May 10, 2024
1 parent 1491f69 commit bc21f68
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 27 deletions.
64 changes: 50 additions & 14 deletions rocketmq-store/src/base/append_message_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use bytes::BytesMut;
use rocketmq_common::{
common::{
message::{message_batch::MessageExtBatch, message_single::MessageExtBrokerInner},
attribute::cq_type::CQType,
config::TopicConfig,
message::{
message_batch::MessageExtBatch, message_single::MessageExtBrokerInner, MessageConst,
},
sys_flag::message_sys_flag::MessageSysFlag,
},
utils::message_utils,
utils::{message_utils, queue_type_utils::QueueTypeUtils},
};

use crate::{
Expand All @@ -31,7 +35,10 @@ use crate::{
put_message_context::PutMessageContext,
},
config::message_store_config::MessageStoreConfig,
log_file::commit_log::{CommitLog, CRC32_RESERVED_LEN},
log_file::{
commit_log::{CommitLog, CRC32_RESERVED_LEN},
mapped_file::default_impl_refactor::LocalMappedFile,
},
};

/// Write messages callback interface
Expand All @@ -50,9 +57,9 @@ pub trait AppendMessageCallback {
///
/// The number of bytes written
fn do_append(
&mut self,
&self,
file_from_offset: i64,
file_wrote_position: i64,
mapped_file: &LocalMappedFile,
max_blank: i32,
msg: &mut MessageExtBrokerInner,
put_message_context: &PutMessageContext,
Expand Down Expand Up @@ -85,29 +92,58 @@ pub trait AppendMessageCallback {
const END_FILE_MIN_BLANK_LENGTH: i32 = 4 + 4;

pub(crate) struct DefaultAppendMessageCallback {
pub msg_store_item_memory: bytes::BytesMut,
pub crc32_reserved_length: i32,
pub message_store_config: Arc<MessageStoreConfig>,
msg_store_item_memory: bytes::BytesMut,
crc32_reserved_length: i32,
message_store_config: Arc<MessageStoreConfig>,
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
}

impl DefaultAppendMessageCallback {
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
pub fn new(
message_store_config: Arc<MessageStoreConfig>,
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
) -> Self {
Self {
msg_store_item_memory: bytes::BytesMut::with_capacity(
END_FILE_MIN_BLANK_LENGTH as usize,
),
crc32_reserved_length: CRC32_RESERVED_LEN,
message_store_config,
topic_config_table,
}
}
}

impl DefaultAppendMessageCallback {
fn get_message_num(&self, msg_inner: &MessageExtBrokerInner) -> i16 {
let mut message_num = 1i16;
let cq_type = self.get_cq_type(msg_inner);
if MessageSysFlag::check(msg_inner.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG)
|| CQType::BatchCQ == cq_type
{
if let Some(key) = msg_inner.property(MessageConst::PROPERTY_INNER_NUM) {
message_num = key.parse::<i16>().unwrap_or(1);
}
}
message_num
}

fn get_cq_type(&self, msg_inner: &MessageExtBrokerInner) -> CQType {
let topic_config = self
.topic_config_table
.lock()
.get(msg_inner.message_ext_inner.topic())
.cloned();
QueueTypeUtils::get_cq_type(&topic_config)
}
}

#[allow(unused_variables)]
impl AppendMessageCallback for DefaultAppendMessageCallback {
fn do_append(
&mut self,
&self,
file_from_offset: i64,
file_wrote_position: i64,
mapped_file: &LocalMappedFile,
max_blank: i32,
msg_inner: &mut MessageExtBrokerInner,
put_message_context: &PutMessageContext,
Expand All @@ -122,14 +158,14 @@ impl AppendMessageCallback for DefaultAppendMessageCallback {
}

let msg_len = i32::from_le_bytes(pre_encode_buffer[0..4].try_into().unwrap());
let wrote_offset = file_from_offset + file_wrote_position;
let wrote_offset = file_from_offset + mapped_file.wrote_position() as i64;

let msg_id =
message_utils::build_message_id(msg_inner.message_ext_inner.store_host, wrote_offset);

let mut queue_offset = msg_inner.queue_offset();
//let message_num = CommitLog::get_message_num(msg_inner);
let message_num = 1;
let message_num = self.get_message_num(msg_inner);
match MessageSysFlag::get_transaction_value(msg_inner.sys_flag()) {
MessageSysFlag::TRANSACTION_PREPARED_TYPE
| MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => queue_offset = 0,
Expand Down
23 changes: 14 additions & 9 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use rocketmq_common::{
common::{
attribute::cq_type::CQType,
broker::broker_config::BrokerConfig,
config::TopicConfig,
message::{
message_single::{tags_string2tags_code, MessageExtBrokerInner},
MessageConst, MessageVersion,
Expand Down Expand Up @@ -86,6 +87,7 @@ pub struct CommitLog {
dispatcher: CommitLogDispatcherDefault,
confirm_offset: i64,
store_checkpoint: StoreCheckpoint,
append_message_callback: Arc<DefaultAppendMessageCallback>,
}

impl CommitLog {
Expand All @@ -94,19 +96,24 @@ impl CommitLog {
broker_config: Arc<BrokerConfig>,
dispatcher: &CommitLogDispatcherDefault,
store_checkpoint: StoreCheckpoint,
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
) -> Self {
let enabled_append_prop_crc = message_store_config.enabled_append_prop_crc;
let store_path = message_store_config.get_store_path_commit_log();
let mapped_file_size = message_store_config.mapped_file_size_commit_log;
Self {
mapped_file_queue: MappedFileQueue::new(store_path, mapped_file_size as u64, None),
message_store_config,
message_store_config: message_store_config.clone(),
broker_config,
enabled_append_prop_crc,
//local_file_message_store: None,
dispatcher: dispatcher.clone(),
confirm_offset: -1,
store_checkpoint,
append_message_callback: Arc::new(DefaultAppendMessageCallback::new(
message_store_config,
topic_config_table,
)),
}
}
}
Expand Down Expand Up @@ -189,14 +196,12 @@ impl CommitLog {
};
let topic_queue_key = generate_key(&msg);
let mut put_message_context = PutMessageContext::new(topic_queue_key);
let append_message_callback =
DefaultAppendMessageCallback::new(self.message_store_config.clone());
/* let result =
append_message_callback.do_append(mapped_file.file_from_offset() as i64, 0, &mut msg);
mapped_file.append_data(msg.encoded_buff.clone(), false);*/

let result =
mapped_file.append_message(msg, append_message_callback, &mut put_message_context);

let result = mapped_file.append_message(
msg,
self.append_message_callback.as_ref(),
&mut put_message_context,
);

match result.status {
AppendMessageStatus::PutOk => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ impl LocalMappedFile {
write_success
}

pub fn append_message(
pub fn append_message<AMC: AppendMessageCallback>(
&self,
message: MessageExtBrokerInner,
message_callback: impl AppendMessageCallback,
message_callback: &AMC,
put_message_context: &mut PutMessageContext,
) -> AppendMessageResult {
let mut message = message;
Expand All @@ -177,10 +177,9 @@ impl LocalMappedFile {
..Default::default()
};
}
let mut message_callback = message_callback;
let append_message_result = message_callback.do_append(
self.file_from_offset() as i64,
current_pos as i64,
self,
(self.file_size - current_pos) as i32,
&mut message,
put_message_context,
Expand Down
1 change: 1 addition & 0 deletions rocketmq-store/src/message_store/default_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl DefaultMessageStore {
broker_config.clone(),
&dispatcher,
store_checkpoint.clone(),
topic_config_table.clone(),
);
Self {
message_store_config: message_store_config.clone(),
Expand Down

0 comments on commit bc21f68

Please sign in to comment.