Skip to content

Commit

Permalink
[ISSUE #1067] Support mq fault strategy (#1068)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Oct 21, 2024
1 parent 31dbf92 commit 2a9a342
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 100 deletions.
12 changes: 9 additions & 3 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ impl MQClientAPIImpl {
}
}
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
producer.update_fault_item(broker_name, duration, false, true);
producer
.update_fault_item(broker_name, duration, false, true)
.await;
return;
}
let send_result = self.process_send_response(broker_name, msg, &response, addr);
Expand All @@ -470,11 +472,15 @@ impl MQClientAPIImpl {
}
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
send_callback.as_ref().unwrap()(Some(&result), None);
producer.update_fault_item(broker_name, duration, false, true);
producer
.update_fault_item(broker_name, duration, false, true)
.await;
}
Err(err) => {
let duration = (Instant::now() - begin_start_time).as_millis() as u64;
producer.update_fault_item(broker_name, duration, true, true);
producer
.update_fault_item(broker_name, duration, true, true)
.await;
Box::pin(self.on_exception_impl(
broker_name,
msg,
Expand Down
30 changes: 18 additions & 12 deletions rocketmq-client/src/latency/latency_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::latency::resolver::Resolver;
use crate::latency::service_detector::ServiceDetector;
use std::any::Any;

pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
use rocketmq_common::ArcRefCellWrapper;

#[allow(async_fn_in_trait)]
pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
/// Update brokers' states, to decide if they are good or not.
///
/// # Arguments
Expand All @@ -27,7 +29,7 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
/// * `not_available_duration` - Corresponding not available time, ms. The broker will be
///   unavailable until this duration has elapsed.
/// * `reachable` - To decide if this broker is reachable or not.
fn update_fault_item(
async fn update_fault_item(
&mut self,
name: T,
current_latency: u64,
Expand All @@ -44,7 +46,7 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
/// # Returns
///
/// * `true` if the broker is available, `false` otherwise.
fn is_available(&self, name: &T) -> bool;
async fn is_available(&self, name: &T) -> bool;

/// To check if this broker is reachable.
///
Expand All @@ -55,30 +57,30 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
/// # Returns
///
/// * `true` if the broker is reachable, `false` otherwise.
fn is_reachable(&self, name: &T) -> bool;
async fn is_reachable(&self, name: &T) -> bool;

/// Remove the broker in this fault item table.
///
/// # Arguments
///
/// * `name` - Broker's name.
fn remove(&mut self, name: &T);
async fn remove(&mut self, name: &T);

/// The worst situation, no broker can be available. Then choose a random one.
///
/// # Returns
///
/// * A random broker will be returned.
fn pick_one_at_least(&self) -> T;
async fn pick_one_at_least(&self) -> Option<T>;

/// Start a new thread, to detect the broker's reachable tag.
fn start_detector(&self);
fn start_detector(this: ArcRefCellWrapper<Self>);

/// Shutdown threads that started by `LatencyFaultTolerance`.
fn shutdown(&self);

/// Reserved function: runs a single detection round and does not create a new thread.
fn detect_by_one_round(&self);
async fn detect_by_one_round(&self);

/// Use it to set the detect timeout bound.
///
Expand Down Expand Up @@ -109,7 +111,11 @@ pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
/// * `true` if the detector should be started, `false` otherwise.
fn is_start_detector_enable(&self) -> bool;

fn set_resolver(&mut self, resolver: Box<dyn Resolver>);
fn set_resolver(&mut self, resolver: R);

fn set_service_detector(&mut self, service_detector: S);

fn as_any(&self) -> &dyn Any;

fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>);
fn as_any_mut(&mut self) -> &mut dyn Any;
}
150 changes: 122 additions & 28 deletions rocketmq-client/src/latency/latency_fault_tolerance_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::collections::HashSet;

use crate::latency::latency_fault_tolerance::LatencyFaultTolerance;
use crate::latency::resolver::Resolver;
use crate::latency::service_detector::ServiceDetector;

pub struct LatencyFaultToleranceImpl {
fault_item_table: parking_lot::Mutex<HashMap<String, FaultItem>>,
detect_timeout: i32,
detect_interval: i32,
pub struct LatencyFaultToleranceImpl<R, S> {
fault_item_table: tokio::sync::Mutex<HashMap<String, FaultItem>>,
detect_timeout: u32,
detect_interval: u32,
which_item_worst: ThreadLocalIndex,
start_detector_enable: AtomicBool,
resolver: Option<Box<dyn Resolver>>,
service_detector: Option<Box<dyn ServiceDetector>>,
resolver: Option<R>,
service_detector: Option<S>,
}

impl LatencyFaultToleranceImpl {
impl<R, S> LatencyFaultToleranceImpl<R, S> {
pub fn new(/*fetcher: impl Resolver, service_detector: impl ServiceDetector*/) -> Self {
Self {
resolver: None,
Expand All @@ -44,15 +46,19 @@ impl LatencyFaultToleranceImpl {
}
}

impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
fn update_fault_item(
impl<R, S> LatencyFaultTolerance<String, R, S> for LatencyFaultToleranceImpl<R, S>
where
R: Resolver,
S: ServiceDetector,
{
async fn update_fault_item(
&mut self,
name: String,
current_latency: u64,
not_available_duration: u64,
reachable: bool,
) {
let mut table = self.fault_item_table.lock();
let mut table = self.fault_item_table.lock().await;
let fault_item = table
.entry(name.clone())
.or_insert_with(|| FaultItem::new(name.clone()));
Expand All @@ -69,42 +75,120 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
}
}

fn is_available(&self, name: &String) -> bool {
let fault_item_table = self.fault_item_table.lock();
/// Reports whether the broker `name` may currently be used.
///
/// A broker that has no entry in the fault item table is treated as available.
async fn is_available(&self, name: &String) -> bool {
    let table = self.fault_item_table.lock().await;
    match table.get(name) {
        Some(item) => item.is_available(),
        None => true,
    }
}

fn is_reachable(&self, name: &String) -> bool {
todo!()
/// Reports whether the broker `name` is currently considered reachable.
///
/// A broker that has no entry in the fault item table is treated as reachable.
async fn is_reachable(&self, name: &String) -> bool {
    let table = self.fault_item_table.lock().await;
    let reachable = match table.get(name) {
        None => true,
        Some(item) => item.is_reachable(),
    };
    reachable
}

fn remove(&mut self, name: &String) {
todo!()
/// Drops the entry for broker `name` from the fault item table, if one exists.
async fn remove(&mut self, name: &String) {
    let mut table = self.fault_item_table.lock().await;
    table.remove(name);
}

fn pick_one_at_least(&self) -> String {
todo!()
/// Picks, in random order, the first broker whose reachable flag is set.
///
/// Returns `None` when the fault item table is empty or no entry is flagged reachable.
async fn pick_one_at_least(&self) -> Option<String> {
    use rand::seq::SliceRandom;

    let table = self.fault_item_table.lock().await;
    let mut candidates: Vec<_> = table.values().collect();
    if candidates.is_empty() {
        return None;
    }

    // Randomize the order so that no single broker is systematically preferred.
    let mut rng = rand::thread_rng();
    candidates.shuffle(&mut rng);

    let picked = candidates
        .into_iter()
        .find(|item| {
            item.reachable_flag
                .load(std::sync::atomic::Ordering::Acquire)
        })
        .map(|item| item.name.clone());
    picked
}

fn start_detector(&self) {
todo!()
/// Spawns a background Tokio task that wakes up every three seconds and, while the detector
/// is enabled, runs one detection round.
///
/// The loop has no exit condition of its own; it runs for as long as the spawned task lives.
fn start_detector(this: ArcRefCellWrapper<Self>) {
    const DETECT_PERIOD: tokio::time::Duration = tokio::time::Duration::from_secs(3);

    tokio::spawn(async move {
        loop {
            tokio::time::sleep(DETECT_PERIOD).await;

            // The flag is re-read on every tick so the detector can be toggled at runtime.
            let enabled = this
                .start_detector_enable
                .load(std::sync::atomic::Ordering::Relaxed);
            if enabled {
                this.detect_by_one_round().await;
            }
        }
    });
}

fn shutdown(&self) {}

fn detect_by_one_round(&self) {
todo!()
/// Runs one detection round over every entry in the fault item table.
///
/// For each item whose `check_stamp` is due:
/// 1. the next check is scheduled `detect_interval` milliseconds ahead;
/// 2. the broker address is resolved through the configured resolver — items that can no
///    longer be resolved are removed from the table at the end of the round;
/// 3. if a service detector is configured, the address is probed with `detect_timeout`, and an
///    item that is currently flagged unreachable but passes the probe is flagged reachable
///    again.
///
/// Does nothing when no resolver has been configured.
async fn detect_by_one_round(&self) {
    // Without a resolver no address can be looked up; skip the round rather than panic
    // inside the detector task.
    let Some(resolver) = self.resolver.as_ref() else {
        return;
    };

    // The table lock is held for the whole round, including across the resolver await.
    let mut fault_item_table = self.fault_item_table.lock().await;
    let mut remove_set = HashSet::new();

    for (name, fault_item) in fault_item_table.iter() {
        let now = get_current_millis();
        let next_check = fault_item
            .check_stamp
            .load(std::sync::atomic::Ordering::Relaxed);
        if now < next_check {
            // Not due yet.
            continue;
        }
        fault_item.check_stamp.store(
            now + u64::from(self.detect_interval),
            std::sync::atomic::Ordering::Release,
        );

        let Some(broker_addr) = resolver.resolve(fault_item.name.as_str()).await else {
            // The broker can no longer be resolved: drop it once iteration is finished.
            remove_set.insert(name.clone());
            continue;
        };
        let Some(service_detector) = self.service_detector.as_ref() else {
            continue;
        };

        let service_ok =
            service_detector.detect(broker_addr.as_str(), u64::from(self.detect_timeout));
        // Only an item that is currently marked unreachable needs to be flipped back.
        if service_ok
            && !fault_item
                .reachable_flag
                .load(std::sync::atomic::Ordering::Acquire)
        {
            info!("{} is reachable now, then it can be used.", name);
            fault_item
                .reachable_flag
                .store(true, std::sync::atomic::Ordering::Release);
        }
    }

    for name in remove_set {
        fault_item_table.remove(&name);
    }
}

fn set_detect_timeout(&mut self, detect_timeout: u32) {
todo!()
self.detect_timeout = detect_timeout;
}

fn set_detect_interval(&mut self, detect_interval: u32) {
todo!()
self.detect_interval = detect_interval;
}

fn set_start_detector_enable(&mut self, start_detector_enable: bool) {
Expand All @@ -113,22 +197,32 @@ impl LatencyFaultTolerance<String> for LatencyFaultToleranceImpl {
}

fn is_start_detector_enable(&self) -> bool {
todo!()
self.start_detector_enable
.load(std::sync::atomic::Ordering::Acquire)
}

fn set_resolver(&mut self, resolver: Box<dyn Resolver>) {
/// Stores `resolver`, replacing any previously configured one.
fn set_resolver(&mut self, resolver: R) {
self.resolver = Some(resolver);
}

fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>) {
/// Stores `service_detector`, replacing any previously configured one.
fn set_service_detector(&mut self, service_detector: S) {
self.service_detector = Some(service_detector);
}

/// Returns `self` as a shared `&dyn Any` reference.
fn as_any(&self) -> &dyn Any {
self
}

/// Returns `self` as a mutable `&mut dyn Any` reference.
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}

use std::cmp::Ordering;
use std::hash::Hash;
use std::sync::atomic::AtomicBool;

use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use tracing::info;

Expand Down Expand Up @@ -166,7 +260,7 @@ impl FaultItem {
now + not_available_duration,
std::sync::atomic::Ordering::Relaxed,
);
println!(
info!(
"{} will be isolated for {} ms.",
self.name, not_available_duration
);
Expand Down
Loading

0 comments on commit 2a9a342

Please sign in to comment.