From a0565467a7c1a97ff6112bfa51983da1b3d7ddf9 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 15 Aug 2024 23:38:36 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#896]=F0=9F=94=A5Implement=20Produer?= =?UTF-8?q?=20send=20batch=20message=F0=9F=9A=80=20(#897)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/processor/send_message_processor.rs | 6 +- rocketmq-client/Cargo.toml | 11 +- .../examples/batch/simple_batch_producer.rs | 63 ++++ .../examples/producer/simple_producer.rs | 1 + .../examples/quickstart/producer.rs | 53 +++ rocketmq-client/src/base/validators.rs | 4 +- .../src/hook/check_forbidden_context.rs | 4 +- .../src/hook/send_message_context.rs | 4 +- .../src/implementation/mq_client_api_impl.rs | 58 +-- .../src/producer/default_mq_producer.rs | 42 ++- rocketmq-client/src/producer/mq_producer.rs | 2 +- .../producer_impl/default_mq_producer_impl.rs | 114 +++--- rocketmq-common/src/common/message.rs | 356 ++++++++++++++++-- .../src/common/message/message_accessor.rs | 65 ++-- .../src/common/message/message_batch.rs | 160 +++++++- .../message/message_client_id_setter.rs | 12 +- .../src/common/message/message_decoder.rs | 54 +++ .../src/common/message/message_single.rs | 80 ++-- rocketmq-common/src/error.rs | 3 + 19 files changed, 883 insertions(+), 209 deletions(-) create mode 100644 rocketmq-client/examples/batch/simple_batch_producer.rs create mode 100644 rocketmq-client/examples/quickstart/producer.rs diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 42391de1..fb3b5336 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -285,7 +285,7 @@ impl SendMessageProcessor { message_ext .message_ext_inner .message - .put_property(MessageConst::PROPERTY_CLUSTER.to_string(), cluster_name); + .put_property(MessageConst::PROPERTY_CLUSTER, cluster_name.as_str()); let mut batch_message = MessageExtBatch { message_ext_broker_inner: message_ext, @@ -325,8 +325,8 @@ impl SendMessageProcessor { .message_ext_inner .message .put_property( - MessageConst::PROPERTY_INNER_NUM.to_string(), - inner_num.to_string(), + MessageConst::PROPERTY_INNER_NUM, + inner_num.to_string().as_str(), ); batch_message.message_ext_broker_inner.properties_string = message_properties_to_string( batch_message diff --git a/rocketmq-client/Cargo.toml b/rocketmq-client/Cargo.toml index dfa6eae9..6a10b4d1 100644 --- a/rocketmq-client/Cargo.toml +++ b/rocketmq-client/Cargo.toml @@ -40,4 +40,13 @@ parking_lot = { workspace = true } [[example]] name = "simple-producer" -path = "examples/producer/simple_producer.rs" \ No newline at end of file +path = "examples/producer/simple_producer.rs" + +[[example]] +name = "producer" +path = "examples/quickstart/producer.rs" + +[[example]] +name = "simple-batch-producer" +path = "examples/batch/simple_batch_producer.rs" + diff --git a/rocketmq-client/examples/batch/simple_batch_producer.rs b/rocketmq-client/examples/batch/simple_batch_producer.rs new file mode 100644 index 00000000..8e766b78 --- /dev/null +++ b/rocketmq-client/examples/batch/simple_batch_producer.rs @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; +use rocketmq_client::producer::mq_producer::MQProducer; +use rocketmq_common::common::message::message_single::Message; +use rocketmq_rust::rocketmq; + +pub const PRODUCER_GROUP: &str = "BatchProducerGroupName"; +pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; +pub const TOPIC: &str = "TopicTest"; +pub const TAG: &str = "TagA"; + +#[rocketmq::main] +pub async fn main() -> rocketmq_client::Result<()> { + //init logger + rocketmq_common::log::init_logger(); + + // create a producer builder with default configuration + let builder = DefaultMQProducer::builder(); + + let mut producer = builder + .producer_group(PRODUCER_GROUP.to_string()) + .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) + .build(); + producer.start().await?; + + let mut messages = Vec::new(); + messages.push(Message::with_keys( + TOPIC, + TAG, + "OrderID001", + "Hello world 0".as_bytes(), + )); + messages.push(Message::with_keys( + TOPIC, + TAG, + "OrderID002", + "Hello world 1".as_bytes(), + )); + messages.push(Message::with_keys( + TOPIC, + TAG, + "OrderID003", + "Hello world 2".as_bytes(), + )); + let send_result = producer.send_batch(messages).await?; + println!("send result: {}", send_result); + Ok(()) +} diff --git a/rocketmq-client/examples/producer/simple_producer.rs b/rocketmq-client/examples/producer/simple_producer.rs index 14cf3bb5..0c158f6c 100644 --- a/rocketmq-client/examples/producer/simple_producer.rs +++ b/rocketmq-client/examples/producer/simple_producer.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; use rocketmq_client::producer::mq_producer::MQProducer; use rocketmq_client::Result; diff --git a/rocketmq-client/examples/quickstart/producer.rs b/rocketmq-client/examples/quickstart/producer.rs new file mode 100644 index 00000000..14cf3bb5 --- /dev/null +++ b/rocketmq-client/examples/quickstart/producer.rs @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; +use rocketmq_client::producer::mq_producer::MQProducer; +use rocketmq_client::Result; +use rocketmq_common::common::message::message_single::Message; +use rocketmq_rust::rocketmq; + +pub const MESSAGE_COUNT: usize = 1; +pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name"; +pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; +pub const TOPIC: &str = "TopicTest"; +pub const TAG: &str = "TagA"; + +#[rocketmq::main] +pub async fn main() -> Result<()> { + //init logger + rocketmq_common::log::init_logger(); + + // create a producer builder with default configuration + let builder = DefaultMQProducer::builder(); + + let mut producer = builder + .producer_group(PRODUCER_GROUP.to_string()) + .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) + .build(); + + producer.start().await?; + + for _ in 0..10 { + let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes()); + + let send_result = producer.send_with_timeout(message, 2000).await?; + println!("send result: {}", send_result); + } + producer.shutdown().await; + + Ok(()) +} diff --git a/rocketmq-client/src/base/validators.rs b/rocketmq-client/src/base/validators.rs index ae415413..fc34f381 100644 --- a/rocketmq-client/src/base/validators.rs +++ b/rocketmq-client/src/base/validators.rs @@ -63,8 +63,8 @@ impl Validators { } pub fn check_message( - msg: &Option, - default_mq_producer: &DefaultMQProducer, + msg: Option<&Message>, + default_mq_producer: &mut DefaultMQProducer, ) -> Result<()> { if msg.is_none() { return Err(MQClientException( diff --git a/rocketmq-client/src/hook/check_forbidden_context.rs b/rocketmq-client/src/hook/check_forbidden_context.rs index 8947d8fc..0ae04afe 100644 --- a/rocketmq-client/src/hook/check_forbidden_context.rs +++ b/rocketmq-client/src/hook/check_forbidden_context.rs @@ -15,7 +15,7 @@ * limitations under the License. */ use rocketmq_common::common::message::message_queue::MessageQueue; -use rocketmq_common::common::message::message_single::Message; +use rocketmq_common::common::message::MessageTrait; use crate::implementation::communication_mode::CommunicationMode; use crate::producer::send_result::SendResult; @@ -24,7 +24,7 @@ use crate::producer::send_result::SendResult; pub struct CheckForbiddenContext<'a> { pub name_srv_addr: Option, pub group: Option, - pub message: Option<&'a Message>, + pub message: Option<&'a dyn MessageTrait>, pub mq: Option<&'a MessageQueue>, pub broker_addr: Option, pub communication_mode: Option, diff --git a/rocketmq-client/src/hook/send_message_context.rs b/rocketmq-client/src/hook/send_message_context.rs index 3f508d92..45bf0b22 100644 --- a/rocketmq-client/src/hook/send_message_context.rs +++ b/rocketmq-client/src/hook/send_message_context.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use rocketmq_common::common::message::message_enum::MessageType; use rocketmq_common::common::message::message_queue::MessageQueue; -use rocketmq_common::common::message::message_single::Message; +use rocketmq_common::common::message::MessageTrait; use crate::implementation::communication_mode::CommunicationMode; use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl; @@ -29,7 +29,7 @@ use crate::producer::send_result::SendResult; #[derive(Default)] pub struct SendMessageContext<'a> { pub producer_group: Option, - pub message: Option, + pub message: Option>, pub mq: Option<&'a MessageQueue>, pub broker_addr: Option, pub born_host: Option, diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 15c848df..0029fc50 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -22,8 +22,8 @@ use lazy_static::lazy_static; use rocketmq_common::common::message::message_batch::MessageBatch; use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter; use rocketmq_common::common::message::message_queue::MessageQueue; -use rocketmq_common::common::message::message_single::Message; use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; use rocketmq_common::common::namesrv::default_top_addressing::DefaultTopAddressing; use rocketmq_common::common::namesrv::name_server_update_callback::NameServerUpdateCallback; @@ -226,11 +226,11 @@ impl MQClientAPIImpl { self.remoting_client.get_name_server_address_list() } - pub async fn send_message( + pub async fn send_message( &mut self, addr: &str, broker_name: &str, - msg: &mut Message, + msg: &mut T, request_header: SendMessageRequestHeader, timeout_millis: u64, communication_mode: CommunicationMode, @@ -240,7 +240,10 @@ impl MQClientAPIImpl { retry_times_when_send_failed: u32, context: &mut Option>, producer: &DefaultMQProducerImpl, - ) -> Result> { + ) -> Result> + where + T: MessageTrait, + { let begin_start_time = Instant::now(); let msg_type = msg.get_property(MessageConst::PROPERTY_MESSAGE_TYPE); let is_reply = msg_type.is_some() && msg_type.unwrap() == mix_all::REPLY_MESSAGE_FLAG; @@ -279,11 +282,11 @@ impl MQClientAPIImpl { }; // if compressed_body is not None, set request body to compressed_body - if msg.compressed_body.is_some() { - let compressed_body = std::mem::take(&mut msg.compressed_body); + if msg.get_compressed_body_mut().is_some() { + let compressed_body = std::mem::take(msg.get_compressed_body_mut()); request.set_body_mut_ref(compressed_body); } else { - request.set_body_mut_ref(msg.body().clone()); + request.set_body_mut_ref(msg.get_body().cloned()); } match communication_mode { CommunicationMode::Sync => { @@ -338,17 +341,20 @@ impl MQClientAPIImpl { } } - pub async fn send_message_simple( + pub async fn send_message_simple( &mut self, addr: &str, broker_name: &str, - msg: &mut Message, + msg: &mut T, request_header: SendMessageRequestHeader, timeout_millis: u64, communication_mode: CommunicationMode, context: &mut Option>, producer: &DefaultMQProducerImpl, - ) -> Result> { + ) -> Result> + where + T: MessageTrait, + { self.send_message( addr, broker_name, @@ -366,14 +372,17 @@ impl MQClientAPIImpl { .await } - async fn send_message_sync( + async fn send_message_sync( &mut self, addr: &str, broker_name: &str, - msg: &Message, + msg: &T, timeout_millis: u64, request: RemotingCommand, - ) -> Result { + ) -> Result + where + T: MessageTrait, + { let response = self .remoting_client .invoke_async(Some(addr.to_string()), request, timeout_millis) @@ -381,11 +390,11 @@ impl MQClientAPIImpl { self.process_send_response(broker_name, msg, &response, addr) } - async fn send_message_async( + async fn send_message_async( &mut self, addr: &str, broker_name: &str, - msg: &Message, + msg: &T, timeout_millis: u64, request: RemotingCommand, send_callback: Option>>, @@ -455,13 +464,16 @@ impl MQClientAPIImpl { } } - fn process_send_response( + fn process_send_response( &mut self, broker_name: &str, - msg: &Message, + msg: &T, response: &RemotingCommand, addr: &str, - ) -> Result { + ) -> Result + where + T: MessageTrait, + { let response_code = ResponseCode::from(response.code()); let send_status = match response_code { ResponseCode::FlushDiskTimeout => SendStatus::FlushDiskTimeout, @@ -479,7 +491,7 @@ impl MQClientAPIImpl { let response_header = response .decode_command_custom_header_fast::() .unwrap(); - let mut topic = msg.topic.as_str().to_string(); + let mut topic = msg.get_topic().to_string(); let namespace = self.client_config.get_namespace(); if namespace.is_some() && !namespace.as_ref().unwrap().is_empty() { topic = NamespaceUtil::without_namespace_with_namespace( @@ -493,7 +505,7 @@ impl MQClientAPIImpl { let msgs = msg.as_any().downcast_ref::(); if msgs.is_some() && response_header.batch_uniq_id().is_none() { let mut sb = String::new(); - for msg in msgs.unwrap().messages.iter() { + for msg in msgs.unwrap().messages.as_ref().unwrap().iter() { sb.push_str(if sb.is_empty() { "" } else { "," }); sb.push_str(MessageClientIDSetter::get_uniq_id(msg).unwrap().as_str()); } @@ -527,10 +539,10 @@ impl MQClientAPIImpl { Ok(send_result) } - async fn on_exception_impl( + async fn on_exception_impl( &mut self, broker_name: &str, - msg: &Message, + msg: &T, timeout_millis: u64, mut request: RemotingCommand, send_callback: Option>>, @@ -564,7 +576,7 @@ impl MQClientAPIImpl { warn!( "async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, - msg.topic(), + msg.get_topic(), addr, retry_broker_name ); diff --git a/rocketmq-client/src/producer/default_mq_producer.rs b/rocketmq-client/src/producer/default_mq_producer.rs index 942d9c04..9d9fe29e 100644 --- a/rocketmq-client/src/producer/default_mq_producer.rs +++ b/rocketmq-client/src/producer/default_mq_producer.rs @@ -21,16 +21,22 @@ use std::sync::Arc; use rocketmq_common::common::compression::compression_type::CompressionType; use rocketmq_common::common::compression::compressor::Compressor; use rocketmq_common::common::compression::compressor_factory::CompressorFactory; +use rocketmq_common::common::message::message_batch::MessageBatch; +use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::message::message_single::Message; +use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all::MESSAGE_COMPRESS_LEVEL; use rocketmq_common::common::mix_all::MESSAGE_COMPRESS_TYPE; use rocketmq_common::common::topic::TopicValidator; use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::runtime::RPCHook; +use tracing::error; use crate::base::client_config::ClientConfig; +use crate::base::validators::Validators; +use crate::error::MQClientError::MQClientException; use crate::producer::default_mq_produce_builder::DefaultMQProducerBuilder; use crate::producer::message_queue_selector::MessageQueueSelector; use crate::producer::mq_producer::MQProducer; @@ -460,6 +466,29 @@ impl DefaultMQProducer { default_mqproducer_impl.set_send_latency_fault_enable(send_latency_fault_enable); } } + + fn batch(&mut self, messages: Vec) -> Result { + match MessageBatch::generate_from_vec(messages) { + Ok(mut msg_batch) => { + for message in msg_batch.messages.as_mut().unwrap() { + Validators::check_message(Some(message), self)?; + MessageClientIDSetter::set_uniq_id(message); + message.set_topic(self.with_namespace(message.get_topic()).as_str()); + } + MessageClientIDSetter::set_uniq_id(&mut msg_batch.final_message); + msg_batch.set_body(msg_batch.encode()); + msg_batch.set_topic(self.with_namespace(msg_batch.get_topic()).as_str()); + Ok(msg_batch) + } + Err(err) => { + error!("Failed to initiate the MessageBatch: {:?}", err); + Err(MQClientException( + -1, + "Failed to initiate the MessageBatch".to_string(), + )) + } + } + } } impl DefaultMQProducer { @@ -531,7 +560,7 @@ impl MQProducer for DefaultMQProducer { .default_mqproducer_impl .as_mut() .unwrap() - .send(msg, timeout) + .send_with_timeout(msg, timeout) .await?; Ok(result.expect("SendResult should not be None")) } @@ -646,8 +675,15 @@ impl MQProducer for DefaultMQProducer { todo!() } - async fn send_batch(&self, msgs: &[Message]) -> Result { - todo!() + async fn send_batch(&mut self, msgs: Vec) -> Result { + let batch = self.batch(msgs)?; + let result = self + .default_mqproducer_impl + .as_mut() + .unwrap() + .send(batch) + .await?; + Ok(result.expect("SendResult should not be None")) } async fn send_batch_with_timeout(&self, msgs: &[Message], timeout: u64) -> Result { diff --git a/rocketmq-client/src/producer/mq_producer.rs b/rocketmq-client/src/producer/mq_producer.rs index a1d7c95b..1c37e344 100644 --- a/rocketmq-client/src/producer/mq_producer.rs +++ b/rocketmq-client/src/producer/mq_producer.rs @@ -312,7 +312,7 @@ pub trait MQProducerLocal: Any + 'static { /// /// # Returns /// A `Result` containing the `SendResult`, or an error. - async fn send_batch(&self, msgs: &[Message]) -> Result; + async fn send_batch(&mut self, msgs: Vec) -> Result; /// Sends a batch of messages with a timeout. /// diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index dcb89c8c..8eb5415b 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -29,6 +29,7 @@ use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::message::message_single::Message; use rocketmq_common::common::message::message_single::MessageExt; use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; use rocketmq_common::common::mix_all::CLIENT_INNER_PRODUCER_GROUP; use rocketmq_common::common::mix_all::DEFAULT_PRODUCER_GROUP; @@ -125,24 +126,40 @@ impl DefaultMQProducerImpl { } } - pub async fn send(&mut self, msg: Message, timeout: u64) -> Result> { + #[inline] + pub async fn send_with_timeout(&mut self, msg: T, timeout: u64) -> Result> + where + T: MessageTrait + Clone + Send + Sync, + { self.send_default_impl(msg, CommunicationMode::Sync, None, timeout) .await } - async fn send_default_impl( + #[inline] + pub async fn send(&mut self, msg: T) -> Result> + where + T: MessageTrait + Clone + Send + Sync, + { + self.send_with_timeout(msg, self.producer_config.send_msg_timeout() as u64) + .await + } + + async fn send_default_impl( &mut self, - mut msg: Message, + mut msg: T, communication_mode: CommunicationMode, send_callback: Option, timeout: u64, - ) -> Result> { + ) -> Result> + where + T: MessageTrait + Clone + Send + Sync, + { self.make_sure_state_ok()?; let invoke_id = random::(); let begin_timestamp_first = Instant::now(); let mut begin_timestamp_prev = begin_timestamp_first; let mut end_timestamp = begin_timestamp_first; - let topic = msg.topic().to_string(); + let topic = msg.get_topic().to_string(); let topic_publish_info = self.try_to_find_topic_publish_info(topic.as_str()).await; if let Some(topic_publish_info) = topic_publish_info { if topic_publish_info.ok() { @@ -179,8 +196,10 @@ impl DefaultMQProducerImpl { //Reset topic with namespace during resend. let namespace = self.client_config.get_namespace().unwrap_or("".to_string()); - msg.topic = - NamespaceUtil::wrap_namespace(namespace.as_str(), topic.as_str()); + msg.set_topic( + NamespaceUtil::wrap_namespace(namespace.as_str(), topic.as_str()) + .as_str(), + ); } let cost_time = (begin_timestamp_prev - begin_timestamp_first).as_millis() as u64; @@ -200,16 +219,17 @@ impl DefaultMQProducerImpl { timeout - cost_time, ) .await; - end_timestamp = Instant::now(); - self.update_fault_item( - mq.as_ref().unwrap().get_broker_name(), - (end_timestamp - begin_timestamp_prev).as_millis() as u64, - false, - true, - ); + match result_inner { Ok(result) => { send_result = result; + end_timestamp = Instant::now(); + self.update_fault_item( + mq.as_ref().unwrap().get_broker_name(), + (end_timestamp - begin_timestamp_prev).as_millis() as u64, + false, + true, + ); return match communication_mode { CommunicationMode::Sync => { if let Some(ref result) = send_result { @@ -247,7 +267,7 @@ impl DefaultMQProducerImpl { mq, err.to_string() ); - warn!("{:?}", msg); + // warn!("{:?}", msg); exception = Some(err); continue; } @@ -381,15 +401,18 @@ impl DefaultMQProducerImpl { ); } - async fn send_kernel_impl( + async fn send_kernel_impl( &mut self, - msg: &mut Message, + msg: &mut T, mq: &MessageQueue, communication_mode: CommunicationMode, send_callback: Option>>, topic_publish_info: &TopicPublishInfo, timeout: u64, - ) -> Result> { + ) -> Result> + where + T: MessageTrait + Clone + Send + Sync, + { let begin_start_time = Instant::now(); let mut broker_name = self .client_instance @@ -486,7 +509,7 @@ impl DefaultMQProducerImpl { communication_mode: Some(communication_mode), born_host, broker_addr: Some(broker_addr.clone()), - message: Some(msg.clone()), + message: Some(Box::new(msg.clone())), mq: Some(mq), namespace, ..Default::default() @@ -511,15 +534,15 @@ impl DefaultMQProducerImpl { //build send message request header let mut request_header = SendMessageRequestHeader { producer_group: self.producer_config.producer_group().to_string(), - topic: msg.topic().to_string(), + topic: msg.get_topic().to_string(), default_topic: self.producer_config.create_topic_key().to_string(), default_topic_queue_nums: self.producer_config.default_topic_queue_nums() as i32, queue_id: Some(mq.get_queue_id()), sys_flag, born_timestamp: get_current_millis() as i64, - flag: msg.flag, + flag: msg.get_flag(), properties: Some(MessageDecoder::message_properties_to_string( - &msg.properties, + msg.get_properties(), )), reconsume_times: Some(0), unit_mode: Some(self.is_unit_mode()), @@ -554,12 +577,15 @@ impl DefaultMQProducerImpl { let send_result = match communication_mode { CommunicationMode::Async => { if topic_with_namespace { - msg.topic = NamespaceUtil::without_namespace_with_namespace( - msg.topic(), - self.client_config - .get_namespace() - .unwrap_or(String::from("")) - .as_str(), + msg.set_topic( + NamespaceUtil::without_namespace_with_namespace( + msg.get_topic(), + self.client_config + .get_namespace() + .unwrap_or(String::from("")) + .as_str(), + ) + .as_str(), ); } let cost_time_sync = (Instant::now() - begin_start_time).as_millis() as u64; @@ -661,25 +687,25 @@ impl DefaultMQProducerImpl { Ok(()) } - fn try_to_compress_message(&self, msg: &mut Message) -> bool { - if msg.as_any().downcast_ref::().is_some() { - return false; - } - if let Some(body) = msg.body.as_mut() { - if body.len() >= self.producer_config.compress_msg_body_over_howmuch() as usize { - let data = self - .producer_config - .compressor() - .as_ref() - .unwrap() - .compress(body, self.producer_config.compress_level()); - if let Ok(data) = data { - //store the compressed data - msg.compressed_body = Some(Bytes::from(data)); - return true; + fn try_to_compress_message(&self, msg: &mut T) -> bool { + if let Some(message) = msg.as_any_mut().downcast_mut::() { + if let Some(body) = message.compressed_body.as_mut() { + if body.len() >= self.producer_config.compress_msg_body_over_howmuch() as usize { + let data = self + .producer_config + .compressor() + .as_ref() + .unwrap() + .compress(body, self.producer_config.compress_level()); + if let Ok(data) = data { + //store the compressed data + msg.set_compressed_body_mut(Bytes::from(data)); + return true; + } } } } + false } diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index 6f1acbb6..f5e48283 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +use std::any::Any; use std::collections::HashMap; use std::collections::HashSet; use std::string::ToString; @@ -32,74 +32,350 @@ pub mod message_id; pub mod message_queue; pub mod message_single; -/// Trait defining the behavior of a message in a messaging system. -pub trait MessageTrait { - /// Retrieves the topic of the message. +/// This module defines the `MessageTrait` trait, which provides a flexible interface for working +/// with message objects in RocketMQ. It includes methods for managing message properties, keys, +/// tags, body, and other metadata related to the message. +pub trait MessageTrait: Any { + /// Sets the keys for the message. + /// + /// # Arguments + /// + /// * `keys` - The keys to set, converted into a `String`. + fn set_keys(&mut self, keys: &str) { + self.put_property(MessageConst::PROPERTY_KEYS, keys); + } + + /// Adds a property to the message. + /// + /// # Arguments + /// + /// * `key` - The property key, converted into a `String`. + /// * `value` - The property value, converted into a `String`. + fn put_property(&mut self, key: &str, value: &str); + + /// Clears a specific property from the message. + /// + /// # Arguments + /// + /// * `name` - The name of the property to clear. + fn clear_property(&mut self, name: &str); + + /// Adds a user-defined property to the message. + /// + /// # Arguments + /// + /// * `name` - The name of the user property, converted into a `String`. + /// * `value` - The value of the user property, converted into a `String`. + fn put_user_property(&mut self, name: &str, value: &str) { + let name = name.trim(); + let value = value.trim(); + if STRING_HASH_SET.contains(name) { + panic!( + "The Property<{}> is used by system, input another please", + name + ); + } + if value.is_empty() || name.is_empty() { + panic!("The name or value of property can not be null or blank string!"); + } + self.put_property(name, value); + } + + /// Retrieves a user-defined property from the message. + /// + /// # Arguments + /// + /// * `name` - The name of the user property to retrieve. /// /// # Returns - /// A string slice representing the topic of the message. - fn topic(&self) -> &str; + /// + /// An `Option` containing the property value if it exists, otherwise `None`. + fn get_user_property(&self, name: &str) -> Option { + self.get_property(name) + } - /// Sets the topic of the message. + /// Retrieves a property from the message. /// /// # Arguments - /// * `topic` - A string or a type that can be converted into a `String` representing the new - /// topic of the message. - fn with_topic(&mut self, topic: impl Into); + /// + /// * `name` - The name of the property to retrieve. + /// + /// # Returns + /// + /// An `Option` containing the property value if it exists, otherwise `None`. + fn get_property(&self, name: &str) -> Option; - /// Retrieves the tags associated with the message, if any. + /// Retrieves the topic of the message. /// /// # Returns - /// An `Option` containing a string slice representing the tags of the message, or `None` if no - /// tags are set. - fn tags(&self) -> Option<&str>; + /// + /// A reference to the topic as a `&str`. + fn get_topic(&self) -> &str; - /// Sets the tags of the message. + /// Sets the topic for the message. /// /// # Arguments - /// * `tags` - A string or a type that can be converted into a `String` representing the new - /// tags of the message. - fn with_tags(&mut self, tags: impl Into); + /// + /// * `topic` - The topic to set, converted into a `String`. + fn set_topic(&mut self, topic: &str); - /// Adds a property to the message. + /// Retrieves the tags associated with the message. + /// + /// # Returns + /// + /// An `Option` containing the tags if they exist, otherwise `None`. + fn get_tags(&self) -> Option { + self.get_property(MessageConst::PROPERTY_TAGS) + } + + /// Sets the tags for the message. /// /// # Arguments - /// * `key` - A string or a type that can be converted into a `String` representing the property - /// key. - /// * `value` - A string or a type that can be converted into a `String` representing the - /// property value. - fn put_property(&mut self, key: impl Into, value: impl Into); + /// + /// * `tags` - The tags to set, converted into a `String`. + fn set_tags(&mut self, tags: &str) { + self.put_property(MessageConst::PROPERTY_TAGS, tags); + } - /// Retrieves all properties of the message. + /// Retrieves the keys associated with the message. /// /// # Returns - /// A reference to a `HashMap` containing all properties of the message, where the key is the - /// property name and the value is the property value. - fn properties(&self) -> &HashMap; + /// + /// An `Option` containing the keys if they exist, otherwise `None`. + fn get_keys(&self) -> Option { + self.get_property(MessageConst::PROPERTY_KEYS) + } - /// Adds a user-defined property to the message. + /// Sets multiple keys from a collection for the message. /// /// # Arguments - /// * `name` - A string or a type that can be converted into a `String` representing the name of - /// the user-defined property. - /// * `value` - A string or a type that can be converted into a `String` representing the value - /// of the user-defined property. - fn put_user_property(&mut self, name: impl Into, value: impl Into); + /// + /// * `key_collection` - A vector of keys to set. + fn set_keys_from_collection(&mut self, key_collection: Vec) { + let keys = key_collection.join(MessageConst::KEY_SEPARATOR); + self.set_keys(keys.as_str()); + } /// Retrieves the delay time level of the message. /// /// # Returns - /// An `i32` representing the delay time level of the message. - fn delay_time_level(&self) -> i32; + /// + /// An `i32` representing the delay time level. + fn get_delay_time_level(&self) -> i32 { + self.get_property(MessageConst::PROPERTY_DELAY_TIME_LEVEL) + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0) + } - /// Sets the delay time level of the message. + /// Sets the delay time level for the message. /// /// # Arguments - /// * `level` - An `i32` representing the new delay time level of the message. + /// + /// * `level` - The delay time level to set. + fn set_delay_time_level(&mut self, level: i32) { + self.put_property( + MessageConst::PROPERTY_DELAY_TIME_LEVEL, + level.to_string().as_str(), + ); + } + + /// Checks if the message should wait for store acknowledgment. /// /// # Returns - /// The updated delay time level of the message. - fn with_delay_time_level(&self, level: i32) -> i32; + /// + /// `true` if the message should wait for store acknowledgment; `false` otherwise. + fn is_wait_store_msg_ok(&self) -> bool { + self.get_property(MessageConst::PROPERTY_WAIT_STORE_MSG_OK) + .unwrap_or("true".to_string()) + .parse() + .unwrap_or(true) + } + + /// Sets whether the message should wait for store acknowledgment. + /// + /// # Arguments + /// + /// * `wait_store_msg_ok` - A boolean indicating whether to wait for store acknowledgment. + fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) { + self.put_property( + MessageConst::PROPERTY_WAIT_STORE_MSG_OK, + wait_store_msg_ok.to_string().as_str(), + ); + } + + /// Sets the instance ID for the message. + /// + /// # Arguments + /// + /// * `instance_id` - The instance ID to set. + fn set_instance_id(&mut self, instance_id: &str) { + self.put_property(MessageConst::PROPERTY_INSTANCE_ID, instance_id); + } + + /// Retrieves the flag associated with the message. + /// + /// # Returns + /// + /// An `i32` representing the flag. + fn get_flag(&self) -> i32; + + /// Sets the flag for the message. + /// + /// # Arguments + /// + /// * `flag` - The flag to set. + fn set_flag(&mut self, flag: i32); + + /// Retrieves the body of the message. + /// + /// # Returns + /// + /// A byte slice (`&[u8]`) representing the body of the message. + fn get_body(&self) -> Option<&Bytes>; + + /// Sets the body of the message. + /// + /// # Arguments + /// + /// * `body` - The byte slice (`&[u8]`) to set as the body. + fn set_body(&mut self, body: Bytes); + + /// Retrieves all properties associated with the message. + /// + /// # Returns + /// + /// A reference to a `HashMap` containing the properties. + fn get_properties(&self) -> &HashMap; + + /// Sets multiple properties for the message. + /// + /// # Arguments + /// + /// * `properties` - A `HashMap` containing the properties to set. + fn set_properties(&mut self, properties: HashMap); + + /// Retrieves the buyer ID associated with the message. + /// + /// # Returns + /// + /// An `Option` containing the buyer ID if it exists, otherwise `None`. + fn get_buyer_id(&self) -> Option { + self.get_property(MessageConst::PROPERTY_BUYER_ID) + } + + /// Sets the buyer ID for the message. + /// + /// # Arguments + /// + /// * `buyer_id` - The buyer ID to set. + fn set_buyer_id(&mut self, buyer_id: &str) { + self.put_property(MessageConst::PROPERTY_BUYER_ID, buyer_id); + } + + /// Retrieves the transaction ID associated with the message. + /// + /// # Returns + /// + /// A reference to the transaction ID as a `&str`. + fn get_transaction_id(&self) -> &str; + + /// Sets the transaction ID for the message. + /// + /// # Arguments + /// + /// * `transaction_id` - The transaction ID to set. + fn set_transaction_id(&mut self, transaction_id: &str); + + /// Sets the delay time for the message in seconds. + /// + /// # Arguments + /// + /// * `sec` - The delay time in seconds. + fn set_delay_time_sec(&mut self, sec: u64) { + self.put_property( + MessageConst::PROPERTY_TIMER_DELAY_SEC, + sec.to_string().as_str(), + ); + } + + /// Retrieves the delay time for the message in seconds. + /// + /// # Returns + /// + /// The delay time in seconds. + fn get_delay_time_sec(&self) -> u64 { + self.get_property(MessageConst::PROPERTY_TIMER_DELAY_SEC) + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0) + } + + /// Sets the delay time for the message in milliseconds. + /// + /// # Arguments + /// + /// * `time_ms` - The delay time in milliseconds. + fn set_delay_time_ms(&mut self, time_ms: u64) { + self.put_property( + MessageConst::PROPERTY_TIMER_DELAY_MS, + time_ms.to_string().as_str(), + ); + } + + /// Retrieves the delay time for the message in milliseconds. + /// + /// # Returns + /// + /// The delay time in milliseconds. + fn get_delay_time_ms(&self) -> u64 { + self.get_property(MessageConst::PROPERTY_TIMER_DELAY_MS) + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0) + } + + /// Sets the delivery time for the message in milliseconds. + /// + /// # Arguments + /// + /// * `time_ms` - The delivery time in milliseconds. + fn set_deliver_time_ms(&mut self, time_ms: u64) { + self.put_property( + MessageConst::PROPERTY_TIMER_DELIVER_MS, + time_ms.to_string().as_str(), + ); + } + + /// Retrieves the delivery time for the message in milliseconds. + /// + /// # Returns + /// + /// The delivery time in milliseconds. + fn get_deliver_time_ms(&self) -> u64 { + self.get_property(MessageConst::PROPERTY_TIMER_DELIVER_MS) + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0) + } + + fn get_compressed_body_mut(&mut self) -> &mut Option; + fn get_compressed_body(&self) -> Option<&Bytes>; + fn set_compressed_body_mut(&mut self, compressed_body: Bytes); + + /// Converts the message into a dynamic `Any` type. + /// + /// # Returns + /// + /// A reference to the message as `&dyn Any`. + fn as_any(&self) -> &dyn Any; + + /// Converts the message into a mutable dynamic `Any` type. + /// + /// # Returns + /// + /// A mutable reference to the message as `&mut dyn Any`. + fn as_any_mut(&mut self) -> &mut dyn Any; } pub const MESSAGE_MAGIC_CODE_V1: i32 = -626843481; diff --git a/rocketmq-common/src/common/message/message_accessor.rs b/rocketmq-common/src/common/message/message_accessor.rs index 88669ac7..203530a7 100644 --- a/rocketmq-common/src/common/message/message_accessor.rs +++ b/rocketmq-common/src/common/message/message_accessor.rs @@ -21,88 +21,73 @@ use crate::common::message::MessageTrait; pub struct MessageAccessor; impl MessageAccessor { - pub fn clear_property(msg: &mut Message, name: &str) { + pub fn clear_property(msg: &mut T, name: &str) { msg.clear_property(name); } - pub fn set_transfer_flag(msg: &mut Message, unit: &str) { - msg.put_property( - MessageConst::PROPERTY_TRANSFER_FLAG.to_string(), - unit.to_string(), - ); + pub fn set_transfer_flag(msg: &mut T, unit: &str) { + msg.put_property(MessageConst::PROPERTY_TRANSFER_FLAG, unit); } - pub fn get_transfer_flag(msg: &Message) -> Option { + pub fn get_transfer_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_TRANSFER_FLAG) } - pub fn set_correction_flag(msg: &mut Message, unit: &str) { - msg.put_property( - MessageConst::PROPERTY_CORRECTION_FLAG.to_string(), - unit.to_string(), - ); + pub fn set_correction_flag(msg: &mut T, unit: &str) { + msg.put_property(MessageConst::PROPERTY_CORRECTION_FLAG, unit); } - pub fn get_correction_flag(msg: &Message) -> Option { + pub fn get_correction_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_CORRECTION_FLAG) } - pub fn set_origin_message_id(msg: &mut Message, origin_message_id: &str) { - msg.put_property( - MessageConst::PROPERTY_ORIGIN_MESSAGE_ID.to_string(), - origin_message_id.to_string(), - ); + pub fn set_origin_message_id(msg: &mut T, origin_message_id: &str) { + msg.put_property(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID, origin_message_id); } - pub fn get_origin_message_id(msg: &Message) -> Option { + pub fn get_origin_message_id(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID) } - pub fn set_mq2_flag(msg: &mut Message, flag: &str) { - msg.put_property( - MessageConst::PROPERTY_MQ2_FLAG.to_string(), - flag.to_string(), - ); + pub fn set_mq2_flag(msg: &mut T, flag: &str) { + msg.put_property(MessageConst::PROPERTY_MQ2_FLAG, flag); } - pub fn get_mq2_flag(msg: &Message) -> Option { + pub fn get_mq2_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_MQ2_FLAG) } - pub fn set_reconsume_time(msg: &mut Message, reconsume_times: &str) { - msg.put_property( - MessageConst::PROPERTY_RECONSUME_TIME.to_string(), - reconsume_times.to_string(), - ); + pub fn set_reconsume_time(msg: &mut T, reconsume_times: &str) { + msg.put_property(MessageConst::PROPERTY_RECONSUME_TIME, reconsume_times); } #[inline] - pub fn get_reconsume_time(msg: &Message) -> Option { + pub fn get_reconsume_time(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_RECONSUME_TIME) } - pub fn set_max_reconsume_times(msg: &mut Message, max_reconsume_times: &str) { + pub fn set_max_reconsume_times(msg: &mut T, max_reconsume_times: &str) { msg.put_property( - MessageConst::PROPERTY_MAX_RECONSUME_TIMES.to_string(), - max_reconsume_times.to_string(), + MessageConst::PROPERTY_MAX_RECONSUME_TIMES, + max_reconsume_times, ); } - pub fn get_max_reconsume_times(msg: &Message) -> Option { + pub fn get_max_reconsume_times(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_MAX_RECONSUME_TIMES) } - pub fn set_consume_start_time_stamp( - msg: &mut Message, + pub fn set_consume_start_time_stamp( + msg: &mut T, property_consume_start_time_stamp: &str, ) { msg.put_property( - MessageConst::PROPERTY_CONSUME_START_TIMESTAMP.to_string(), - property_consume_start_time_stamp.to_string(), + MessageConst::PROPERTY_CONSUME_START_TIMESTAMP, + property_consume_start_time_stamp, ); } - pub fn get_consume_start_time_stamp(msg: &Message) -> Option { + pub fn get_consume_start_time_stamp(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP) } } diff --git a/rocketmq-common/src/common/message/message_batch.rs b/rocketmq-common/src/common/message/message_batch.rs index 043d9d80..44018990 100644 --- a/rocketmq-common/src/common/message/message_batch.rs +++ b/rocketmq-common/src/common/message/message_batch.rs @@ -14,55 +14,175 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +use std::any::Any; use std::collections::HashMap; +use std::path::Iter; use bytes::Bytes; +use crate::common::message::message_decoder; use crate::common::message::message_single::Message; use crate::common::message::message_single::MessageExtBrokerInner; use crate::common::message::MessageTrait; +use crate::common::mix_all; +use crate::error::Error::UnsupportedOperationException; +use crate::Result; +#[derive(Clone, Default)] pub struct MessageBatch { - pub messages: Vec, + ///`final_message` stores the batch-encoded messages. + pub final_message: Message, + + ///`messages` stores the batch of initialized messages. + pub messages: Option>, +} + +impl Iterator for MessageBatch { + type Item = Message; + + fn next(&mut self) -> Option { + match &self.messages { + Some(messages) => { + if let Some(message) = messages.iter().next() { + return Some(message.clone()); + } + None + } + None => None, + } + } +} + +impl MessageBatch { + pub fn encode(&self) -> Bytes { + message_decoder::encode_messages(self.messages.as_ref().unwrap()) + } + + pub fn generate_from_vec(messages: Vec) -> Result { + if messages.is_empty() { + return Err(UnsupportedOperationException( + "MessageBatch::generate_from_vec: messages is empty".to_string(), + )); + } + let mut first: Option<&Message> = None; + for message in &messages { + if message.get_delay_time_level() > 0 { + return Err(UnsupportedOperationException( + "TimeDelayLevel is not supported for batching".to_string(), + )); + } + if message + .get_topic() + .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) + { + return Err(UnsupportedOperationException( + "Retry group topic is not supported for batching".to_string(), + )); + } + if first.is_none() { + first = Some(message); + } else { + let first_message = first.unwrap(); + if first_message.get_topic() != message.get_topic() { + return Err(UnsupportedOperationException( + "The topic of the messages in one batch should be the same".to_string(), + )); + } + if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() { + return Err(UnsupportedOperationException( + "The waitStoreMsgOK of the messages in one batch should the same" + .to_string(), + )); + } + } + } + let first = first.unwrap(); + let mut final_message = Message { + topic: first.topic.clone(), + ..Message::default() + }; + final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok()); + Ok(MessageBatch { + final_message, + messages: Some(messages), + }) + } } #[allow(unused_variables)] impl MessageTrait for MessageBatch { - fn topic(&self) -> &str { - todo!() + fn put_property(&mut self, key: &str, value: &str) { + self.final_message + .properties + .insert(key.to_string(), value.to_string()); + } + + fn clear_property(&mut self, name: &str) { + self.final_message.properties.remove(name); + } + + fn get_property(&self, name: &str) -> Option { + self.final_message.properties.get(name).cloned() + } + + fn get_topic(&self) -> &str { + &self.final_message.topic + } + + fn set_topic(&mut self, topic: &str) { + self.final_message.topic = topic.to_string(); + } + + fn get_flag(&self) -> i32 { + self.final_message.flag + } + + fn set_flag(&mut self, flag: i32) { + self.final_message.flag = flag; + } + + fn get_body(&self) -> Option<&Bytes> { + self.final_message.body.as_ref() + } + + fn set_body(&mut self, body: Bytes) { + self.final_message.body = Some(body); + } + + fn get_properties(&self) -> &HashMap { + &self.final_message.properties } - fn with_topic(&mut self, topic: impl Into) { - todo!() + fn set_properties(&mut self, properties: HashMap) { + self.final_message.properties = properties; } - fn tags(&self) -> Option<&str> { - todo!() + fn get_transaction_id(&self) -> &str { + self.final_message.transaction_id.as_deref().unwrap() } - fn with_tags(&mut self, tags: impl Into) { - todo!() + fn set_transaction_id(&mut self, transaction_id: &str) { + self.final_message.transaction_id = Some(transaction_id.to_string()); } - fn put_property(&mut self, key: impl Into, value: impl Into) { - todo!() + fn get_compressed_body_mut(&mut self) -> &mut Option { + &mut self.final_message.compressed_body } - fn properties(&self) -> &HashMap { - todo!() + fn get_compressed_body(&self) -> Option<&Bytes> { + self.final_message.compressed_body.as_ref() } - fn put_user_property(&mut self, name: impl Into, value: impl Into) { - todo!() + fn set_compressed_body_mut(&mut self, compressed_body: Bytes) { + self.final_message.compressed_body = Some(compressed_body); } - fn delay_time_level(&self) -> i32 { - todo!() + fn as_any(&self) -> &dyn Any { + self } - fn with_delay_time_level(&self, level: i32) -> i32 { - todo!() + fn as_any_mut(&mut self) -> &mut dyn Any { + self } } diff --git a/rocketmq-common/src/common/message/message_client_id_setter.rs b/rocketmq-common/src/common/message/message_client_id_setter.rs index d8926827..6c19469a 100644 --- a/rocketmq-common/src/common/message/message_client_id_setter.rs +++ b/rocketmq-common/src/common/message/message_client_id_setter.rs @@ -68,7 +68,10 @@ pub struct MessageClientIDSetter; impl MessageClientIDSetter { #[inline] - pub fn get_uniq_id(message: &Message) -> Option { + pub fn get_uniq_id(message: &T) -> Option + where + T: MessageTrait, + { message.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) } @@ -109,7 +112,10 @@ impl MessageClientIDSetter { sb.into_iter().collect() } - pub fn set_uniq_id(message: &mut Message) { + pub fn set_uniq_id(message: &mut T) + where + T: MessageTrait, + { let uniq_id = Self::create_uniq_id(); if message .get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) @@ -117,7 +123,7 @@ impl MessageClientIDSetter { { message.put_property( MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, - uniq_id, + uniq_id.as_str(), ); } } diff --git a/rocketmq-common/src/common/message/message_decoder.rs b/rocketmq-common/src/common/message/message_decoder.rs index 08ac4c7f..bfd90f0d 100644 --- a/rocketmq-common/src/common/message/message_decoder.rs +++ b/rocketmq-common/src/common/message/message_decoder.rs @@ -23,14 +23,18 @@ use std::net::SocketAddrV6; use std::str; use bytes::Buf; +use bytes::BufMut; use bytes::Bytes; +use bytes::BytesMut; use crate::common::compression::compression_type::CompressionType; +use crate::common::message::message_single::Message; use crate::common::message::message_single::MessageExt; use crate::common::message::MessageVersion; use crate::common::sys_flag::message_sys_flag::MessageSysFlag; use crate::CRC32Utils::crc32; use crate::MessageUtils::build_message_id; +use crate::Result; pub const CHARSET_UTF8: &str = "UTF-8"; pub const MESSAGE_MAGIC_CODE_POSITION: usize = 4; @@ -298,6 +302,56 @@ pub fn count_inner_msg_num(bytes: Option) -> u32 { } } +pub fn encode_messages(messages: &[Message]) -> Bytes { + let mut bytes = BytesMut::new(); + let mut all_size = 0; + for message in messages { + let message_bytes = encode_message(message); + all_size += message_bytes.len(); + bytes.put_slice(&message_bytes); + } + bytes.freeze() +} + +pub fn encode_message(message: &Message) -> Bytes { + let body = message.body.as_ref().unwrap(); + let body_len = body.len(); + let properties = message_properties_to_string(&message.properties); + let properties_bytes = properties.as_bytes(); + let properties_length = properties_bytes.len(); + + let store_size = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCOD + + 4 // 3 BODYCRC + + 4 // 4 FLAG + + 4 + body_len // 4 BODY + + 2 + properties_length; + + let mut bytes = BytesMut::with_capacity(store_size); + + // 1 TOTALSIZE + bytes.put_i32(store_size as i32); + + // 2 MAGICCODE + bytes.put_i32(0); + + // 3 BODYCRC + bytes.put_u32(0); + + // 4 FLAG + bytes.put_i32(message.flag); + + // 5 BODY + bytes.put_i32(body_len as i32); + bytes.put_slice(body); + + // 6 PROPERTIES + bytes.put_i16(properties_length as i16); + bytes.put_slice(properties_bytes); + + bytes.freeze() +} + #[cfg(test)] mod tests { use bytes::BufMut; diff --git a/rocketmq-common/src/common/message/message_single.rs b/rocketmq-common/src/common/message/message_single.rs index a7f7e649..b8f54165 100644 --- a/rocketmq-common/src/common/message/message_single.rs +++ b/rocketmq-common/src/common/message/message_single.rs @@ -23,6 +23,7 @@ use std::net::SocketAddr; use bytes::Buf; use bytes::BufMut; +use bytes::Bytes; use crate::common::hasher::string_hasher::JavaStringHasher; use crate::common::message::MessageConst; @@ -102,13 +103,6 @@ impl Message { .insert(MessageConst::PROPERTY_KEYS.to_string(), keys); } - pub fn set_wait_store_msg_ok(&mut self, wait_store_msg_ok: bool) { - self.properties.insert( - MessageConst::PROPERTY_WAIT_STORE_MSG_OK.to_string(), - wait_store_msg_ok.to_string(), - ); - } - pub fn clear_property(&mut self, name: impl Into) { self.properties.remove(name.into().as_str()); } @@ -184,40 +178,76 @@ impl Message { #[allow(unused_variables)] impl MessageTrait for Message { - fn topic(&self) -> &str { - todo!() + fn put_property(&mut self, key: &str, value: &str) { + self.properties.insert(key.to_string(), value.to_string()); + } + + fn clear_property(&mut self, name: &str) { + self.properties.remove(name); + } + + fn get_property(&self, name: &str) -> Option { + self.properties.get(name).cloned() + } + + fn get_topic(&self) -> &str { + &self.topic + } + + fn set_topic(&mut self, topic: &str) { + self.topic = topic.to_string(); + } + + fn get_flag(&self) -> i32 { + self.flag + } + + fn set_flag(&mut self, flag: i32) { + self.flag = flag; + } + + fn get_body(&self) -> Option<&Bytes> { + self.body.as_ref() + } + + fn set_body(&mut self, body: Bytes) { + self.body = Some(body); } - fn with_topic(&mut self, topic: impl Into) { - todo!() + fn get_properties(&self) -> &HashMap { + &self.properties } - fn tags(&self) -> Option<&str> { - todo!() + fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; } - fn with_tags(&mut self, tags: impl Into) { - todo!() + fn get_transaction_id(&self) -> &str { + self.transaction_id.as_deref().unwrap() } - fn put_property(&mut self, key: impl Into, value: impl Into) { - self.properties.insert(key.into(), value.into()); + fn set_transaction_id(&mut self, transaction_id: &str) { + self.transaction_id = Some(transaction_id.to_string()); } - fn properties(&self) -> &HashMap { - todo!() + fn get_compressed_body_mut(&mut self) -> &mut Option { + &mut self.compressed_body } - fn put_user_property(&mut self, name: impl Into, value: impl Into) { - todo!() + fn get_compressed_body(&self) -> Option<&Bytes> { + self.compressed_body.as_ref() } - fn delay_time_level(&self) -> i32 { - todo!() + fn set_compressed_body_mut(&mut self, compressed_body: Bytes) { + self.compressed_body = Some(compressed_body); } - fn with_delay_time_level(&self, level: i32) -> i32 { - todo!() + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self } } diff --git a/rocketmq-common/src/error.rs b/rocketmq-common/src/error.rs index 7513e4c9..dffcae29 100644 --- a/rocketmq-common/src/error.rs +++ b/rocketmq-common/src/error.rs @@ -23,4 +23,7 @@ pub enum Error { #[error("{0}")] RuntimeException(String), + + #[error("{0}")] + UnsupportedOperationException(String), }