Skip to content

Commit

Permalink
[ISSUE #901]🔥Implement Produer send single message other methods🚀 (#905)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Aug 18, 2024
1 parent 73fb139 commit 34af22a
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 22 deletions.
2 changes: 1 addition & 1 deletion rocketmq-client/src/implementation/mq_client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl MQClientManager {
let accumulator = accumulator_table
.entry(client_id.clone())
.or_insert_with(|| {
let accumulator = ProduceAccumulator::new();
let accumulator = ProduceAccumulator::new(client_id.as_str());
info!(
"Created new ProduceAccumulator for clientId: [{}]",
client_id
Expand Down
179 changes: 168 additions & 11 deletions rocketmq-client/src/producer/default_mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use rocketmq_common::common::message::message_batch::MessageBatch;
use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::mix_all::MESSAGE_COMPRESS_LEVEL;
use rocketmq_common::common::mix_all::MESSAGE_COMPRESS_TYPE;
use rocketmq_common::common::topic::TopicValidator;
Expand All @@ -44,6 +46,7 @@ use crate::producer::produce_accumulator::ProduceAccumulator;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
use crate::producer::request_callback::RequestCallback;
use crate::producer::send_callback::SendCallback;
use crate::producer::send_callback::SendMessageCallback;
use crate::producer::send_result::SendResult;
use crate::producer::transaction_send_result::TransactionSendResult;
use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
Expand Down Expand Up @@ -239,7 +242,7 @@ impl Default for ProducerConfig {
}
}

#[derive(Default)]
#[derive(Default, Clone)]
pub struct DefaultMQProducer {
client_config: ClientConfig,
producer_config: ProducerConfig,
Expand Down Expand Up @@ -489,6 +492,123 @@ impl DefaultMQProducer {
}
}
}

#[inline]
pub fn get_auto_batch(&self) -> bool {
self.producer_config.produce_accumulator.is_some() && self.producer_config.auto_batch
}

pub async fn send_direct<M>(
&mut self,
msg: M,
mq: Option<MessageQueue>,
send_callback: Option<SendMessageCallback>,
) -> Result<Option<SendResult>>
where
M: MessageTrait + Clone + Send + Sync,
{
if send_callback.is_none() {
if mq.is_none() {
self.default_mqproducer_impl
.as_mut()
.unwrap()
.send(msg)
.await
} else {
self.default_mqproducer_impl
.as_mut()
.unwrap()
.sync_send_with_message_queue(msg, mq.unwrap())
.await
}
} else if mq.is_none() {
self.default_mqproducer_impl
.as_mut()
.unwrap()
.async_send_with_callback(msg, send_callback)
.await?;
Ok(None)
} else {
self.default_mqproducer_impl
.as_mut()
.unwrap()
.async_send_with_message_queue_callback(msg, mq.unwrap(), send_callback)
.await?;
Ok(None)
}
}

pub async fn send_by_accumulator<M>(
&mut self,
mut msg: M,
mq: Option<MessageQueue>,
send_callback: Option<SendMessageCallback>,
) -> Result<Option<SendResult>>
where
M: MessageTrait + Send + std::clone::Clone + std::marker::Sync,
{
if !self.can_batch(&msg) {
self.send_direct(msg, mq, send_callback).await
} else {
Validators::check_message(Some(&msg), self.producer_config())?;
MessageClientIDSetter::set_uniq_id(&mut msg);
if send_callback.is_none() {
let mq_producer = self.clone();
self.producer_config
.produce_accumulator
.as_mut()
.unwrap()
.send(msg, mq, mq_producer)
} else {
let mq_producer = self.clone();
self.producer_config
.produce_accumulator
.as_mut()
.unwrap()
.send_callback(msg, send_callback, mq_producer)?;
Ok(None)
}
}
}

fn can_batch<M>(&self, msg: &M) -> bool
where
M: MessageTrait,
{
// produceAccumulator is full
if !self
.producer_config
.produce_accumulator
.as_ref()
.unwrap()
.try_add_message(msg)
{
return false;
}
// delay message do not support batch processing
if msg.get_delay_time_level() > 0
|| msg.get_delay_time_ms() > 0
|| msg.get_delay_time_sec() > 0
|| msg.get_deliver_time_ms() > 0
{
return false;
}
// retry message do not support batch processing
if msg
.get_topic()
.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
{
return false;
}
// message which have been assigned to producer group do not support batch processing
if msg
.get_properties()
.contains_key(MessageConst::PROPERTY_PRODUCER_GROUP)
{
return false;
}
true
}
}

impl DefaultMQProducer {
Expand Down Expand Up @@ -550,8 +670,19 @@ impl MQProducer for DefaultMQProducer {
todo!()
}

async fn send(&self, msg: &Message) -> Result<SendResult> {
todo!()
async fn send<M>(&mut self, mut msg: M) -> Result<SendResult>
where
M: MessageTrait + Clone + Send + Sync,
{
msg.set_topic(self.with_namespace(msg.get_topic()).as_str());

let result =
if self.get_auto_batch() && msg.as_any().downcast_ref::<MessageBatch>().is_none() {
self.send_by_accumulator(msg, None, None).await
} else {
self.send_direct(msg, None, None).await
}?;
Ok(result.expect("SendResult should not be None"))
}

async fn send_with_timeout(&mut self, mut msg: Message, timeout: u64) -> Result<SendResult> {
Expand All @@ -565,17 +696,43 @@ impl MQProducer for DefaultMQProducer {
Ok(result.expect("SendResult should not be None"))
}

async fn send_with_callback(&self, msg: &Message, send_callback: impl SendCallback) {
todo!()
async fn send_with_callback<M, F>(&mut self, mut msg: M, send_callback: F) -> Result<()>
where
M: MessageTrait + Clone + Send + Sync,
F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static,
{
msg.set_topic(self.with_namespace(msg.get_topic()).as_str());
let send_callback_inner = Arc::new(send_callback);
let result =
if self.get_auto_batch() && msg.as_any().downcast_ref::<MessageBatch>().is_none() {
self.send_by_accumulator(msg, None, Some(send_callback_inner.clone()))
.await
} else {
self.send_direct(msg, None, Some(send_callback_inner.clone()))
.await
};
if let Err(err) = result {
send_callback_inner(None, Some(&err));
}
Ok(())
}

async fn send_with_callback_timeout(
&self,
msg: &Message,
send_callback: impl SendCallback,
async fn send_with_callback_timeout<F>(
&mut self,
mut msg: Message,
send_callback: F,
timeout: u64,
) {
todo!()
) -> Result<()>
where
F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static,
{
msg.set_topic(self.with_namespace(msg.topic.as_str()).as_str());
self.default_mqproducer_impl
.as_mut()
.unwrap()
.async_send_with_callback_timeout(msg, Some(Arc::new(send_callback)), timeout)
.await?;
Ok(())
}

async fn send_oneway(&self, msg: &Message) -> Result<()> {
Expand Down
22 changes: 15 additions & 7 deletions rocketmq-client/src/producer/mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::any::Any;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;

use crate::producer::message_queue_selector::MessageQueueSelector;
use crate::producer::request_callback::RequestCallback;
Expand Down Expand Up @@ -61,7 +62,9 @@ pub trait MQProducerLocal {
///
/// # Returns
/// A `Result` containing the `SendResult`, or an error.
async fn send(&self, msg: &Message) -> Result<SendResult>;
async fn send<M>(&mut self, msg: M) -> Result<SendResult>
where
M: MessageTrait + Clone + Send + Sync;

/// Sends a message with a timeout.
///
Expand All @@ -83,7 +86,10 @@ pub trait MQProducerLocal {
/// # Arguments
/// * `msg` - A reference to the `Message` to be sent.
/// * `send_callback` - A callback function to be invoked with the result of the send operation.
async fn send_with_callback(&self, msg: &Message, send_callback: impl SendCallback);
async fn send_with_callback<M, F>(&mut self, msg: M, send_callback: F) -> Result<()>
where
M: MessageTrait + Clone + Send + Sync,
F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static;

/// Sends a message with a callback and a timeout.
///
Expand All @@ -94,12 +100,14 @@ pub trait MQProducerLocal {
/// * `msg` - A reference to the `Message` to be sent.
/// * `send_callback` - A callback function to be invoked with the result of the send operation.
/// * `timeout` - The timeout duration in milliseconds.
async fn send_with_callback_timeout(
&self,
msg: &Message,
send_callback: impl SendCallback,
async fn send_with_callback_timeout<F>(
&mut self,
msg: Message,
send_callback: F,
timeout: u64,
);
) -> Result<()>
where
F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static;

/// Sends a message without waiting for a response.
///
Expand Down
Loading

0 comments on commit 34af22a

Please sign in to comment.