Skip to content

Commit

Permalink
Feature: add Raft::get_snapshot() to get the last snapshot from state…
Browse files Browse the repository at this point in the history
… machine
  • Loading branch information
drmingdrmer committed Feb 10, 2024
1 parent 343b27e commit c1cf8a8
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 25 deletions.
7 changes: 7 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,13 @@ where
self.send_heartbeat("ExternalCommand");
}
ExternalCommand::Snapshot => self.trigger_snapshot(),
ExternalCommand::GetSnapshot { tx } => {
let cmd = sm::Command::get_snapshot(tx);
let res = self.sm_handle.send(cmd);
if let Err(e) = res {
tracing::error!(error = display(e), "error sending GetSnapshot to sm worker");
}
}
ExternalCommand::PurgeLog { upto } => {
self.engine.trigger_purge_log(upto);
}
Expand Down
31 changes: 25 additions & 6 deletions openraft/src/core/raft_msg/external_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
use std::fmt;

use crate::core::raft_msg::ResultSender;
use crate::RaftTypeConfig;
use crate::Snapshot;

/// Application-triggered Raft actions for testing and administration.
///
/// Typically, openraft handles actions automatically.
///
/// An application can also disable these policy-based triggering and use these commands manually,
/// for testing or administrative purpose.
#[derive(Debug, Clone)]
pub(crate) enum ExternalCommand {
pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
/// Initiate an election at once.
Elect,

Expand All @@ -19,6 +22,9 @@ pub(crate) enum ExternalCommand {
/// Initiate to build a snapshot on this node.
Snapshot,

/// Get a snapshot from the state machine, send back via a oneshot::Sender.
GetSnapshot { tx: ResultSender<Option<Snapshot<C>>> },

/// Purge logs covered by a snapshot up to a specified index.
///
/// Openraft respects the [`max_in_snapshot_log_to_keep`] config when purging.
Expand All @@ -27,17 +33,30 @@ pub(crate) enum ExternalCommand {
PurgeLog { upto: u64 },
}

impl fmt::Display for ExternalCommand {
impl<C> fmt::Debug for ExternalCommand<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}

impl<C> fmt::Display for ExternalCommand<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExternalCommand::Elect => {
write!(f, "{:?}", self)
write!(f, "Elect")
}
ExternalCommand::Heartbeat => {
write!(f, "{:?}", self)
write!(f, "Heartbeat")
}
ExternalCommand::Snapshot => {
write!(f, "{:?}", self)
write!(f, "Snapshot")
}
ExternalCommand::GetSnapshot { .. } => {
write!(f, "GetSnapshot")
}
ExternalCommand::PurgeLog { upto } => {
write!(f, "PurgeLog[..={}]", upto)
Expand Down
10 changes: 6 additions & 4 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ use crate::RaftTypeConfig;
pub(crate) mod external_command;

/// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`.
pub(crate) type ResultSender<T, E> = oneshot::Sender<Result<T, E>>;
pub(crate) type ResultSender<T, E = Infallible> = oneshot::Sender<Result<T, E>>;

pub(crate) type ResultReceiver<T, E = Infallible> = oneshot::Receiver<Result<T, E>>;

/// TX for Install Snapshot Response
pub(crate) type InstallSnapshotTx<NID> = ResultSender<InstallSnapshotResponse<NID>, InstallSnapshotError>;

/// TX for Vote Response
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>, Infallible>;
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>>;

/// TX for Append Entries Response
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>, Infallible>;
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>>;

/// TX for Client Write Response
pub(crate) type ClientWriteTx<C> = ResultSender<ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;
Expand Down Expand Up @@ -94,7 +96,7 @@ where C: RaftTypeConfig
},

ExternalCommand {
cmd: ExternalCommand,
cmd: ExternalCommand<C>,
},
}

Expand Down
7 changes: 3 additions & 4 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::fmt::Debug;
use std::fmt::Formatter;

use tokio::sync::oneshot;

use crate::core::raft_msg::ResultSender;
use crate::display_ext::DisplaySlice;
use crate::log_id::RaftLogId;
use crate::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -56,7 +55,7 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn get_snapshot(tx: oneshot::Sender<Option<Snapshot<C>>>) -> Self {
pub(crate) fn get_snapshot(tx: ResultSender<Option<Snapshot<C>>>) -> Self {
let payload = CommandPayload::GetSnapshot { tx };
Command::new(payload)
}
Expand Down Expand Up @@ -104,7 +103,7 @@ where C: RaftTypeConfig
BuildSnapshot,

/// Get the latest built snapshot.
GetSnapshot { tx: oneshot::Sender<Option<Snapshot<C>>> },
GetSnapshot { tx: ResultSender<Option<Snapshot<C>>> },

/// Receive a chunk of snapshot.
///
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::core::snapshot_state::SnapshotRequestId;
use crate::core::streaming_state::Streaming;
Expand Down Expand Up @@ -35,6 +34,7 @@ pub(crate) use response::CommandResult;
pub(crate) use response::Response;

use crate::core::notify::Notify;
use crate::core::raft_msg::ResultSender;

/// State machine worker handle for sending command to it.
pub(crate) struct Handle<C>
Expand Down Expand Up @@ -242,7 +242,7 @@ where
}

#[tracing::instrument(level = "info", skip_all)]
async fn get_snapshot(&mut self, tx: oneshot::Sender<Option<Snapshot<C>>>) -> Result<(), StorageError<C::NodeId>> {
async fn get_snapshot(&mut self, tx: ResultSender<Option<Snapshot<C>>>) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("{}", func_name!());

let snapshot = self.state_machine.get_current_snapshot().await?;
Expand All @@ -251,7 +251,7 @@ where
"sending back snapshot: meta: {:?}",
snapshot.as_ref().map(|s| s.meta.summary())
);
let _ = tx.send(snapshot);
let _ = tx.send(Ok(snapshot));
Ok(())
}

Expand Down
15 changes: 15 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tracing::Level;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::command_state::CommandState;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::replication_lag;
use crate::core::sm;
Expand Down Expand Up @@ -69,6 +70,7 @@ use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::StorageHelper;

/// Define types for a Raft type configuration.
Expand Down Expand Up @@ -346,6 +348,19 @@ where C: RaftTypeConfig
self.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
}

/// Get the latest snapshot from the state machine.
///
/// It returns error only when `RaftCore` fails to serve the request, e.g., Encountering a
/// storage error or shutting down.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, RaftError<C::NodeId>> {
tracing::debug!("Raft::get_snapshot()");

let (tx, rx) = oneshot::channel();
let cmd = ExternalCommand::GetSnapshot { tx };
self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
}

/// Submit an InstallSnapshot RPC to this Raft node.
///
/// These RPCs are sent by the cluster leader in order to bring a new node or a slow node
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where C: RaftTypeConfig
/// It returns at once.
pub(in crate::raft) async fn send_external_command(
&self,
cmd: ExternalCommand,
cmd: ExternalCommand<C>,
cmd_desc: impl fmt::Display + Default,
) -> Result<(), Fatal<C::NodeId>> {
let send_res = self.tx_api.send(RaftMsg::ExternalCommand { cmd });
Expand Down
7 changes: 5 additions & 2 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing_futures::Instrument;

use crate::config::Config;
use crate::core::notify::Notify;
use crate::core::raft_msg::ResultReceiver;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::HigherVote;
Expand Down Expand Up @@ -684,7 +684,7 @@ where
#[tracing::instrument(level = "info", skip_all)]
async fn stream_snapshot(
&mut self,
snapshot_rx: DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>,
snapshot_rx: DataWithId<ResultReceiver<Option<Snapshot<C>>>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = snapshot_rx.request_id();
let rx = snapshot_rx.into_data();
Expand All @@ -696,6 +696,9 @@ where
StorageError::IO { source: io_err }
})?;

// Safe unwrap(): the error is Infallible, so it is safe to unwrap.
let snapshot = snapshot.unwrap();

tracing::info!(
"received snapshot: request_id={}; meta:{}",
request_id.display(),
Expand Down
9 changes: 4 additions & 5 deletions openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ where C: RaftTypeConfig
Self::Data(Data::new_logs(id, log_id_range))
}

pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: ResultReceiver<Option<Snapshot<C>>>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot_rx))
}
}
Expand All @@ -42,8 +42,7 @@ where C: RaftTypeConfig
}
}

use tokio::sync::oneshot;

use crate::core::raft_msg::ResultReceiver;
use crate::display_ext::DisplayOptionExt;
use crate::log_id_range::LogIdRange;
use crate::LogId;
Expand All @@ -60,7 +59,7 @@ pub(crate) enum Data<C>
where C: RaftTypeConfig
{
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
Snapshot(DataWithId<ResultReceiver<Option<Snapshot<C>>>>),
}

impl<C> fmt::Debug for Data<C>
Expand Down Expand Up @@ -111,7 +110,7 @@ where C: RaftTypeConfig
Self::Logs(DataWithId::new(request_id, log_id_range))
}

pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: ResultReceiver<Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

Expand Down
1 change: 1 addition & 0 deletions tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod fixtures;
mod t10_client_writes;
mod t11_client_reads;
mod t12_trigger_purge_log;
mod t13_get_snapshot;
mod t13_trigger_snapshot;
mod t16_with_raft_state;
mod t50_lagging_network_write;
Expand Down
52 changes: 52 additions & 0 deletions tests/tests/client_api/t13_get_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::testing::log_id;
use openraft::Config;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Get snapshot with `Raft::get_snapshot()`
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn get_snapshot() -> anyhow::Result<()> {
let config = Arc::new(
Config {
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!(log_index, "--- get None snapshot for node-1");
{
let n1 = router.get_raft_handle(&1)?;

let curr_snap = n1.get_snapshot().await?;
assert!(curr_snap.is_none());
}

tracing::info!(log_index, "--- trigger and get snapshot for node-1");
{
let n1 = router.get_raft_handle(&1)?;
n1.trigger().snapshot().await?;

router.wait(&1, timeout()).snapshot(log_id(1, 0, log_index), "node-1 snapshot").await?;

let curr_snap = n1.get_snapshot().await?;
let snap = curr_snap.unwrap();
assert_eq!(snap.meta.last_log_id, Some(log_id(1, 0, log_index)));
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit c1cf8a8

Please sign in to comment.