Skip to content

Commit

Permalink
[ISSUE #1216]✅Optimize TransactionalMessageUtil some method⚡️ (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Nov 18, 2024
1 parent 79e3fae commit 69ba1f1
Showing 1 changed file with 84 additions and 11 deletions.
95 changes: 84 additions & 11 deletions rocketmq-broker/src/transaction/queue/transactional_message_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ impl TransactionalMessageUtil {
pub const OFFSET_SEPARATOR: &'static str = ",";
pub const TRANSACTION_ID: &'static str = "__transactionId__";

pub fn build_op_topic() -> &'static str {
#[inline(always)]
pub const fn build_op_topic() -> &'static str {
TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC
}

pub fn build_half_topic() -> &'static str {
#[inline(always)]
pub const fn build_half_topic() -> &'static str {
TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC
}

pub fn build_consumer_group() -> &'static str {
#[inline(always)]
pub const fn build_consumer_group() -> &'static str {
mix_all::CID_SYS_RMQ_TRANS
}

Expand All @@ -60,7 +61,9 @@ impl TransactionalMessageUtil {
))
.unwrap_or_default(),
);
// msg_inner.set_body(msg_ext.get_body().clone());
if let Some(body) = msg_ext.get_body() {
msg_inner.set_body(body.clone());
}
if let Some(real_queue_id_str) =
msg_ext.get_property(&CheetahString::from_static_str("REAL_QUEUE_ID"))
{
Expand All @@ -69,11 +72,8 @@ impl TransactionalMessageUtil {
}
}
msg_inner.set_flag(msg_ext.get_flag());
/* msg_inner.set_tags_code(MessageExtBrokerInner::tags_string_to_tags_code(
msg_inner.get_tags(),
));*/
msg_inner.tags_code = MessageExtBrokerInner::tags_string_to_tags_code(
msg_inner.get_tags().as_deref().unwrap(),
msg_inner.get_tags().unwrap_or_default().as_str(),
);
msg_inner
.message_ext_inner
Expand All @@ -91,7 +91,7 @@ impl TransactionalMessageUtil {
MessageAccessor::put_property(
&mut msg_inner,
CheetahString::from_static_str(MessageConst::PROPERTY_TRANSACTION_PREPARED),
CheetahString::from_string("true".to_owned()),
CheetahString::from_static_str("true"),
);
MessageAccessor::clear_property(
&mut msg_inner,
Expand Down Expand Up @@ -121,3 +121,76 @@ impl TransactionalMessageUtil {
check_immunity_time
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn build_op_topic_returns_correct_topic() {
assert_eq!(
TransactionalMessageUtil::build_op_topic(),
TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC
);
}

#[test]
fn build_half_topic_returns_correct_topic() {
assert_eq!(
TransactionalMessageUtil::build_half_topic(),
TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC
);
}

#[test]
fn build_consumer_group_returns_correct_group() {
assert_eq!(
TransactionalMessageUtil::build_consumer_group(),
mix_all::CID_SYS_RMQ_TRANS
);
}

#[test]
fn build_transactional_message_from_half_message_with_valid_message() {
let msg_ext = MessageExt::default();
let msg_inner =
TransactionalMessageUtil::build_transactional_message_from_half_message(&msg_ext);
assert_eq!(msg_inner.message_ext_inner.msg_id(), msg_ext.msg_id());
assert_eq!(
msg_inner.get_topic(),
&msg_ext
.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_REAL_TOPIC
))
.unwrap_or_default()
);
assert_eq!(msg_inner.get_body(), msg_ext.get_body());
assert_eq!(msg_inner.get_flag(), msg_ext.get_flag());
assert_eq!(
msg_inner.get_transaction_id(),
&msg_ext
.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
))
.unwrap_or_default()
);
}

#[test]
fn get_immunity_time_with_valid_time_string() {
let immunity_time = TransactionalMessageUtil::get_immunity_time("10", 5000);
assert_eq!(immunity_time, 10000);
}

#[test]
fn get_immunity_time_with_invalid_time_string() {
let immunity_time = TransactionalMessageUtil::get_immunity_time("invalid", 5000);
assert_eq!(immunity_time, 5000);
}

#[test]
fn get_immunity_time_with_time_less_than_transaction_timeout() {
let immunity_time = TransactionalMessageUtil::get_immunity_time("3", 5000);
assert_eq!(immunity_time, 5000);
}
}

0 comments on commit 69ba1f1

Please sign in to comment.