Skip to content

Commit

Permalink
[ISSUE #1186]🔥Optimize MQClientAPIImpl method🚀 (#1187)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Nov 17, 2024
1 parent 8c99b6a commit 6682299
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 49 deletions.
6 changes: 3 additions & 3 deletions rocketmq-client/examples/transaction/transaction_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;

use cheetah_string::CheetahString;
use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::producer::transaction_listener::TransactionListener;
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn main() -> Result<()> {
}

struct TransactionListenerImpl {
local_trans: Arc<Mutex<HashMap<String, i32>>>,
local_trans: Arc<Mutex<HashMap<CheetahString, i32>>>,
transaction_index: AtomicI32,
}

Expand All @@ -92,7 +92,7 @@ impl TransactionListener for TransactionListenerImpl {
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let status = value % 3;
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
guard.insert(msg.get_transaction_id().clone(), status);
LocalTransactionState::Unknown
}

Expand Down
2 changes: 1 addition & 1 deletion rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ impl MQClientInstance {
false
}

async fn is_need_update_topic_route_info(&self, topic: &str) -> bool {
async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
let mut result = false;
let producer_table = self.producer_table.read().await;
for (key, value) in producer_table.iter() {
Expand Down
9 changes: 5 additions & 4 deletions rocketmq-client/src/hook/end_transaction_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_single::Message;

use crate::producer::local_transaction_state::LocalTransactionState;

pub struct EndTransactionContext<'a> {
pub producer_group: String,
pub broker_addr: String,
pub producer_group: CheetahString,
pub broker_addr: CheetahString,
pub message: &'a Message,
pub msg_id: String,
pub transaction_id: String,
pub msg_id: CheetahString,
pub transaction_id: CheetahString,
pub transaction_state: LocalTransactionState,
pub from_transaction_check: bool,
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,8 @@ impl ClientRemotingProcessor {
if let Some(group) = group {
let producer = client_instance.select_producer(&group).await;
if let Some(producer) = producer {
let addr = channel.remote_address().to_string();
producer.check_transaction_state(
addr.as_str(),
message_ext,
request_header,
);
let addr = CheetahString::from_string(channel.remote_address().to_string());
producer.check_transaction_state(&addr, message_ext, request_header);
} else {
warn!("checkTransactionState, pick producer group failed");
}
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1156,9 +1156,9 @@ impl MQClientAPIImpl {

pub async fn end_transaction_oneway(
&mut self,
addr: &str,
addr: &CheetahString,
request_header: EndTransactionRequestHeader,
remark: String,
remark: CheetahString,
timeout_millis: u64,
) -> Result<()> {
let request =
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-client/src/producer/default_mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ impl MQProducer for DefaultMQProducer {
self.default_mqproducer_impl
.as_mut()
.unwrap()
.fetch_publish_message_queues(topic.as_str())
.fetch_publish_message_queues(topic.as_ref())
.await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,10 @@ impl DefaultMQProducerImpl {
Ok(result.expect("send result is none"))
}

pub async fn fetch_publish_message_queues(&mut self, topic: &str) -> Result<Vec<MessageQueue>> {
pub async fn fetch_publish_message_queues(
&mut self,
topic: &CheetahString,
) -> Result<Vec<MessageQueue>> {
self.make_sure_state_ok()?;
let client_instance = self
.client_instance
Expand Down Expand Up @@ -1477,7 +1480,7 @@ impl DefaultMQProducerImpl {
.set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string())));
}
};
let topic = msg.get_topic().to_string();
let topic = msg.get_topic().clone();
let _ = self
.send_select_impl(
msg,
Expand Down Expand Up @@ -1595,7 +1598,7 @@ impl DefaultMQProducerImpl {
.set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string())));
}
};
let topic = msg.get_topic().to_string();
let topic = msg.get_topic().clone();
let _ = self
.send_kernel_impl(
&mut msg,
Expand Down Expand Up @@ -1761,7 +1764,7 @@ impl DefaultMQProducerImpl {
.set_cause(Box::new(MQClientError::MQClientErr(-1, error.to_string())));
}
};
let topic = msg.get_topic().to_string();
let topic = msg.get_topic().clone();
let cost = begin_timestamp.elapsed().as_millis() as u64;
self.send_default_impl(
&mut msg,
Expand All @@ -1783,7 +1786,7 @@ impl DefaultMQProducerImpl {

async fn wait_response(
&mut self,
topic: &str,
topic: &CheetahString,
timeout: u64,
request_response_future: Arc<RequestResponseFuture>,
cost: u64,
Expand Down Expand Up @@ -2000,7 +2003,7 @@ impl DefaultMQProducerImpl {
.end_transaction_oneway(
broker_addr.as_ref().unwrap(),
request_header,
"".to_string(),
CheetahString::from_static_str(""),
self.producer_config.send_msg_timeout() as u64,
)
.await;
Expand All @@ -2014,20 +2017,20 @@ impl DefaultMQProducerImpl {
pub fn do_execute_end_transaction_hook(
&mut self,
msg: &Message,
msg_id: &str,
broker_addr: &str,
msg_id: &CheetahString,
broker_addr: &CheetahString,
local_transaction_state: LocalTransactionState,
from_transaction_check: bool,
) {
if !self.has_end_transaction_hook() {
return;
}
let end_transaction_context = EndTransactionContext {
producer_group: self.producer_config.producer_group().to_string(),
producer_group: self.producer_config.producer_group().clone(),
message: msg,
msg_id: msg_id.to_string(),
transaction_id: msg.get_transaction_id().to_string(),
broker_addr: broker_addr.to_string(),
msg_id: msg_id.clone(),
transaction_id: msg.get_transaction_id().clone(),
broker_addr: broker_addr.clone(),
from_transaction_check,
transaction_state: local_transaction_state,
};
Expand Down Expand Up @@ -2079,9 +2082,9 @@ impl MQProducerInner for DefaultMQProducerImpl {
.unwrap()
}

fn is_publish_topic_need_update(&self, topic: &str) -> bool {
fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool {
let handle = Handle::current();
let topic = topic.to_string();
let topic = topic.clone();
let topic_publish_info_table = self.topic_publish_info_table.clone();
thread::spawn(move || {
handle.block_on(async move {
Expand All @@ -2103,13 +2106,13 @@ impl MQProducerInner for DefaultMQProducerImpl {

fn check_transaction_state(
&self,
broker_addr: &str,
broker_addr: &CheetahString,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
) {
let transaction_listener = self.transaction_listener.clone().unwrap();
let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap();
let broker_addr = broker_addr.to_string();
let broker_addr = broker_addr.clone();
self.check_runtime
.as_ref()
.unwrap()
Expand Down Expand Up @@ -2154,8 +2157,8 @@ impl MQProducerInner for DefaultMQProducerImpl {
};
producer_impl_inner.do_execute_end_transaction_hook(
&msg.message,
unique_key.as_ref().unwrap().as_str(),
broker_addr.as_str(),
unique_key.as_ref().unwrap(),
&broker_addr,
transaction_state,
true,
);
Expand All @@ -2167,9 +2170,9 @@ impl MQProducerInner for DefaultMQProducerImpl {
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_str(),
&broker_addr,
request_header,
"".to_string(),
CheetahString::from_static_str(""),
3000,
)
.await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use crate::producer::transaction_listener::TransactionListener;
pub trait MQProducerInner: Send + Sync + 'static {
fn get_publish_topic_list(&self) -> HashSet<CheetahString>;

fn is_publish_topic_need_update(&self, topic: &str) -> bool;
fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool;

fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>>;

fn check_transaction_state(
&self,
broker_addr: &str,
broker_addr: &CheetahString,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
);
Expand All @@ -58,7 +58,7 @@ impl MQProducerInnerImpl {
HashSet::new()
}

pub fn is_publish_topic_need_update(&self, topic: &str) -> bool {
pub fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
return default_mqproducer_impl_inner.is_publish_topic_need_update(topic);
}
Expand All @@ -74,7 +74,7 @@ impl MQProducerInnerImpl {

pub fn check_transaction_state(
&self,
addr: &str,
addr: &CheetahString,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
) {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub trait MessageTrait: Any + Display + Debug {
/// # Returns
///
/// A reference to the transaction ID as a `&str`.
fn get_transaction_id(&self) -> &str;
fn get_transaction_id(&self) -> &CheetahString;

/// Sets the transaction ID for the message.
///
Expand Down
7 changes: 5 additions & 2 deletions rocketmq-common/src/common/message/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,11 @@ impl MessageTrait for MessageBatch {
self.final_message.properties = properties;
}

fn get_transaction_id(&self) -> &str {
self.final_message.transaction_id.as_deref().unwrap()
fn get_transaction_id(&self) -> &CheetahString {
self.final_message
.transaction_id
.as_ref()
.expect("transaction_id is None")
}

fn set_transaction_id(&mut self, transaction_id: CheetahString) {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-common/src/common/message/message_client_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl MessageTrait for MessageClientExt {
self.message_ext_inner.set_properties(properties);
}

fn get_transaction_id(&self) -> &str {
fn get_transaction_id(&self) -> &CheetahString {
self.message_ext_inner.get_transaction_id()
}

Expand Down
3 changes: 2 additions & 1 deletion rocketmq-common/src/common/message/message_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ impl MessageTrait for MessageExt {
self.message.set_properties(properties);
}

fn get_transaction_id(&self) -> &str {
#[inline]
fn get_transaction_id(&self) -> &CheetahString {
self.message.get_transaction_id()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl MessageTrait for MessageExtBrokerInner {
self.message_ext_inner.set_properties(properties);
}

fn get_transaction_id(&self) -> &str {
fn get_transaction_id(&self) -> &CheetahString {
self.message_ext_inner.get_transaction_id()
}

Expand Down
7 changes: 5 additions & 2 deletions rocketmq-common/src/common/message/message_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,11 @@ impl MessageTrait for Message {
self.properties = properties;
}

fn get_transaction_id(&self) -> &str {
self.transaction_id.as_deref().unwrap()
#[inline]
fn get_transaction_id(&self) -> &CheetahString {
self.transaction_id
.as_ref()
.expect("transaction_id is None")
}

fn set_transaction_id(&mut self, transaction_id: CheetahString) {
Expand Down

0 comments on commit 6682299

Please sign in to comment.