Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Oct 25, 2024
1 parent 8e8deed commit 4df11a6
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
13 changes: 7 additions & 6 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct MQClientInstance {
* The container of the producer in the current client. The key is the name of
* producerGroup.
*/
producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>,
producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>,
/**
* The container of the consumer in the current client. The key is the name of
* consumer_group.
Expand Down Expand Up @@ -328,7 +328,7 @@ impl MQClientInstance {

pub async fn shutdown(&mut self) {}

pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool {
if group.is_empty() {
return false;
}
Expand All @@ -337,7 +337,7 @@ impl MQClientInstance {
warn!("the producer group[{}] exist already.", group);
return false;
}
producer_table.insert(group.to_string(), producer);
producer_table.insert(group.to_string(), Box::new(producer));
true
}

Expand Down Expand Up @@ -1057,9 +1057,10 @@ impl MQClientInstance {
consumer_table.get(group).cloned()
}

pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
pub async fn select_producer(&self, group: &str) -> Option<Box<dyn MQProducerInner>> {
/*let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()*/
unimplemented!("select_producer")
}

pub async fn unregister_consumer(&mut self, group: impl Into<String>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2124,17 +2124,16 @@ impl DefaultMQProducerImpl {
.set_service_detector(service_detector);
self.client_instance = Some(client_instance);
let self_clone = self.default_mqproducer_impl_inner.clone();
let register_ok = self
.client_instance
.as_mut()
.unwrap()
.register_producer(
self.producer_config.producer_group(),
MQProducerInnerImpl {
default_mqproducer_impl_inner: self_clone,
},
)
.await;
/*let register_ok = self
.client_instance
.as_mut()
.unwrap()
.register_producer(
self.producer_config.producer_group(),
self_clone,
)
.await;*/
let register_ok = true;
if !register_ok {
self.service_state = ServiceState::CreateJust;
return Err(MQClientError::MQClientErr(
Expand Down
15 changes: 7 additions & 8 deletions rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;

use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::producer::transaction_listener::TransactionListener;
Expand Down Expand Up @@ -50,8 +49,8 @@ pub(crate) struct MQProducerInnerImpl {
pub(crate) default_mqproducer_impl_inner: Option<WeakCellWrapper<DefaultMQProducerImpl>>,
}

impl MQProducerInner for MQProducerInnerImpl {
fn get_publish_topic_list(&self) -> HashSet<String> {
impl MQProducerInnerImpl {
pub fn get_publish_topic_list(&self) -> HashSet<String> {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
if let Some(inner) = default_mqproducer_impl_inner.upgrade() {
return inner.get_publish_topic_list();
Expand All @@ -60,7 +59,7 @@ impl MQProducerInner for MQProducerInnerImpl {
HashSet::new()
}

fn is_publish_topic_need_update(&self, topic: &str) -> bool {
pub fn is_publish_topic_need_update(&self, topic: &str) -> bool {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
if let Some(inner) = default_mqproducer_impl_inner.upgrade() {
return inner.is_publish_topic_need_update(topic);
Expand All @@ -69,7 +68,7 @@ impl MQProducerInner for MQProducerInnerImpl {
false
}

fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>> {
pub fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>> {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
if let Some(inner) = default_mqproducer_impl_inner.upgrade() {
return inner.get_check_listener();
Expand All @@ -78,7 +77,7 @@ impl MQProducerInner for MQProducerInnerImpl {
unreachable!("default_mqproducer_impl_inner is None")
}

fn check_transaction_state(
pub fn check_transaction_state(
&self,
addr: &str,
msg: &MessageExt,
Expand All @@ -91,15 +90,15 @@ impl MQProducerInner for MQProducerInnerImpl {
}
}

fn update_topic_publish_info(&mut self, topic: String, info: Option<TopicPublishInfo>) {
pub fn update_topic_publish_info(&mut self, topic: String, info: Option<TopicPublishInfo>) {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
if let Some(mut inner) = default_mqproducer_impl_inner.upgrade() {
inner.update_topic_publish_info(topic, info);
}
}
}

fn is_unit_mode(&self) -> bool {
pub fn is_unit_mode(&self) -> bool {
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
if let Some(inner) = default_mqproducer_impl_inner.upgrade() {
return inner.is_unit_mode();
Expand Down

0 comments on commit 4df11a6

Please sign in to comment.