From 4df11a6857b4b431aef2f0806c45be99b003aa6a Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 25 Oct 2024 23:15:06 +0800 Subject: [PATCH] add --- .../src/factory/mq_client_instance.rs | 13 ++++++------ .../producer_impl/default_mq_producer_impl.rs | 21 +++++++++---------- .../producer_impl/mq_producer_inner.rs | 15 +++++++------ 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 7b94c6db..dd6e1b76 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -75,7 +75,7 @@ pub struct MQClientInstance { * The container of the producer in the current client. The key is the name of * producerGroup. */ - producer_table: Arc>>, + producer_table: Arc>>>, /** * The container of the consumer in the current client. The key is the name of * consumer_group. @@ -328,7 +328,7 @@ impl MQClientInstance { pub async fn shutdown(&mut self) {} - pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool { + pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool { if group.is_empty() { return false; } @@ -337,7 +337,7 @@ impl MQClientInstance { warn!("the producer group[{}] exist already.", group); return false; } - producer_table.insert(group.to_string(), producer); + producer_table.insert(group.to_string(), Box::new(producer)); true } @@ -1057,9 +1057,10 @@ impl MQClientInstance { consumer_table.get(group).cloned() } - pub async fn select_producer(&self, group: &str) -> Option { - let producer_table = self.producer_table.read().await; - producer_table.get(group).cloned() + pub async fn select_producer(&self, group: &str) -> Option> { + /*let producer_table = self.producer_table.read().await; + producer_table.get(group).cloned()*/ + unimplemented!("select_producer") } pub async fn unregister_consumer(&mut self, group: impl Into) { diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index cc23c4be..96b413b9 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -2124,17 +2124,16 @@ impl DefaultMQProducerImpl { .set_service_detector(service_detector); self.client_instance = Some(client_instance); let self_clone = self.default_mqproducer_impl_inner.clone(); - let register_ok = self - .client_instance - .as_mut() - .unwrap() - .register_producer( - self.producer_config.producer_group(), - MQProducerInnerImpl { - default_mqproducer_impl_inner: self_clone, - }, - ) - .await; + /*let register_ok = self + .client_instance + .as_mut() + .unwrap() + .register_producer( + self.producer_config.producer_group(), + self_clone, + ) + .await;*/ + let register_ok = true; if !register_ok { self.service_state = ServiceState::CreateJust; return Err(MQClientError::MQClientErr( diff --git a/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs b/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs index ce637867..66b5885e 100644 --- a/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs +++ b/rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs @@ -21,7 +21,6 @@ use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader; -use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl; use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; use crate::producer::transaction_listener::TransactionListener; @@ -50,8 +49,8 @@ pub(crate) struct MQProducerInnerImpl { pub(crate) default_mqproducer_impl_inner: Option>, } -impl MQProducerInner for MQProducerInnerImpl { - fn get_publish_topic_list(&self) -> HashSet { +impl MQProducerInnerImpl { + pub fn get_publish_topic_list(&self) -> HashSet { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { if let Some(inner) = default_mqproducer_impl_inner.upgrade() { return inner.get_publish_topic_list(); @@ -60,7 +59,7 @@ impl MQProducerInner for MQProducerInnerImpl { HashSet::new() } - fn is_publish_topic_need_update(&self, topic: &str) -> bool { + pub fn is_publish_topic_need_update(&self, topic: &str) -> bool { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { if let Some(inner) = default_mqproducer_impl_inner.upgrade() { return inner.is_publish_topic_need_update(topic); @@ -69,7 +68,7 @@ impl MQProducerInner for MQProducerInnerImpl { false } - fn get_check_listener(&self) -> Arc> { + pub fn get_check_listener(&self) -> Arc> { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { if let Some(inner) = default_mqproducer_impl_inner.upgrade() { return inner.get_check_listener(); @@ -78,7 +77,7 @@ impl MQProducerInner for MQProducerInnerImpl { unreachable!("default_mqproducer_impl_inner is None") } - fn check_transaction_state( + pub fn check_transaction_state( &self, addr: &str, msg: &MessageExt, @@ -91,7 +90,7 @@ impl MQProducerInner for MQProducerInnerImpl { } } - fn update_topic_publish_info(&mut self, topic: String, info: Option) { + pub fn update_topic_publish_info(&mut self, topic: String, info: Option) { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { if let Some(mut inner) = default_mqproducer_impl_inner.upgrade() { inner.update_topic_publish_info(topic, info); @@ -99,7 +98,7 @@ impl MQProducerInner for MQProducerInnerImpl { } } - fn is_unit_mode(&self) -> bool { + pub fn is_unit_mode(&self) -> bool { if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner { if let Some(inner) = default_mqproducer_impl_inner.upgrade() { return inner.is_unit_mode();