Skip to content

Commit

Permalink
change: RaftCore: replace snapshot_index with `snapshot_last_includ…
Browse files Browse the repository at this point in the history
…ed: LogId`. Keep tracks of both snapshot last log term and index.

Also `SnapshotUpdate::SnapshotComplete` now contains an LogId instead of an u64 index.
drmingdrmer committed Jul 9, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 85859d0 commit 5eb9d3a
Showing 2 changed files with 12 additions and 11 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_log_index = req.last_included.index;
self.last_log_term = req.last_included.term;
self.last_applied = req.last_included.index;
self.snapshot_index = req.last_included.index;
self.snapshot_last_included = req.last_included;
Ok(())
}
}
21 changes: 11 additions & 10 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -112,10 +112,11 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt

/// The node's current snapshot state.
snapshot_state: Option<SnapshotState<S::Snapshot>>,
/// The index of the current snapshot, if a snapshot exists.

/// The log id upto which the current snapshot includes, inclusive, if a snapshot exists.
///
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_index: u64,
snapshot_last_included: LogId,

/// A cache of entries which are waiting to be replicated to the state machine.
///
@@ -176,7 +177,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_index: 0,
last_log_term: 0,
snapshot_state: None,
snapshot_index: 0,
snapshot_last_included: LogId { term: 0, index: 0 },
entries_cache: Default::default(),
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
@@ -211,7 +212,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(snapshot) =
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
self.snapshot_index = snapshot.included.index;
self.snapshot_last_included = snapshot.included;
}

let has_log = self.last_log_index != u64::min_value();
@@ -407,8 +408,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update the system's snapshot state based on the given data.
#[tracing::instrument(level = "trace", skip(self))]
fn update_snapshot_state(&mut self, update: SnapshotUpdate) {
if let SnapshotUpdate::SnapshotComplete(index) = update {
self.snapshot_index = index
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_included = log_id
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
@@ -425,13 +426,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy;
// Check to ensure we have actual entries for compaction.
if self.last_applied == 0 || self.last_applied < self.snapshot_index {
if self.last_applied == 0 || self.last_applied < self.snapshot_last_included.index {
return;
}

if !force {
// If we are below the threshold, then there is nothing to do.
if self.last_applied < self.snapshot_index + *threshold {
if self.last_applied < self.snapshot_last_included.index + *threshold {
return;
}
}
@@ -452,7 +453,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
match res {
Ok(res) => match res {
Ok(snapshot) => {
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included.index));
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included));
let _ = chan_tx.send(snapshot.included.index); // This will always succeed.
}
Err(err) => {
@@ -549,7 +550,7 @@ pub(self) enum SnapshotState<S> {
#[derive(Debug)]
pub(self) enum SnapshotUpdate {
/// Snapshot creation has finished successfully and covers the given index.
SnapshotComplete(u64),
SnapshotComplete(LogId),
/// Snapshot creation failed.
SnapshotFailed,
}

0 comments on commit 5eb9d3a

Please sign in to comment.