Skip to content

Commit

Permalink
Feature: Raft::enable_tick() to enable or disable election timeout
Browse files Browse the repository at this point in the history
drmingdrmer committed Jul 31, 2022

Unverified

This user has not yet uploaded their public signing key.
1 parent f62315b commit 86eb298
Showing 4 changed files with 75 additions and 32 deletions.
1 change: 1 addition & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -20,4 +20,5 @@ pub use server_state::ServerState;
pub(crate) use snapshot_state::SnapshotState;
pub(crate) use snapshot_state::SnapshotUpdate;
pub(crate) use tick::Tick;
pub(crate) use tick::TickHandle;
pub(crate) use tick::VoteWiseTime;
76 changes: 51 additions & 25 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! tick emitter emits a `RaftMsg::Tick` event at a certain interval.
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep_until;
use tokio::time::Instant;
use tracing::Level;
use tracing::Span;
use tracing_futures::Instrument;

use crate::raft::RaftMsg;
use crate::NodeId;
@@ -44,6 +43,7 @@ impl<NID: NodeId> VoteWiseTime<NID> {
}
}

/// Emit RaftMsg::Tick event at regular `interval`.
pub(crate) struct Tick<C, N, S>
where
C: RaftTypeConfig,
@@ -53,6 +53,13 @@ where
interval: Duration,

tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>,

/// Emit event or not
running: Arc<AtomicBool>,
}

pub(crate) struct TickHandle {
running: Arc<AtomicBool>,
}

impl<C, N, S> Tick<C, N, S>
@@ -61,27 +68,46 @@ where
N: RaftNetworkFactory<C>,
S: RaftStorage<C>,
{
pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>) -> JoinHandle<()> {
let t = Tick { interval, tx };

tokio::spawn(
async move {
let mut i = 0;
loop {
i += 1;

let at = Instant::now() + t.interval;
sleep_until(at).await;

let send_res = t.tx.send(RaftMsg::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Tick fails to send, receiving end quit.");
} else {
tracing::debug!("Tick sent: {}", i)
}
}
pub(crate) fn new(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>, enabled: bool) -> Self {
Tick {
interval,
running: Arc::new(AtomicBool::from(enabled)),
tx,
}
}

pub(crate) async fn tick_loop(self) {
let mut i = 0;
loop {
i += 1;

let at = Instant::now() + self.interval;
sleep_until(at).await;

if !self.running.load(Ordering::Relaxed) {
i -= 1;
continue;
}
.instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")),
)

let send_res = self.tx.send(RaftMsg::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Tick fails to send, receiving end quit.");
} else {
tracing::debug!("Tick sent: {}", i)
}
}
}

/// Return a handle to control the ticker.
pub(crate) fn get_handle(&self) -> TickHandle {
TickHandle {
running: self.running.clone(),
}
}
}

impl TickHandle {
pub(crate) fn enable(&self, enabled: bool) {
self.running.store(enabled, Ordering::Relaxed);
}
}
23 changes: 22 additions & 1 deletion openraft/src/raft.rs
Original file line number Diff line number Diff line change
@@ -12,14 +12,17 @@ use tokio::sync::watch;
use tokio::sync::Mutex;
use tokio::task::JoinError;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::config::Config;
use crate::core::replication_lag;
use crate::core::Expectation;
use crate::core::RaftCore;
use crate::core::SnapshotUpdate;
use crate::core::Tick;
use crate::core::TickHandle;
use crate::error::AddLearnerError;
use crate::error::AppendEntriesError;
use crate::error::CheckIsLeaderError;
@@ -124,6 +127,7 @@ enum CoreState<NID: NodeId> {
struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
id: C::NodeId,
config: Arc<Config>,
tick_handle: TickHandle,
tx_api: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId>>,
// TODO(xp): it does not need to be a async mutex.
@@ -184,7 +188,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_shutdown, rx_shutdown) = oneshot::channel();

let _tick_handle = Tick::spawn(Duration::from_millis(config.heartbeat_interval * 3 / 2), tx_api.clone());
let tick = Tick::new(
Duration::from_millis(config.heartbeat_interval * 3 / 2),
tx_api.clone(),
true,
);

let tick_handle = tick.get_handle();
let _tick_join_handle =
tokio::spawn(tick.tick_loop().instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")));

let core_handle = RaftCore::spawn(
id,
@@ -200,6 +212,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let inner = RaftInner {
id,
config,
tick_handle,
tx_api,
rx_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
@@ -210,6 +223,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
Self { inner: Arc::new(inner) }
}

/// Enable or disable raft internal ticker.
///
/// The internal ticker triggers all timeout based event, e.g. election event or heartbeat event.
/// By disabling the ticker, a follower will not enter candidate again, a leader will not send heartbeat.
pub fn enable_tick(&self, enabled: bool) {
self.inner.tick_handle.enable(enabled);
}

/// Submit an AppendEntries RPC to this Raft node.
///
/// These RPCs are sent by the cluster leader to replicate log entries (§5.3), and are also
7 changes: 1 addition & 6 deletions openraft/tests/membership/t30_step_down.rs
Original file line number Diff line number Diff line change
@@ -28,16 +28,11 @@ async fn step_down() -> Result<()> {
);
let mut router = RaftRouter::new(config.clone());

let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?;
let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {2,3}).await?;

// Submit a config change which adds two new nodes and removes the current leader.
let orig_leader = router.leader().expect("expected the cluster to have a leader");
assert_eq!(0, orig_leader, "expected original leader to be node 0");
router.new_raft_node(2);
router.new_raft_node(3);
router.add_learner(0, 2).await?;
router.add_learner(0, 3).await?;
log_index += 2;
router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?;

let node = router.get_raft_handle(&orig_leader)?;

0 comments on commit 86eb298

Please sign in to comment.