Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1097]🚧Support broker receive transaction message-2🔥 #1131

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use crate::processor::admin_broker_processor::AdminBrokerProcessor;
use crate::processor::client_manage_processor::ClientManageProcessor;
use crate::processor::consumer_manage_processor::ConsumerManageProcessor;
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
use crate::processor::end_transaction_processor::EndTransactionProcessor;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::processor::query_message_processor::QueryMessageProcessor;
Expand Down Expand Up @@ -534,7 +535,12 @@ impl BrokerRuntime {
consumer_manage_processor: ArcMut::new(consumer_manage_processor),
query_assignment_processor: Default::default(),
query_message_processor: ArcMut::new(query_message_processor),
end_transaction_processor: Default::default(),
end_transaction_processor: ArcMut::new(EndTransactionProcessor::new(
self.message_store_config.clone(),
self.broker_config.clone(),
self.transactional_message_service.as_ref().unwrap().clone(),
self.message_store.as_ref().unwrap().clone(),
)),
}
}

Expand Down
8 changes: 7 additions & 1 deletion rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct BrokerRequestProcessor<MS, TS> {
pub(crate) client_manage_processor: ArcMut<ClientManageProcessor<MS>>,
pub(crate) consumer_manage_processor: ArcMut<ConsumerManageProcessor<MS>>,
pub(crate) query_assignment_processor: ArcMut<QueryAssignmentProcessor>,
pub(crate) end_transaction_processor: ArcMut<EndTransactionProcessor>,
pub(crate) end_transaction_processor: ArcMut<EndTransactionProcessor<TS, MS>>,
pub(crate) admin_broker_processor: ArcMut<AdminBrokerProcessor>,
}
impl<MS, TS> Clone for BrokerRequestProcessor<MS, TS> {
Expand Down Expand Up @@ -154,6 +154,12 @@ where
.await
}

RequestCode::EndTransaction => {
self.end_transaction_processor
.process_request(channel, ctx, request_code, request)
.await
}

_ => {
self.admin_broker_processor
.process_request(channel, ctx, request_code, request)
Expand Down
361 changes: 355 additions & 6 deletions rocketmq-broker/src/processor/end_transaction_processor.rs

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion rocketmq-broker/src/transaction/operation_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_remoting::code::response_code::ResponseCode;

#[derive(Debug, Clone)]
pub(crate) struct OperationResult {
pub(crate) prepare_message: MessageExt,
pub(crate) prepare_message: Option<MessageExt>,
pub(crate) response_remark: Option<String>,
pub(crate) response_code: ResponseCode,
}

impl Default for OperationResult {
fn default() -> Self {
Self {
prepare_message: None,
response_remark: None,
response_code: ResponseCode::Success,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@
*/
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
use rocketmq_store::base::message_result::PutMessageResult;
use rocketmq_store::log_file::MessageStore;
use tokio::sync::Mutex;
use tracing::error;
use tracing::warn;

use crate::transaction::operation_result::OperationResult;
use crate::transaction::queue::message_queue_op_context::MessageQueueOpContext;
use crate::transaction::queue::transactional_message_bridge::TransactionalMessageBridge;
use crate::transaction::queue::transactional_message_util::TransactionalMessageUtil;
use crate::transaction::queue::transactional_op_batch_service::TransactionalOpBatchService;
use crate::transaction::transaction_metrics::TransactionMetrics;
use crate::transaction::transactional_message_service::TransactionalMessageService;
Expand All @@ -44,15 +52,94 @@ pub struct DefaultTransactionalMessageService<MS> {
transaction_metrics: TransactionMetrics,
}

impl<MS> DefaultTransactionalMessageService<MS> {
impl<MS> DefaultTransactionalMessageService<MS>
where
MS: MessageStore,
{
pub fn new(transactional_message_bridge: TransactionalMessageBridge<MS>) -> Self {
Self {
transactional_message_bridge,
delete_context: Arc::new(Mutex::new(HashMap::new())),
transactional_op_batch_service: TransactionalOpBatchService,
transactional_op_batch_service: TransactionalOpBatchService::new(),
transaction_metrics: TransactionMetrics,
}
}

fn get_half_message_by_offset(&self, offset: i64) -> OperationResult {
let message_ext = self
.transactional_message_bridge
.look_message_by_offset(offset);

if let Some(message_ext) = message_ext {
OperationResult {
prepare_message: Some(message_ext),
response_remark: None,
response_code: ResponseCode::Success,
}
} else {
OperationResult {
prepare_message: None,
response_remark: Some("Find prepared transaction message failed".to_owned()),
response_code: ResponseCode::SystemError,
}
}
}

pub async fn get_op_message(
&self,
queue_id: i32,
more_data: Option<String>,
) -> Option<Message> {
let topic = TransactionalMessageUtil::build_op_topic();
let delete_context = self.delete_context.lock().await;
let mq_context = delete_context.get(&queue_id)?;

let more_data_length = if let Some(ref data) = more_data {
data.len()
} else {
0
};
let mut length = more_data_length;
let max_size = self
.transactional_message_bridge
.broker_config
.transaction_op_msg_max_size as usize;
if length < max_size {
let sz = mq_context.get_total_size() as usize;
if sz > max_size || length + sz > max_size {
length = max_size + 100;
} else {
length += sz;
}
}

let mut sb = String::with_capacity(length);

if let Some(data) = more_data {
sb.push_str(&data);
}

while !mq_context.context_queue().is_empty().await {
if sb.len() >= max_size {
break;
}
{
if let Some(data) = mq_context.context_queue().try_poll().await {
sb.push_str(&data);
}
}
}

if sb.is_empty() {
return None;
}

Some(Message::with_tags(
topic,
TransactionalMessageUtil::REMOVE_TAG,
sb.as_bytes(),
))
}
}

impl<MS> TransactionalMessageService for DefaultTransactionalMessageService<MS>
Expand All @@ -74,38 +161,78 @@ where
.await
}

fn delete_prepare_message(&mut self, _message_ext: MessageExtBrokerInner) -> bool {
todo!()
async fn delete_prepare_message(&mut self, message_ext: &MessageExt) -> bool {
let queue_id = message_ext.queue_id;
let mut delete_context = self.delete_context.lock().await;
let mq_context = delete_context
.entry(queue_id)
.or_insert(MessageQueueOpContext::new(get_current_millis(), 20000));
let data = format!(
"{}{}",
message_ext.queue_offset,
TransactionalMessageUtil::OFFSET_SEPARATOR
);
let len = data.len();
let res = mq_context
.context_queue()
.offer(data.clone(), Duration::from_millis(100))
.await;
if res {
let total_size = mq_context.total_size_add_and_get(len as i32);
if total_size
> self
.transactional_message_bridge
.broker_config
.transaction_op_msg_max_size
{
self.transactional_op_batch_service.wakeup();
}
return true;
} else {
self.transactional_op_batch_service.wakeup();
}
let msg = self.get_op_message(queue_id, Some(data)).await;
if self
.transactional_message_bridge
.write_op(queue_id, msg.expect("message is none"))
.await
{
warn!("Force add remove op data. queueId={}", queue_id);
true
} else {
error!(
"Transaction op message write failed. messageId is {}, queueId is {}",
message_ext.msg_id, message_ext.queue_id
);
false
}
}

fn commit_message(&mut self, _request_header: EndTransactionRequestHeader) -> OperationResult {
todo!()
fn commit_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
}

fn rollback_message(
&mut self,
_request_header: EndTransactionRequestHeader,
) -> OperationResult {
todo!()
fn rollback_message(&mut self, request_header: EndTransactionRequestHeader) -> OperationResult {
self.get_half_message_by_offset(request_header.commit_log_offset as i64)
}

fn check(&self, _transaction_timeout: u64, _transaction_check_max: i32) {
todo!()
}

fn open(&self) -> bool {
todo!()
true
}

fn close(&self) {
todo!()
//nothing to do
}

fn get_transaction_metrics(&self) -> &TransactionMetrics {
todo!()
unimplemented!("get_transaction_metrics")
}

fn set_transaction_metrics(&mut self, _transaction_metrics: TransactionMetrics) {
todo!()
unimplemented!("set_transaction_metrics")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub struct MessageQueueOpContext;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use rocketmq_rust::RocketMQBlockingQueue;
use tokio::sync::Mutex;

pub struct MessageQueueOpContext {
total_size: AtomicI32,
last_write_timestamp: Arc<Mutex<u64>>,
context_queue: RocketMQBlockingQueue<String>,
}

impl MessageQueueOpContext {
pub fn new(timestamp: u64, queue_length: usize) -> Self {
MessageQueueOpContext {
total_size: AtomicI32::new(0),
last_write_timestamp: Arc::new(Mutex::new(timestamp)),
context_queue: RocketMQBlockingQueue::new(queue_length),
}
}

pub fn get_total_size(&self) -> i32 {
self.total_size.load(Ordering::Relaxed)
}

pub fn total_size_add_and_get(&self, delta: i32) -> i32 {
self.total_size.fetch_add(delta, Ordering::AcqRel) + delta
}

pub async fn get_last_write_timestamp(&self) -> u64 {
*self.last_write_timestamp.lock().await
}

pub async fn set_last_write_timestamp(&self, timestamp: u64) {
let mut last_timestamp = self.last_write_timestamp.lock().await;
*last_timestamp = timestamp;
}

pub fn context_queue(&self) -> &RocketMQBlockingQueue<String> {
&self.context_queue
}
}
Loading
Loading