Skip to content

Commit

Permalink
[ISSUE #844]✨Add MQProducer trait for sdk client🚀 (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jul 28, 2024
1 parent 7afc534 commit f5f8922
Show file tree
Hide file tree
Showing 17 changed files with 1,224 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,13 @@ description.workspace = true

[dependencies]
rocketmq-common = { workspace = true }
rocketmq-remoting = { workspace = true }

thiserror = { workspace = true }

#json spupport
serde.workspace = true
serde_json.workspace = true

trait-variant = { workspace = true }
num_cpus = "1.16.0"
2 changes: 2 additions & 0 deletions rocketmq-client/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod access_channel;
mod client_config;
mod mq_admin;
mod query_result;
55 changes: 55 additions & 0 deletions rocketmq-client/src/base/access_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AccessChannel {
Local,
Cloud,
}

impl Serialize for AccessChannel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("AccessChannel", 1)?;
match *self {
AccessChannel::Local => state.serialize_field("AccessChannel", "LOCAL")?,
AccessChannel::Cloud => state.serialize_field("AccessChannel", "CLOUD")?,
}
state.end()
}
}

impl<'de> Deserialize<'de> for AccessChannel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
match s {
"LOCAL" => Ok(AccessChannel::Local),
"CLOUD" => Ok(AccessChannel::Cloud),
_ => Err(serde::de::Error::custom("unknown AccessChannel variant")),
}
}
}
112 changes: 112 additions & 0 deletions rocketmq-client/src/base/client_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::env;
use std::time::Duration;

use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
use rocketmq_common::utils::network_util::NetworkUtil;
use rocketmq_remoting::protocol::LanguageCode;

use crate::base::access_channel::AccessChannel;

pub const SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY: &str = "com.rocketmq.sendMessageWithVIPChannel";
pub const SOCKS_PROXY_CONFIG: &str = "com.rocketmq.socks.proxy.config";
pub const DECODE_READ_BODY: &str = "com.rocketmq.read.body";
pub const DECODE_DECOMPRESS_BODY: &str = "com.rocketmq.decompress.body";
pub const SEND_LATENCY_ENABLE: &str = "com.rocketmq.sendLatencyEnable";
pub const START_DETECTOR_ENABLE: &str = "com.rocketmq.startDetectorEnable";
pub const HEART_BEAT_V2: &str = "com.rocketmq.heartbeat.v2";

#[allow(dead_code)]
pub struct ClientConfig {
pub namesrv_addr: Option<String>,
pub client_ip: Option<String>,
pub instance_name: String,
pub client_callback_executor_threads: usize,
pub namespace: Option<String>,
pub namespace_v2: Option<String>,
pub access_channel: AccessChannel,
pub poll_name_server_interval: u32,
pub heartbeat_broker_interval: u32,
pub persist_consumer_offset_interval: u32,
pub pull_time_delay_millis_when_exception: u32,
pub unit_mode: bool,
pub unit_name: Option<String>,
pub decode_read_body: bool,
pub decode_decompress_body: bool,
pub vip_channel_enabled: bool,
pub use_heartbeat_v2: bool,
pub use_tls: bool,
pub socks_proxy_config: String,
pub mq_client_api_timeout: u32,
pub detect_timeout: u32,
pub detect_interval: u32,
pub language: LanguageCode,
pub enable_stream_request_type: bool,
pub send_latency_enable: bool,
pub start_detector_enable: bool,
pub enable_heartbeat_channel_event_listener: bool,
pub enable_trace: bool,
pub trace_topic: Option<String>,
}

impl ClientConfig {
pub fn new() -> Self {
ClientConfig {
namesrv_addr: NameServerAddressUtils::get_name_server_addresses(),
client_ip: NetworkUtil::get_local_address(),
instance_name: env::var("rocketmq.client.name")
.unwrap_or_else(|_| "DEFAULT".to_string()),
client_callback_executor_threads: num_cpus::get(),
namespace: None,
namespace_v2: None,
access_channel: AccessChannel::Local,
poll_name_server_interval: Duration::from_secs(30).as_millis() as u32,
heartbeat_broker_interval: Duration::from_secs(30).as_millis() as u32,
persist_consumer_offset_interval: Duration::from_secs(5).as_millis() as u32,
pull_time_delay_millis_when_exception: 1000,
unit_mode: false,
unit_name: None,
decode_read_body: env::var(DECODE_READ_BODY).unwrap_or_else(|_| "true".to_string())
== "true",
decode_decompress_body: env::var(DECODE_DECOMPRESS_BODY)
.unwrap_or_else(|_| "true".to_string())
== "true",
vip_channel_enabled: env::var(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY)
.unwrap_or_else(|_| "false".to_string())
== "false",
use_heartbeat_v2: env::var(HEART_BEAT_V2).unwrap_or_else(|_| "false".to_string())
== "false",
use_tls: false,
socks_proxy_config: env::var(SOCKS_PROXY_CONFIG).unwrap_or_else(|_| "{}".to_string()),
mq_client_api_timeout: Duration::from_secs(3).as_millis() as u32,
detect_timeout: 200,
detect_interval: Duration::from_secs(2).as_millis() as u32,
language: LanguageCode::JAVA,
enable_stream_request_type: false,
send_latency_enable: env::var(SEND_LATENCY_ENABLE)
.unwrap_or_else(|_| "false".to_string())
== "false",
start_detector_enable: env::var(START_DETECTOR_ENABLE)
.unwrap_or_else(|_| "false".to_string())
== "false",
enable_heartbeat_channel_event_listener: true,
enable_trace: false,
trace_topic: None,
}
}
}
13 changes: 11 additions & 2 deletions rocketmq-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum MQClientError {
#[error("Client exception occurred: {0}")]
ClientException(String),
#[error("Client exception occurred: CODE:{0}, Message:{1}")]
MQClientException(i32, String),

#[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
MQBrokerException(i32, String, String),

#[error("Client exception occurred: CODE:{0}, Message:{1}")]
RequestTimeoutException(i32, String),

#[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
OffsetNotFoundException(i32, String, String),
}
1 change: 1 addition & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ use crate::error::MQClientError;

pub mod base;
pub mod error;
pub mod producer;

pub type Result<T> = std::result::Result<T, MQClientError>;
22 changes: 22 additions & 0 deletions rocketmq-client/src/producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub mod local_transaction_state;
pub mod message_queue_selector;
pub mod mq_producer;
pub mod send_result;
pub mod send_status;
pub mod transaction_send_result;
78 changes: 78 additions & 0 deletions rocketmq-client/src/producer/local_transaction_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::fmt;

use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LocalTransactionState {
#[default]
CommitMessage,
RollbackMessage,
//java is UNKNOW
Unknown,
}

impl Serialize for LocalTransactionState {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let value = match self {
LocalTransactionState::CommitMessage => "COMMIT_MESSAGE",
LocalTransactionState::RollbackMessage => "ROLLBACK_MESSAGE",
LocalTransactionState::Unknown => "UNKNOW",
};
serializer.serialize_str(value)
}
}

impl<'de> Deserialize<'de> for LocalTransactionState {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct StoreTypeVisitor;

impl<'de> serde::de::Visitor<'de> for StoreTypeVisitor {
type Value = LocalTransactionState;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string representing SendStatus")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match value {
"COMMIT_MESSAGE" => Ok(LocalTransactionState::CommitMessage),
"ROLLBACK_MESSAGE" => Ok(LocalTransactionState::RollbackMessage),
"UNKNOW" | "UNKNOWN" => Ok(LocalTransactionState::Unknown),
_ => Err(serde::de::Error::unknown_variant(
value,
&["COMMIT_MESSAGE", "ROLLBACK_MESSAGE", "UNKNOWN", "UNKNOW"],
)),
}
}
}
deserializer.deserialize_str(StoreTypeVisitor)
}
}
35 changes: 35 additions & 0 deletions rocketmq-client/src/producer/message_queue_selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_single::Message;

/// A trait for selecting a message queue.
///
/// This trait defines a method for selecting a message queue from a list of available queues
/// based on the provided message and an additional argument.
pub trait MessageQueueSelector {
/// Selects a message queue from the provided list.
///
/// # Arguments
/// * `mqs` - A reference to a vector of `MessageQueue` from which to select.
/// * `msg` - A reference to the `Message` for which the queue is being selected.
/// * `arg` - An additional argument that can be used in the selection process.
///
/// # Returns
/// The selected `MessageQueue`.
fn select(&self, mqs: &[MessageQueue], msg: &Message, arg: &dyn std::any::Any) -> MessageQueue;
}
Loading

0 comments on commit f5f8922

Please sign in to comment.