Skip to content

Commit

Permalink
add commit index to PersistLastReadyResult
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi committed Oct 26, 2020
1 parent 9da4829 commit d72d121
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
5 changes: 4 additions & 1 deletion harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,20 +614,23 @@ fn test_raw_node_start() {
);
store.wl().append(rd.entries()).expect("");
let res = raw_node.advance(rd);
assert_eq!(res.commit_index(), Some(2));
assert_eq!(*res.committed_entries(), vec![new_entry(2, 2, None)]);
assert!(!raw_node.has_ready());

raw_node.propose(vec![], b"somedata".to_vec()).expect("");
let rd = raw_node.ready();
must_cmp_ready(
&rd,
&None,
&Some(hard_state(2, 2, 1)),
&None,
&[new_entry(2, 3, SOME_DATA)],
vec![],
true,
);
store.wl().append(rd.entries()).expect("");
let res = raw_node.advance(rd);
assert_eq!(res.commit_index(), Some(3));
assert_eq!(*res.committed_entries(), vec![new_entry(2, 3, SOME_DATA)]);

assert!(!raw_node.has_ready());
Expand Down
57 changes: 38 additions & 19 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,15 @@ impl Ready {
}

/// The current volatile state of a Node.
/// SoftState will be nil if there is no update.
/// SoftState will be None if there is no update.
/// It is not required to consume or store SoftState.
#[inline]
pub fn ss(&self) -> Option<&SoftState> {
self.ss.as_ref()
}

/// The current state of a Node to be saved to stable storage BEFORE
/// Messages are sent.
/// HardState will be equal to empty state if there is no update.
/// The current state of a Node to be saved to stable storage.
/// HardState will be None state if there is no update.
#[inline]
pub fn hs(&self) -> Option<&HardState> {
self.hs.as_ref()
Expand Down Expand Up @@ -193,7 +192,7 @@ impl Ready {
}
}

/// ReadyRecord encapsulates the needed data for sync reply
/// ReadyRecord encapsulates some needed data from the corresponding Ready.
#[derive(Default, Debug, PartialEq)]
struct ReadyRecord {
number: u64,
Expand All @@ -203,15 +202,24 @@ struct ReadyRecord {
messages: Vec<Message>,
}

/// PersistLastReadyResult encapsulates the committed entries and messages that are ready to
/// be applied or be sent to other peers.
/// PersistLastReadyResult encapsulates the commit index, committed entries and
/// messages that are ready to be applied or be sent to other peers.
#[derive(Default, Debug, PartialEq)]
pub struct PersistLastReadyResult {
commit_index: Option<u64>,
committed_entries: Vec<Entry>,
messages: Vec<Vec<Message>>,
}

impl PersistLastReadyResult {
/// The current commit index.
/// It will be None state if there is no update.
/// It is not required to save it to stable storage.
#[inline]
pub fn commit_index(&self) -> Option<u64> {
self.commit_index
}

/// CommittedEntries specifies entries to be committed to a
/// store/state-machine. These have previously been committed to stable
/// store.
Expand Down Expand Up @@ -472,7 +480,6 @@ impl<T: Storage> RawNode<T> {
}

/// HasReady called when RawNode user need to check if any Ready pending.
/// Checking logic in this method should be consistent with Ready.containsUpdates().
pub fn has_ready(&self) -> bool {
if self.pending_snapshot {
// If there is a pending snapshot, there is no ready.
Expand All @@ -482,9 +489,7 @@ impl<T: Storage> RawNode<T> {
if raft.soft_state() != self.prev_ss {
return true;
}
let mut hs = raft.hard_state();
// If just commit is changed, no need to get ready.
hs.commit = self.prev_hs.commit;
let hs = raft.hard_state();
if hs != self.prev_hs {
return true;
}
Expand Down Expand Up @@ -562,22 +567,36 @@ impl<T: Storage> RawNode<T> {

/// Notifies that the last ready has been well persisted.
///
/// Returns the PersistLastReadyResult that contains committed entries and messages.
/// Returns the PersistLastReadyResult that contains commit index, committed entries and messages.
///
/// This function must be called before entering the next round(e.g. propose, step).
/// Since Ready must be persisted in order, calling this function implicitly means
/// all readys collected before have been persisted.
pub fn on_persist_last_ready(&mut self) -> PersistLastReadyResult {
self.on_persist_ready(self.max_number);

let raft = &mut self.raft;
let mut res = PersistLastReadyResult {
committed_entries: raft
.raft_log
.next_entries_between(self.commit_since_index, self.last_persisted_index)
.unwrap_or_default(),
messages: vec![],
};

let mut res = PersistLastReadyResult::default();

let hard_state = raft.hard_state();
if hard_state.commit > self.prev_hs.commit {
res.commit_index = Some(hard_state.commit);
self.prev_hs.commit = hard_state.commit;
} else {
assert!(hard_state.commit == self.prev_hs.commit);
res.commit_index = None;
}
assert_eq!(
hard_state, self.prev_hs,
"hard state {:?} != prev_hs {:?}",
hard_state, self.prev_hs
);

res.committed_entries = raft
.raft_log
.next_entries_between(self.commit_since_index, self.last_persisted_index)
.unwrap_or_default();
if let Some(e) = res.committed_entries.last() {
self.commit_since_index = e.get_index();
}
Expand Down

0 comments on commit d72d121

Please sign in to comment.