diff --git a/rocketmq-broker/src/transaction/queue/transactional_message_util.rs b/rocketmq-broker/src/transaction/queue/transactional_message_util.rs index 07aa7d9f..84ce7b86 100644 --- a/rocketmq-broker/src/transaction/queue/transactional_message_util.rs +++ b/rocketmq-broker/src/transaction/queue/transactional_message_util.rs @@ -33,15 +33,16 @@ impl TransactionalMessageUtil { pub const OFFSET_SEPARATOR: &'static str = ","; pub const TRANSACTION_ID: &'static str = "__transactionId__"; - pub fn build_op_topic() -> &'static str { + #[inline(always)] + pub const fn build_op_topic() -> &'static str { TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC } - - pub fn build_half_topic() -> &'static str { + #[inline(always)] + pub const fn build_half_topic() -> &'static str { TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC } - - pub fn build_consumer_group() -> &'static str { + #[inline(always)] + pub const fn build_consumer_group() -> &'static str { mix_all::CID_SYS_RMQ_TRANS } @@ -60,7 +61,9 @@ impl TransactionalMessageUtil { )) .unwrap_or_default(), ); - // msg_inner.set_body(msg_ext.get_body().clone()); + if let Some(body) = msg_ext.get_body() { + msg_inner.set_body(body.clone()); + } if let Some(real_queue_id_str) = msg_ext.get_property(&CheetahString::from_static_str("REAL_QUEUE_ID")) { @@ -69,11 +72,8 @@ impl TransactionalMessageUtil { } } msg_inner.set_flag(msg_ext.get_flag()); - /* msg_inner.set_tags_code(MessageExtBrokerInner::tags_string_to_tags_code( - msg_inner.get_tags(), - ));*/ msg_inner.tags_code = MessageExtBrokerInner::tags_string_to_tags_code( - msg_inner.get_tags().as_deref().unwrap(), + msg_inner.get_tags().unwrap_or_default().as_str(), ); msg_inner .message_ext_inner @@ -91,7 +91,7 @@ impl TransactionalMessageUtil { MessageAccessor::put_property( &mut msg_inner, CheetahString::from_static_str(MessageConst::PROPERTY_TRANSACTION_PREPARED), - CheetahString::from_string("true".to_owned()), + CheetahString::from_static_str("true"), ); MessageAccessor::clear_property( &mut msg_inner, @@ -121,3 +121,76 @@ impl TransactionalMessageUtil { check_immunity_time } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn build_op_topic_returns_correct_topic() { + assert_eq!( + TransactionalMessageUtil::build_op_topic(), + TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC + ); + } + + #[test] + fn build_half_topic_returns_correct_topic() { + assert_eq!( + TransactionalMessageUtil::build_half_topic(), + TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC + ); + } + + #[test] + fn build_consumer_group_returns_correct_group() { + assert_eq!( + TransactionalMessageUtil::build_consumer_group(), + mix_all::CID_SYS_RMQ_TRANS + ); + } + + #[test] + fn build_transactional_message_from_half_message_with_valid_message() { + let msg_ext = MessageExt::default(); + let msg_inner = + TransactionalMessageUtil::build_transactional_message_from_half_message(&msg_ext); + assert_eq!(msg_inner.message_ext_inner.msg_id(), msg_ext.msg_id()); + assert_eq!( + msg_inner.get_topic(), + &msg_ext + .get_property(&CheetahString::from_static_str( + MessageConst::PROPERTY_REAL_TOPIC + )) + .unwrap_or_default() + ); + assert_eq!(msg_inner.get_body(), msg_ext.get_body()); + assert_eq!(msg_inner.get_flag(), msg_ext.get_flag()); + assert_eq!( + msg_inner.get_transaction_id(), + &msg_ext + .get_property(&CheetahString::from_static_str( + MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX + )) + .unwrap_or_default() + ); + } + + #[test] + fn get_immunity_time_with_valid_time_string() { + let immunity_time = TransactionalMessageUtil::get_immunity_time("10", 5000); + assert_eq!(immunity_time, 10000); + } + + #[test] + fn get_immunity_time_with_invalid_time_string() { + let immunity_time = TransactionalMessageUtil::get_immunity_time("invalid", 5000); + assert_eq!(immunity_time, 5000); + } + + #[test] + fn get_immunity_time_with_time_less_than_transaction_timeout() { + let immunity_time = TransactionalMessageUtil::get_immunity_time("3", 5000); + assert_eq!(immunity_time, 5000); + } +}