Skip to content

Commit

Permalink
[ISSUE #896]🔥Implement Produer send batch message🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Aug 15, 2024
1 parent 9568572 commit 12d30df
Show file tree
Hide file tree
Showing 19 changed files with 883 additions and 209 deletions.
6 changes: 3 additions & 3 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl<MS: MessageStore> SendMessageProcessor<MS> {
message_ext
.message_ext_inner
.message
.put_property(MessageConst::PROPERTY_CLUSTER.to_string(), cluster_name);
.put_property(MessageConst::PROPERTY_CLUSTER, cluster_name.as_str());

let mut batch_message = MessageExtBatch {
message_ext_broker_inner: message_ext,
Expand Down Expand Up @@ -325,8 +325,8 @@ impl<MS: MessageStore> SendMessageProcessor<MS> {
.message_ext_inner
.message
.put_property(
MessageConst::PROPERTY_INNER_NUM.to_string(),
inner_num.to_string(),
MessageConst::PROPERTY_INNER_NUM,
inner_num.to_string().as_str(),
);
batch_message.message_ext_broker_inner.properties_string = message_properties_to_string(
batch_message
Expand Down
11 changes: 10 additions & 1 deletion rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,13 @@ parking_lot = { workspace = true }

[[example]]
name = "simple-producer"
path = "examples/producer/simple_producer.rs"
path = "examples/producer/simple_producer.rs"

[[example]]
name = "producer"
path = "examples/quickstart/producer.rs"

[[example]]
name = "simple-batch-producer"
path = "examples/batch/simple_batch_producer.rs"

63 changes: 63 additions & 0 deletions rocketmq-client/examples/batch/simple_batch_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const PRODUCER_GROUP: &str = "BatchProducerGroupName";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> rocketmq_client::Result<()> {
//init logger
rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();

let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
producer.start().await?;

let mut messages = Vec::new();
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID001",
"Hello world 0".as_bytes(),
));
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID002",
"Hello world 1".as_bytes(),
));
messages.push(Message::with_keys(
TOPIC,
TAG,
"OrderID003",
"Hello world 2".as_bytes(),
));
let send_result = producer.send_batch(messages).await?;
println!("send result: {}", send_result);
Ok(())
}
1 change: 1 addition & 0 deletions rocketmq-client/examples/producer/simple_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::Result;
Expand Down
53 changes: 53 additions & 0 deletions rocketmq-client/examples/quickstart/producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();

let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();

producer.start().await?;

for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
producer.shutdown().await;

Ok(())
}
4 changes: 2 additions & 2 deletions rocketmq-client/src/base/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl Validators {
}

pub fn check_message(
msg: &Option<Message>,
default_mq_producer: &DefaultMQProducer,
msg: Option<&Message>,
default_mq_producer: &mut DefaultMQProducer,
) -> Result<()> {
if msg.is_none() {
return Err(MQClientException(
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/hook/check_forbidden_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;

use crate::implementation::communication_mode::CommunicationMode;
use crate::producer::send_result::SendResult;
Expand All @@ -24,7 +24,7 @@ use crate::producer::send_result::SendResult;
pub struct CheckForbiddenContext<'a> {
pub name_srv_addr: Option<String>,
pub group: Option<String>,
pub message: Option<&'a Message>,
pub message: Option<&'a dyn MessageTrait>,
pub mq: Option<&'a MessageQueue>,
pub broker_addr: Option<String>,
pub communication_mode: Option<CommunicationMode>,
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/hook/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;

use crate::implementation::communication_mode::CommunicationMode;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
Expand All @@ -29,7 +29,7 @@ use crate::producer::send_result::SendResult;
#[derive(Default)]
pub struct SendMessageContext<'a> {
pub producer_group: Option<String>,
pub message: Option<Message>,
pub message: Option<Box<dyn MessageTrait + Send + Sync>>,
pub mq: Option<&'a MessageQueue>,
pub broker_addr: Option<String>,
pub born_host: Option<String>,
Expand Down
58 changes: 35 additions & 23 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use lazy_static::lazy_static;
use rocketmq_common::common::message::message_batch::MessageBatch;
use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::namesrv::default_top_addressing::DefaultTopAddressing;
use rocketmq_common::common::namesrv::name_server_update_callback::NameServerUpdateCallback;
Expand Down Expand Up @@ -226,11 +226,11 @@ impl MQClientAPIImpl {
self.remoting_client.get_name_server_address_list()
}

pub async fn send_message(
pub async fn send_message<T>(
&mut self,
addr: &str,
broker_name: &str,
msg: &mut Message,
msg: &mut T,
request_header: SendMessageRequestHeader,
timeout_millis: u64,
communication_mode: CommunicationMode,
Expand All @@ -240,7 +240,10 @@ impl MQClientAPIImpl {
retry_times_when_send_failed: u32,
context: &mut Option<SendMessageContext<'_>>,
producer: &DefaultMQProducerImpl,
) -> Result<Option<SendResult>> {
) -> Result<Option<SendResult>>
where
T: MessageTrait,
{
let begin_start_time = Instant::now();
let msg_type = msg.get_property(MessageConst::PROPERTY_MESSAGE_TYPE);
let is_reply = msg_type.is_some() && msg_type.unwrap() == mix_all::REPLY_MESSAGE_FLAG;
Expand Down Expand Up @@ -279,11 +282,11 @@ impl MQClientAPIImpl {
};

// if compressed_body is not None, set request body to compressed_body
if msg.compressed_body.is_some() {
let compressed_body = std::mem::take(&mut msg.compressed_body);
if msg.get_compressed_body_mut().is_some() {
let compressed_body = std::mem::take(msg.get_compressed_body_mut());
request.set_body_mut_ref(compressed_body);
} else {
request.set_body_mut_ref(msg.body().clone());
request.set_body_mut_ref(msg.get_body().cloned());
}
match communication_mode {
CommunicationMode::Sync => {
Expand Down Expand Up @@ -338,17 +341,20 @@ impl MQClientAPIImpl {
}
}

pub async fn send_message_simple(
pub async fn send_message_simple<T>(
&mut self,
addr: &str,
broker_name: &str,
msg: &mut Message,
msg: &mut T,
request_header: SendMessageRequestHeader,
timeout_millis: u64,
communication_mode: CommunicationMode,
context: &mut Option<SendMessageContext<'_>>,
producer: &DefaultMQProducerImpl,
) -> Result<Option<SendResult>> {
) -> Result<Option<SendResult>>
where
T: MessageTrait,
{
self.send_message(
addr,
broker_name,
Expand All @@ -366,26 +372,29 @@ impl MQClientAPIImpl {
.await
}

async fn send_message_sync(
async fn send_message_sync<T>(
&mut self,
addr: &str,
broker_name: &str,
msg: &Message,
msg: &T,
timeout_millis: u64,
request: RemotingCommand,
) -> Result<SendResult> {
) -> Result<SendResult>
where
T: MessageTrait,
{
let response = self
.remoting_client
.invoke_async(Some(addr.to_string()), request, timeout_millis)
.await?;
self.process_send_response(broker_name, msg, &response, addr)
}

async fn send_message_async(
async fn send_message_async<T: MessageTrait>(
&mut self,
addr: &str,
broker_name: &str,
msg: &Message,
msg: &T,
timeout_millis: u64,
request: RemotingCommand,
send_callback: Option<Arc<Box<dyn SendCallback>>>,
Expand Down Expand Up @@ -455,13 +464,16 @@ impl MQClientAPIImpl {
}
}

fn process_send_response(
fn process_send_response<T>(
&mut self,
broker_name: &str,
msg: &Message,
msg: &T,
response: &RemotingCommand,
addr: &str,
) -> Result<SendResult> {
) -> Result<SendResult>
where
T: MessageTrait,
{
let response_code = ResponseCode::from(response.code());
let send_status = match response_code {
ResponseCode::FlushDiskTimeout => SendStatus::FlushDiskTimeout,
Expand All @@ -479,7 +491,7 @@ impl MQClientAPIImpl {
let response_header = response
.decode_command_custom_header_fast::<SendMessageResponseHeader>()
.unwrap();
let mut topic = msg.topic.as_str().to_string();
let mut topic = msg.get_topic().to_string();
let namespace = self.client_config.get_namespace();
if namespace.is_some() && !namespace.as_ref().unwrap().is_empty() {
topic = NamespaceUtil::without_namespace_with_namespace(
Expand All @@ -493,7 +505,7 @@ impl MQClientAPIImpl {
let msgs = msg.as_any().downcast_ref::<MessageBatch>();
if msgs.is_some() && response_header.batch_uniq_id().is_none() {
let mut sb = String::new();
for msg in msgs.unwrap().messages.iter() {
for msg in msgs.unwrap().messages.as_ref().unwrap().iter() {
sb.push_str(if sb.is_empty() { "" } else { "," });
sb.push_str(MessageClientIDSetter::get_uniq_id(msg).unwrap().as_str());
}
Expand Down Expand Up @@ -527,10 +539,10 @@ impl MQClientAPIImpl {
Ok(send_result)
}

async fn on_exception_impl(
async fn on_exception_impl<T: MessageTrait>(
&mut self,
broker_name: &str,
msg: &Message,
msg: &T,
timeout_millis: u64,
mut request: RemotingCommand,
send_callback: Option<Arc<Box<dyn SendCallback>>>,
Expand Down Expand Up @@ -564,7 +576,7 @@ impl MQClientAPIImpl {
warn!(
"async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}",
tmp,
msg.topic(),
msg.get_topic(),
addr,
retry_broker_name
);
Expand Down
Loading

0 comments on commit 12d30df

Please sign in to comment.