Skip to content

Commit

Permalink
[ISSUE #883]Implement Produer send single message-3 (#885)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Aug 12, 2024
1 parent 708bca7 commit 2f7a2a9
Show file tree
Hide file tree
Showing 11 changed files with 463 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ lazy_static = { workspace = true }
tracing.workspace = true
tracing-subscriber.workspace = true
regex = { version = "1.10.6", features = [] }

parking_lot = { workspace = true }

[[example]]
name = "simple-producer"
path = "examples/producer/simple_producer.rs"
8 changes: 5 additions & 3 deletions rocketmq-client/examples/producer/simple_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ pub async fn main() -> Result<()> {

producer.start().await?;

let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

producer.send_with_timeout(message, 2000).await?;
for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
producer.shutdown().await;

Ok(())
Expand Down
18 changes: 16 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,25 @@ impl MQClientInstance {
}

pub async fn get_broker_name_from_message_queue(&self, message_queue: &MessageQueue) -> String {
unimplemented!()
let guard = self.topic_end_points_table.read().await;
if let Some(broker_name) = guard.get(message_queue.get_topic()) {
if let Some(addr) = broker_name.get(message_queue) {
return addr.clone();
}
}
message_queue.get_broker_name().to_string()
}

pub async fn find_broker_address_in_publish(&self, broker_name: &str) -> Option<String> {
unimplemented!()
if broker_name.is_empty() {
return None;
}
let guard = self.broker_addr_table.read().await;
let map = guard.get(broker_name);
if let Some(map) = map {
return map.get(&(mix_all::MASTER_ID as i64)).cloned();
}
None
}
}

Expand Down
16 changes: 12 additions & 4 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ impl MQClientAPIImpl {
"sendMessage call timeout".to_string(),
));
}
let result = self
.send_message_sync(
addr,
broker_name,
msg,
timeout_millis - cost_time_sync,
request,
)
.await?;
Ok(Some(result))
}
CommunicationMode::Async => {
let times = AtomicU32::new(0);
Expand All @@ -311,17 +321,15 @@ impl MQClientAPIImpl {
producer,
))
.await;
return Ok(None);
Ok(None)
}
CommunicationMode::Oneway => {
self.remoting_client
.invoke_oneway(addr.to_string(), request, timeout_millis)
.await;
return Ok(None);
Ok(None)
}
}

unimplemented!()
}

pub async fn send_message_simple(
Expand Down
9 changes: 8 additions & 1 deletion rocketmq-client/src/latency/latency_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub trait LatencyFaultTolerance<T> {
use crate::latency::resolver::Resolver;
use crate::latency::service_detector::ServiceDetector;

pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
/// Update brokers' states, to decide if they are good or not.
///
/// # Arguments
Expand Down Expand Up @@ -105,4 +108,8 @@ pub trait LatencyFaultTolerance<T> {
///
/// * `true` if the detector should be started, `false` otherwise.
fn is_start_detector_enable(&self) -> bool;

fn set_resolver(&mut self, resolver: Box<dyn Resolver>);

fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>);
}
235 changes: 224 additions & 11 deletions rocketmq-client/src/latency/latency_fault_tolerance_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use crate::latency::latency_fault_tolerance::LatencyFaultTolerance;
use crate::latency::resolver::Resolver;
use crate::latency::service_detector::ServiceDetector;

pub struct LatencyFaultToleranceImpl {
resolver: Box<dyn Resolver>,
service_detector: Box<dyn ServiceDetector>,
fault_item_table: parking_lot::Mutex<HashMap<String, FaultItem>>,
detect_timeout: i32,
detect_interval: i32,
which_item_worst: ThreadLocalIndex,
start_detector_enable: AtomicBool,
resolver: Option<Box<dyn Resolver>>,
service_detector: Option<Box<dyn ServiceDetector>>,
}

impl LatencyFaultToleranceImpl {
pub fn new(fetcher: impl Resolver, service_detector: impl ServiceDetector) -> Self {
pub fn new(/*fetcher: impl Resolver, service_detector: impl ServiceDetector*/) -> Self {
Self {
resolver: Box::new(fetcher),
service_detector: Box::new(service_detector),
resolver: None,
service_detector: None,
fault_item_table: Default::default(),
detect_timeout: 200,
detect_interval: 2000,
which_item_worst: Default::default(),
start_detector_enable: AtomicBool::new(false),
}
}
}
Expand All @@ -40,11 +52,29 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
not_available_duration: u64,
reachable: bool,
) {
todo!()
let mut table = self.fault_item_table.lock();
let fault_item = table
.entry(name.clone())
.or_insert_with(|| FaultItem::new(name.clone()));

fault_item.set_current_latency(current_latency);
fault_item.update_not_available_duration(not_available_duration);
fault_item.set_reachable(reachable);

if !reachable {
info!(
"{} is unreachable, it will not be used until it's reachable",
name
);
}
}

fn is_available(&self, name: &String) -> bool {
todo!()
let fault_item_table = self.fault_item_table.lock();
if let Some(fault_item) = fault_item_table.get(name) {
return fault_item.is_available();
}
true
}

fn is_reachable(&self, name: &String) -> bool {
Expand All @@ -63,9 +93,7 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
todo!()
}

fn shutdown(&self) {
todo!()
}
fn shutdown(&self) {}

fn detect_by_one_round(&self) {
todo!()
Expand All @@ -80,10 +108,195 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
}

fn set_start_detector_enable(&mut self, start_detector_enable: bool) {
todo!()
self.start_detector_enable
.store(start_detector_enable, std::sync::atomic::Ordering::Relaxed);
}

fn is_start_detector_enable(&self) -> bool {
todo!()
}

fn set_resolver(&mut self, resolver: Box<dyn Resolver>) {
self.resolver = Some(resolver);
}

fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>) {
self.service_detector = Some(service_detector);
}
}

use std::cmp::Ordering;
use std::hash::Hash;
use std::sync::atomic::AtomicBool;

use rocketmq_common::TimeUtils::get_current_millis;
use tracing::info;

use crate::common::thread_local_index::ThreadLocalIndex;

#[derive(Debug)]
pub struct FaultItem {
name: String,
current_latency: std::sync::atomic::AtomicU64,
start_timestamp: std::sync::atomic::AtomicU64,
check_stamp: std::sync::atomic::AtomicU64,
reachable_flag: std::sync::atomic::AtomicBool,
}

impl FaultItem {
pub fn new(name: String) -> Self {
FaultItem {
name,
current_latency: std::sync::atomic::AtomicU64::new(0),
start_timestamp: std::sync::atomic::AtomicU64::new(0),
check_stamp: std::sync::atomic::AtomicU64::new(0),
reachable_flag: std::sync::atomic::AtomicBool::new(true),
}
}

pub fn update_not_available_duration(&self, not_available_duration: u64) {
let now = get_current_millis();
if not_available_duration > 0
&& now + not_available_duration
> self
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
{
self.start_timestamp.store(
now + not_available_duration,
std::sync::atomic::Ordering::Relaxed,
);
println!(
"{} will be isolated for {} ms.",
self.name, not_available_duration
);
}
}

pub fn set_reachable(&self, reachable_flag: bool) {
self.reachable_flag
.store(reachable_flag, std::sync::atomic::Ordering::Relaxed);
}

pub fn set_check_stamp(&self, check_stamp: u64) {
self.check_stamp
.store(check_stamp, std::sync::atomic::Ordering::Relaxed);
}

pub fn is_available(&self) -> bool {
let now = get_current_millis();
now >= self
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn is_reachable(&self) -> bool {
self.reachable_flag
.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn set_current_latency(&self, latency: u64) {
self.current_latency
.store(latency, std::sync::atomic::Ordering::Relaxed);
}

pub fn get_current_latency(&self) -> u64 {
self.current_latency
.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn get_start_timestamp(&self) -> u64 {
self.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
}
}

impl Eq for FaultItem {}

impl Ord for FaultItem {
fn cmp(&self, other: &Self) -> Ordering {
if self.is_available() != other.is_available() {
if self.is_available() {
return Ordering::Less;
}
if other.is_available() {
return Ordering::Greater;
}
}

match self
.current_latency
.load(std::sync::atomic::Ordering::Relaxed)
.cmp(
&other
.current_latency
.load(std::sync::atomic::Ordering::Relaxed),
) {
Ordering::Equal => (),
ord => return ord,
}

match self
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
.cmp(
&other
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed),
) {
Ordering::Equal => (),
ord => return ord,
}

Ordering::Equal
}
}

impl PartialEq<Self> for FaultItem {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self
.current_latency
.load(std::sync::atomic::Ordering::Relaxed)
== other
.current_latency
.load(std::sync::atomic::Ordering::Relaxed)
&& self
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
== other
.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
}
}

impl PartialOrd for FaultItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Hash for FaultItem {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.current_latency
.load(std::sync::atomic::Ordering::Relaxed)
.hash(state);
self.start_timestamp
.load(std::sync::atomic::Ordering::Relaxed)
.hash(state);
}
}

impl std::fmt::Display for FaultItem {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FaultItem{{ name='{}', current_latency={}, start_timestamp={}, reachable_flag={} }}",
self.name,
self.get_current_latency(),
self.get_start_timestamp(),
self.is_reachable()
)
}
}
Loading

0 comments on commit 2f7a2a9

Please sign in to comment.