Skip to content

Commit

Permalink
[ISSUE #1136]🚀Support broker receive transaction message-3
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Nov 18, 2024
1 parent 69ba1f1 commit 006752b
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 34 deletions.
152 changes: 122 additions & 30 deletions rocketmq-broker/src/processor/end_transaction_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::config::broker_role::BrokerRole;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use rocketmq_store::log_file::MessageStore;
use tracing::warn;

use crate::transaction::operation_result::OperationResult;
use crate::transaction::queue::transactional_message_util::TransactionalMessageUtil;
Expand Down Expand Up @@ -84,6 +85,7 @@ where
.decode_command_custom_header::<EndTransactionRequestHeader>()
.expect("EndTransactionRequestHeader decode failed");
if BrokerRole::Slave == self.message_store_config.broker_role {
warn!("Message store is slave mode, so end transaction is forbidden. ");

Check warning on line 88 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L88

Added line #L88 was not covered by tests
return Some(RemotingCommand::create_response_command_with_code(
ResponseCode::SlaveNotAvailable,
));
Expand All @@ -110,35 +112,36 @@ where

let result = if MessageSysFlag::TRANSACTION_COMMIT_TYPE == request_header.commit_or_rollback
{
let from_transaction_check = request_header.from_transaction_check;
let commit_or_rollback = request_header.commit_or_rollback;
let params = (
request_header.producer_group.to_string(),
request_header.tran_state_table_offset as i64,
request_header.commit_log_offset as i64,
);
let result = self
.transactional_message_service
.commit_message(request_header);
.commit_message(&request_header);

Check warning on line 117 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L117

Added line #L117 was not covered by tests
if result.response_code == ResponseCode::Success {
if self.reject_commit_or_rollback(
from_transaction_check,
request_header.from_transaction_check,

Check warning on line 120 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L120

Added line #L120 was not covered by tests
result.prepare_message.as_ref().unwrap(),
) {
warn!(
"Message commit fail [producer end]. currentTimeMillis - bornTime > \
checkImmunityTime, msgId={},commitLogOffset={}, wait check",

Check warning on line 125 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L123-L125

Added lines #L123 - L125 were not covered by tests
request_header.msg_id, request_header.commit_log_offset
);
return Some(RemotingCommand::create_response_command_with_code(
ResponseCode::IllegalOperation,
));
}
let res = self.check_prepare_message(result.prepare_message.as_ref(), &params);
let res =
self.check_prepare_message(result.prepare_message.as_ref(), &request_header);

Check warning on line 133 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L132-L133

Added lines #L132 - L133 were not covered by tests
if ResponseCode::from(res.code()) != ResponseCode::Success {
let mut msg_inner =
end_message_transaction(result.prepare_message.as_ref().unwrap());
msg_inner.message_ext_inner.sys_flag = MessageSysFlag::reset_transaction_value(
msg_inner.message_ext_inner.sys_flag,
commit_or_rollback,
request_header.commit_or_rollback,

Check warning on line 139 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L139

Added line #L139 was not covered by tests
);
msg_inner.message_ext_inner.queue_offset = params.1;
msg_inner.message_ext_inner.prepared_transaction_offset = params.2;
msg_inner.message_ext_inner.queue_offset =
request_header.tran_state_table_offset as i64;
msg_inner.message_ext_inner.prepared_transaction_offset =
request_header.commit_log_offset as i64;

Check warning on line 144 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L141-L144

Added lines #L141 - L144 were not covered by tests
msg_inner.message_ext_inner.store_timestamp =
result.prepare_message.as_ref().unwrap().store_timestamp;
MessageAccessor::clear_property(
Expand All @@ -159,25 +162,25 @@ where
OperationResult::default()
}
} else if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == request_header.commit_or_rollback {
let from_transaction_check = request_header.from_transaction_check;
let params = (
request_header.producer_group.to_string(),
request_header.tran_state_table_offset as i64,
request_header.commit_log_offset as i64,
);
let result = self
.transactional_message_service
.rollback_message(request_header);
.rollback_message(&request_header);

Check warning on line 167 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L167

Added line #L167 was not covered by tests
if result.response_code == ResponseCode::Success {
if self.reject_commit_or_rollback(
from_transaction_check,
request_header.from_transaction_check,

Check warning on line 170 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L170

Added line #L170 was not covered by tests
result.prepare_message.as_ref().unwrap(),
) {
warn!(
"Message commit fail [producer end]. currentTimeMillis - bornTime > \
checkImmunityTime, msgId={},commitLogOffset={}, wait check",

Check warning on line 175 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L173-L175

Added lines #L173 - L175 were not covered by tests
request_header.msg_id, request_header.commit_log_offset
);
return Some(RemotingCommand::create_response_command_with_code(
ResponseCode::IllegalOperation,
));
}
let res = self.check_prepare_message(result.prepare_message.as_ref(), &params);
let res =
self.check_prepare_message(result.prepare_message.as_ref(), &request_header);

Check warning on line 183 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L182-L183

Added lines #L182 - L183 were not covered by tests
if ResponseCode::from(res.code()) == ResponseCode::Success {
let _ = self
.transactional_message_service
Expand Down Expand Up @@ -223,7 +226,8 @@ where
fn check_prepare_message(
&self,
message_ext: Option<&MessageExt>,
params: &(String, i64, i64),
// params: &(String, i64, i64),
request_header: &EndTransactionRequestHeader,

Check warning on line 230 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L229-L230

Added lines #L229 - L230 were not covered by tests
) -> RemotingCommand {
let mut command = RemotingCommand::create_response_command();
if let Some(message_ext) = message_ext {
Expand All @@ -236,17 +240,17 @@ where
return command;
}
let pgroup = pgroup_read.unwrap();
if pgroup != params.0 {
if pgroup != request_header.producer_group.as_str() {

Check warning on line 243 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L243

Added line #L243 was not covered by tests
command.set_code_mut(ResponseCode::SystemError);
command.set_remark_mut("The producer group wrong");
return command;
}
if message_ext.queue_offset != params.1 {
if message_ext.queue_offset != request_header.tran_state_table_offset as i64 {

Check warning on line 248 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L248

Added line #L248 was not covered by tests
command.set_code_mut(ResponseCode::SystemError);
command.set_remark_mut("The transaction state table offset wrong");
return command;
}
if message_ext.commit_log_offset != params.2 {
if message_ext.commit_log_offset != request_header.commit_log_offset as i64 {

Check warning on line 253 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L253

Added line #L253 was not covered by tests
command.set_code_mut(ResponseCode::SystemError);
command.set_remark_mut("The commit log offset wrong");
return command;
Expand Down Expand Up @@ -372,10 +376,11 @@ fn end_message_transaction(msg_ext: &MessageExt) -> MessageExtBrokerInner {
} else {
TopicFilterType::SingleTag
};
let tags_code_value = MessageExtBrokerInner::tags_string2tags_code(
&topic_filter_type,
msg_ext.get_tags().as_ref().unwrap(),
);
let tags_code_value = if let Some(tags) = msg_ext.get_tags() {
MessageExtBrokerInner::tags_string2tags_code(&topic_filter_type, tags.as_str())

Check warning on line 380 in rocketmq-broker/src/processor/end_transaction_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/end_transaction_processor.rs#L380

Added line #L380 was not covered by tests
} else {
0
};
msg_inner.tags_code = tags_code_value;
MessageAccessor::set_properties(&mut msg_inner, msg_ext.get_properties().clone());
msg_inner.properties_string =
Expand All @@ -384,3 +389,90 @@ fn end_message_transaction(msg_ext: &MessageExt) -> MessageExtBrokerInner {
MessageAccessor::clear_property(&mut msg_inner, MessageConst::PROPERTY_REAL_QUEUE_ID);
msg_inner
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn end_message_transaction_with_valid_message() {
let msg_ext = MessageExt::default();
let msg_inner = end_message_transaction(&msg_ext);
assert_eq!(
msg_inner.get_topic(),
&msg_ext
.get_user_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_REAL_TOPIC
))
.unwrap_or_default()
);
assert_eq!(
msg_inner.message_ext_inner.queue_id,
msg_ext
.get_user_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_REAL_QUEUE_ID
))
.unwrap_or_default()
.parse::<i32>()
.unwrap_or_default()
);
assert_eq!(msg_inner.get_body(), msg_ext.get_body());
assert_eq!(msg_inner.get_flag(), msg_ext.get_flag());
assert_eq!(
msg_inner.message_ext_inner.born_timestamp,
msg_ext.born_timestamp
);
assert_eq!(msg_inner.message_ext_inner.born_host, msg_ext.born_host);
assert_eq!(msg_inner.message_ext_inner.store_host, msg_ext.store_host);
assert_eq!(
msg_inner.message_ext_inner.reconsume_times,
msg_ext.reconsume_times
);
assert!(msg_inner.is_wait_store_msg_ok());
/* assert_eq!(
msg_inner.get_transaction_id(),
&msg_ext
.get_user_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
))
.unwrap_or_default()
);*/
assert_eq!(msg_inner.message_ext_inner.sys_flag, msg_ext.sys_flag);
/* assert_eq!(
msg_inner.tags_code,
MessageExtBrokerInner::tags_string2tags_code(
&TopicFilterType::SingleTag,
msg_ext.get_tags().as_ref().unwrap()
)
);*/
assert_eq!(msg_inner.get_properties(), msg_ext.get_properties());
assert_eq!(
msg_inner.properties_string,
message_decoder::message_properties_to_string(msg_ext.get_properties())
);
}

#[test]
fn end_message_transaction_with_empty_body() {
let mut msg_ext = MessageExt::default();
//msg_ext.set_body(None);
let msg_inner = end_message_transaction(&msg_ext);
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
}

#[test]
fn end_message_transaction_with_missing_properties() {
let mut msg_ext = MessageExt::default();
msg_ext.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_REAL_TOPIC),
CheetahString::empty(),
);
msg_ext.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_REAL_QUEUE_ID),
CheetahString::empty(),
);
let msg_inner = end_message_transaction(&msg_ext);
assert!(msg_inner.get_topic().is_empty());
assert_eq!(msg_inner.message_ext_inner.queue_id, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,16 @@ where
}
}

fn commit_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
#[inline]
fn commit_message(&mut self, request_header: &EndTransactionRequestHeader) -> OperationResult {

Check warning on line 212 in rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs#L212

Added line #L212 was not covered by tests
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
}

fn rollback_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
#[inline]
fn rollback_message(
&mut self,
request_header: &EndTransactionRequestHeader,
) -> OperationResult {

Check warning on line 220 in rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs#L217-L220

Added lines #L217 - L220 were not covered by tests
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
}

Expand Down
75 changes: 73 additions & 2 deletions rocketmq-broker/src/transaction/transactional_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,104 @@ use rocketmq_store::base::message_result::PutMessageResult;
use crate::transaction::operation_result::OperationResult;
use crate::transaction::transaction_metrics::TransactionMetrics;

/// Trait defining the local transactional message service.
/// This trait provides methods for preparing, committing, rolling back, and checking transactional
/// messages, as well as managing the state of the transactional message service.
#[trait_variant::make(TransactionalMessageService: Send)]
pub trait TransactionalMessageServiceLocal: Sync + 'static {
/// Prepares a transactional message.
///
/// # Arguments
///
/// * `message_inner` - The inner message to be prepared.
///
/// # Returns
///
/// A `PutMessageResult` indicating the result of the message preparation.
async fn prepare_message(&mut self, message_inner: MessageExtBrokerInner) -> PutMessageResult;

/// Asynchronously prepares a transactional message.
///
/// # Arguments
///
/// * `message_inner` - The inner message to be prepared.
///
/// # Returns
///
/// A `PutMessageResult` indicating the result of the message preparation.
async fn async_prepare_message(
&mut self,
message_inner: MessageExtBrokerInner,
) -> PutMessageResult;

/// Deletes a prepared transactional message.
///
/// # Arguments
///
/// * `message_ext` - The external message to be deleted.
///
/// # Returns
///
/// A boolean indicating whether the message was successfully deleted.
async fn delete_prepare_message(&mut self, message_ext: &MessageExt) -> bool;

fn commit_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult;
/// Commits a transactional message.
///
/// # Arguments
///
/// * `request_header` - The request header containing the transaction details.
///
/// # Returns
///
/// An `OperationResult` indicating the result of the commit operation.
fn commit_message(&mut self, request_header: &EndTransactionRequestHeader) -> OperationResult;

fn rollback_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult;
/// Rolls back a transactional message.
///
/// # Arguments
///
/// * `request_header` - The request header containing the transaction details.
///
/// # Returns
///
/// An `OperationResult` indicating the result of the rollback operation.
fn rollback_message(&mut self, request_header: &EndTransactionRequestHeader)
-> OperationResult;

/// Checks the state of transactional messages.
///
/// # Arguments
///
/// * `transaction_timeout` - The timeout for the transaction.
/// * `transaction_check_max` - The maximum number of transaction checks.
fn check(
&self,
transaction_timeout: u64,
transaction_check_max: i32,
// listener: AbstractTransactionalMessageCheckListener,
);

/// Opens the transactional message service.
///
/// # Returns
///
/// A boolean indicating whether the service was successfully opened.
fn open(&self) -> bool;

/// Closes the transactional message service.
fn close(&self);

/// Gets the transaction metrics.
///
/// # Returns
///
/// A reference to the `TransactionMetrics`.
fn get_transaction_metrics(&self) -> &TransactionMetrics;

/// Sets the transaction metrics.
///
/// # Arguments
///
/// * `transaction_metrics` - The transaction metrics to be set.
fn set_transaction_metrics(&mut self, transaction_metrics: TransactionMetrics);
}

0 comments on commit 006752b

Please sign in to comment.