From 66822991e0bcc92756a40ec4b05ae5c27c2076ec Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 17 Nov 2024 13:04:54 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1186]=F0=9F=94=A5Optimize=20MQClientA?= =?UTF-8?q?PIImpl=20method=F0=9F=9A=80=20(#1187)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/transaction_producer.rs | 6 +-- .../src/factory/mq_client_instance.rs | 2 +- .../src/hook/end_transaction_context.rs | 9 ++-- .../client_remoting_processor.rs | 8 +--- .../src/implementation/mq_client_api_impl.rs | 4 +- .../src/producer/default_mq_producer.rs | 2 +- .../producer_impl/default_mq_producer_impl.rs | 43 ++++++++++--------- .../producer_impl/mq_producer_inner.rs | 8 ++-- rocketmq-common/src/common/message.rs | 2 +- .../src/common/message/message_batch.rs | 7 ++- .../src/common/message/message_client_ext.rs | 2 +- .../src/common/message/message_ext.rs | 3 +- .../message/message_ext_broker_inner.rs | 2 +- .../src/common/message/message_single.rs | 7 ++- 14 files changed, 56 insertions(+), 49 deletions(-) diff --git a/rocketmq-client/examples/transaction/transaction_producer.rs b/rocketmq-client/examples/transaction/transaction_producer.rs index 5617cf90..16e3eab7 100644 --- a/rocketmq-client/examples/transaction/transaction_producer.rs +++ b/rocketmq-client/examples/transaction/transaction_producer.rs @@ -19,8 +19,8 @@ use std::collections::HashMap; use std::sync::atomic::AtomicI32; use std::sync::Arc; +use cheetah_string::CheetahString; use parking_lot::Mutex; -use rocketmq_client::producer::default_mq_producer::DefaultMQProducer; use rocketmq_client::producer::local_transaction_state::LocalTransactionState; use rocketmq_client::producer::mq_producer::MQProducer; use rocketmq_client::producer::transaction_listener::TransactionListener; @@ -68,7 +68,7 @@ pub async fn main() -> Result<()> { } struct TransactionListenerImpl { - local_trans: Arc>>, + local_trans: Arc>>, transaction_index: AtomicI32, } @@ -92,7 +92,7 @@ impl TransactionListener for TransactionListenerImpl { .fetch_add(1, std::sync::atomic::Ordering::AcqRel); let status = value % 3; let mut guard = self.local_trans.lock(); - guard.insert(msg.get_transaction_id().to_string(), status); + guard.insert(msg.get_transaction_id().clone(), status); LocalTransactionState::Unknown } diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 54740cfa..0765a13c 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -631,7 +631,7 @@ impl MQClientInstance { false } - async fn is_need_update_topic_route_info(&self, topic: &str) -> bool { + async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool { let mut result = false; let producer_table = self.producer_table.read().await; for (key, value) in producer_table.iter() { diff --git a/rocketmq-client/src/hook/end_transaction_context.rs b/rocketmq-client/src/hook/end_transaction_context.rs index ce7e64a0..7a3aa2fd 100644 --- a/rocketmq-client/src/hook/end_transaction_context.rs +++ b/rocketmq-client/src/hook/end_transaction_context.rs @@ -14,16 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use cheetah_string::CheetahString; use rocketmq_common::common::message::message_single::Message; use crate::producer::local_transaction_state::LocalTransactionState; pub struct EndTransactionContext<'a> { - pub producer_group: String, - pub broker_addr: String, + pub producer_group: CheetahString, + pub broker_addr: CheetahString, pub message: &'a Message, - pub msg_id: String, - pub transaction_id: String, + pub msg_id: CheetahString, + pub transaction_id: CheetahString, pub transaction_state: LocalTransactionState, pub from_transaction_check: bool, } diff --git a/rocketmq-client/src/implementation/client_remoting_processor.rs b/rocketmq-client/src/implementation/client_remoting_processor.rs index 37e805f0..6ae87da5 100644 --- a/rocketmq-client/src/implementation/client_remoting_processor.rs +++ b/rocketmq-client/src/implementation/client_remoting_processor.rs @@ -257,12 +257,8 @@ impl ClientRemotingProcessor { if let Some(group) = group { let producer = client_instance.select_producer(&group).await; if let Some(producer) = producer { - let addr = channel.remote_address().to_string(); - producer.check_transaction_state( - addr.as_str(), - message_ext, - request_header, - ); + let addr = CheetahString::from_string(channel.remote_address().to_string()); + producer.check_transaction_state(&addr, message_ext, request_header); } else { warn!("checkTransactionState, pick producer group failed"); } diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 413e48b8..113e8337 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -1156,9 +1156,9 @@ impl MQClientAPIImpl { pub async fn end_transaction_oneway( &mut self, - addr: &str, + addr: &CheetahString, request_header: EndTransactionRequestHeader, - remark: String, + remark: CheetahString, timeout_millis: u64, ) -> Result<()> { let request = diff --git a/rocketmq-client/src/producer/default_mq_producer.rs b/rocketmq-client/src/producer/default_mq_producer.rs index 7bfe21db..c2def162 100644 --- a/rocketmq-client/src/producer/default_mq_producer.rs +++ b/rocketmq-client/src/producer/default_mq_producer.rs @@ -678,7 +678,7 @@ impl MQProducer for DefaultMQProducer { self.default_mqproducer_impl .as_mut() .unwrap() - .fetch_publish_message_queues(topic.as_str()) + .fetch_publish_message_queues(topic.as_ref()) .await } diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index 23b0c8b2..12a8ab79 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -1416,7 +1416,10 @@ impl DefaultMQProducerImpl { Ok(result.expect("send result is none")) } - pub async fn fetch_publish_message_queues(&mut self, topic: &str) -> Result> { + pub async fn fetch_publish_message_queues( + &mut self, + topic: &CheetahString, + ) -> Result> { self.make_sure_state_ok()?; let client_instance = self .client_instance @@ -1477,7 +1480,7 @@ impl DefaultMQProducerImpl { .set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string()))); } }; - let topic = msg.get_topic().to_string(); + let topic = msg.get_topic().clone(); let _ = self .send_select_impl( msg, @@ -1595,7 +1598,7 @@ impl DefaultMQProducerImpl { .set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string()))); } }; - let topic = msg.get_topic().to_string(); + let topic = msg.get_topic().clone(); let _ = self .send_kernel_impl( &mut msg, @@ -1761,7 +1764,7 @@ impl DefaultMQProducerImpl { .set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string()))); } }; - let topic = msg.get_topic().to_string(); + let topic = msg.get_topic().clone(); let cost = begin_timestamp.elapsed().as_millis() as u64; self.send_default_impl( &mut msg, @@ -1783,7 +1786,7 @@ impl DefaultMQProducerImpl { async fn wait_response( &mut self, - topic: &str, + topic: &CheetahString, timeout: u64, request_response_future: Arc, cost: u64, @@ -2000,7 +2003,7 @@ impl DefaultMQProducerImpl { .end_transaction_oneway( broker_addr.as_ref().unwrap(), request_header, - "".to_string(), + CheetahString::from_static_str(""), self.producer_config.send_msg_timeout() as u64, ) .await; @@ -2014,8 +2017,8 @@ impl DefaultMQProducerImpl { pub fn do_execute_end_transaction_hook( &mut self, msg: &Message, - msg_id: &str, - broker_addr: &str, + msg_id: &CheetahString, + broker_addr: &CheetahString, local_transaction_state: LocalTransactionState, from_transaction_check: bool, ) { @@ -2023,11 +2026,11 @@ impl DefaultMQProducerImpl { return; } let end_transaction_context = EndTransactionContext { - producer_group: self.producer_config.producer_group().to_string(), + producer_group: self.producer_config.producer_group().clone(), message: msg, - msg_id: msg_id.to_string(), - transaction_id: msg.get_transaction_id().to_string(), - broker_addr: broker_addr.to_string(), + msg_id: msg_id.clone(), + transaction_id: msg.get_transaction_id().clone(), + broker_addr: broker_addr.clone(), from_transaction_check, transaction_state: local_transaction_state, }; @@ -2079,9 +2082,9 @@ impl MQProducerInner for DefaultMQProducerImpl { .unwrap() } - fn is_publish_topic_need_update(&self, topic: &str) -> bool { + fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool { let handle = Handle::current(); - let topic = topic.to_string(); + let topic = topic.clone(); let topic_publish_info_table = self.topic_publish_info_table.clone(); thread::spawn(move || { handle.block_on(async move { @@ -2103,13 +2106,13 @@ impl MQProducerInner for DefaultMQProducerImpl { fn check_transaction_state( &self, - broker_addr: &str, + broker_addr: &CheetahString, msg: MessageExt, check_request_header: CheckTransactionStateRequestHeader, ) { let transaction_listener = self.transaction_listener.clone().unwrap(); let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap(); - let broker_addr = broker_addr.to_string(); + let broker_addr = broker_addr.clone(); self.check_runtime .as_ref() .unwrap() @@ -2154,8 +2157,8 @@ impl MQProducerInner for DefaultMQProducerImpl { }; producer_impl_inner.do_execute_end_transaction_hook( &msg.message, - unique_key.as_ref().unwrap().as_str(), - broker_addr.as_str(), + unique_key.as_ref().unwrap(), + &broker_addr, transaction_state, true, ); @@ -2167,9 +2170,9 @@ impl MQProducerInner for DefaultMQProducerImpl { .as_mut() .unwrap() .end_transaction_oneway( - broker_addr.as_str(), + &broker_addr, request_header, - "".to_string(), + CheetahString::from_static_str(""), 3000, ) .await; diff --git a/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs b/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs index 560f9f1c..684a4997 100644 --- a/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs +++ b/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs @@ -29,13 +29,13 @@ use crate::producer::transaction_listener::TransactionListener; pub trait MQProducerInner: Send + Sync + 'static { fn get_publish_topic_list(&self) -> HashSet; - fn is_publish_topic_need_update(&self, topic: &str) -> bool; + fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool; fn get_check_listener(&self) -> Arc>; fn check_transaction_state( &self, - broker_addr: &str, + broker_addr: &CheetahString, msg: MessageExt, check_request_header: CheckTransactionStateRequestHeader, ); @@ -58,7 +58,7 @@ impl MQProducerInnerImpl { HashSet::new() } - pub fn is_publish_topic_need_update(&self, topic: &str) -> bool { + pub fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { return default_mqproducer_impl_inner.is_publish_topic_need_update(topic); } @@ -74,7 +74,7 @@ impl MQProducerInnerImpl { pub fn check_transaction_state( &self, - addr: &str, + addr: &CheetahString, msg: MessageExt, check_request_header: CheckTransactionStateRequestHeader, ) { diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index 1d4ad0a2..259cb753 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -302,7 +302,7 @@ pub trait MessageTrait: Any + Display + Debug { /// # Returns /// /// A reference to the transaction ID as a `&str`. - fn get_transaction_id(&self) -> &str; + fn get_transaction_id(&self) -> &CheetahString; /// Sets the transaction ID for the message. /// diff --git a/rocketmq-common/src/common/message/message_batch.rs b/rocketmq-common/src/common/message/message_batch.rs index 5ddae134..1b55da1f 100644 --- a/rocketmq-common/src/common/message/message_batch.rs +++ b/rocketmq-common/src/common/message/message_batch.rs @@ -179,8 +179,11 @@ impl MessageTrait for MessageBatch { self.final_message.properties = properties; } - fn get_transaction_id(&self) -> &str { - self.final_message.transaction_id.as_deref().unwrap() + fn get_transaction_id(&self) -> &CheetahString { + self.final_message + .transaction_id + .as_ref() + .expect("transaction_id is None") } fn set_transaction_id(&mut self, transaction_id: CheetahString) { diff --git a/rocketmq-common/src/common/message/message_client_ext.rs b/rocketmq-common/src/common/message/message_client_ext.rs index 3e605fcc..8faa763f 100644 --- a/rocketmq-common/src/common/message/message_client_ext.rs +++ b/rocketmq-common/src/common/message/message_client_ext.rs @@ -105,7 +105,7 @@ impl MessageTrait for MessageClientExt { self.message_ext_inner.set_properties(properties); } - fn get_transaction_id(&self) -> &str { + fn get_transaction_id(&self) -> &CheetahString { self.message_ext_inner.get_transaction_id() } diff --git a/rocketmq-common/src/common/message/message_ext.rs b/rocketmq-common/src/common/message/message_ext.rs index e9131886..c991b403 100644 --- a/rocketmq-common/src/common/message/message_ext.rs +++ b/rocketmq-common/src/common/message/message_ext.rs @@ -320,7 +320,8 @@ impl MessageTrait for MessageExt { self.message.set_properties(properties); } - fn get_transaction_id(&self) -> &str { + #[inline] + fn get_transaction_id(&self) -> &CheetahString { self.message.get_transaction_id() } diff --git a/rocketmq-common/src/common/message/message_ext_broker_inner.rs b/rocketmq-common/src/common/message/message_ext_broker_inner.rs index 9ef3a536..e35c8e76 100644 --- a/rocketmq-common/src/common/message/message_ext_broker_inner.rs +++ b/rocketmq-common/src/common/message/message_ext_broker_inner.rs @@ -231,7 +231,7 @@ impl MessageTrait for MessageExtBrokerInner { self.message_ext_inner.set_properties(properties); } - fn get_transaction_id(&self) -> &str { + fn get_transaction_id(&self) -> &CheetahString { self.message_ext_inner.get_transaction_id() } diff --git a/rocketmq-common/src/common/message/message_single.rs b/rocketmq-common/src/common/message/message_single.rs index d593ebcf..981e0463 100644 --- a/rocketmq-common/src/common/message/message_single.rs +++ b/rocketmq-common/src/common/message/message_single.rs @@ -327,8 +327,11 @@ impl MessageTrait for Message { self.properties = properties; } - fn get_transaction_id(&self) -> &str { - self.transaction_id.as_deref().unwrap() + #[inline] + fn get_transaction_id(&self) -> &CheetahString { + self.transaction_id + .as_ref() + .expect("transaction_id is None") } fn set_transaction_id(&mut self, transaction_id: CheetahString) {