Skip to content

Commit

Permalink
update persisted
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi committed Dec 17, 2020
1 parent 3139b2d commit 9324e81
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 13 deletions.
50 changes: 39 additions & 11 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,30 +889,58 @@ fn test_raw_node_entries_after_snapshot() {

let mut raw_node = new_raw_node(1, vec![1, 2], 10, 1, s.clone(), &l);

let snapshot = new_snapshot(10, 2, vec![1, 2]);
let mut entries = vec![];
for i in 2..20 {
entries.push(new_entry(2, i, Some("hello")));
}
let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec());
append_msg.set_term(2);
append_msg.set_index(1);
append_msg.set_log_term(1);
append_msg.set_commit(5);
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
must_cmp_ready(
&rd,
&Some(soft_state(2, StateRole::Follower)),
&Some(hard_state(2, 5, 0)),
&entries,
&[],
&None,
true,
true,
);
s.wl().set_hardstate(rd.hs().unwrap().clone());
s.wl().append(rd.entries()).unwrap();
let light_rd = raw_node.advance(rd);
assert_eq!(light_rd.commit_index(), None);
assert_eq!(light_rd.committed_entries().as_slice(), &entries[..4]);
assert!(!light_rd.messages().is_empty());

let snapshot = new_snapshot(10, 3, vec![1, 2]);
let mut snapshot_msg = new_message(2, 1, MessageType::MsgSnapshot, 0);
snapshot_msg.set_term(2);
snapshot_msg.set_term(3);
snapshot_msg.set_snapshot(snapshot.clone());
raw_node.step(snapshot_msg).unwrap();

let entries = [
new_entry(2, 11, Some("hello")),
new_entry(2, 12, Some("hello")),
new_entry(2, 13, Some("hello")),
];
let mut entries = vec![];
for i in 11..14 {
entries.push(new_entry(3, i, Some("hello")));
}
let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec());
append_msg.set_term(2);
append_msg.set_term(3);
append_msg.set_index(10);
append_msg.set_log_term(2);
append_msg.set_log_term(3);
append_msg.set_commit(12);
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
// If there is a snapshot, the committed entries should be empty.
must_cmp_ready(
&rd,
&Some(soft_state(2, StateRole::Follower)),
&Some(hard_state(2, 12, 0)),
&None,
&Some(hard_state(3, 12, 0)),
&entries,
&[],
&Some(snapshot),
Expand Down
9 changes: 7 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,12 @@ impl<T: Storage> Raft<T> {
/// Notifies that these raft logs or snapshot have been persisted.
pub fn on_persist_entries(&mut self, index: u64, term: u64) {
let update = self.raft_log.maybe_persist(index, term);
if update && self.state == StateRole::Leader {
// Actually, if it is a leader and persisted index is updated, this term
// must be equal to self.term because the persisted index must be equal to
// the last index of entries from previous leader when it becomes leader
// (see the comments in become_leader), namely, the new persisted entries
// must come from this leader. Here checking the term just for robustness.
if update && self.state == StateRole::Leader && term == self.term {
let self_id = self.id;
let pr = self.mut_prs().get_mut(self_id).unwrap();
pr.maybe_update(index);
Expand Down Expand Up @@ -1125,7 +1130,7 @@ impl<T: Storage> Raft<T> {
self.state = StateRole::Leader;

let last_index = self.raft_log.last_index();
// If there is only one peer, it becomes leader after starting
// If there is only one peer, it becomes leader after campaigning
// so all logs must be persisted.
// If not, it becomes leader after sending RequestVote msg.
// Since all logs must be persisted before sending RequestVote
Expand Down
12 changes: 12 additions & 0 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,15 @@ impl<T: Storage> RaftLog<T> {
);
let index = snapshot.get_metadata().index;
assert!(index >= self.committed, "{} < {}", index, self.committed);
// If `persisted` is greater than `committed`, reset it to `committed`.
// It's because only the persisted entries whose index are less than `commited` can be
// considered the same as the data from snapshot.
// Although there may be some persisted entries with greater index are also committed,
// we can not judge them nor do we care about them because these entries can not be applied
// thus the invariant which is `applied` <= min(`persisted`, `committed`) is satisfied.
if self.persisted > self.committed {
self.persisted = self.committed;
}
self.committed = index;
self.unstable.restore(snapshot);
}
Expand Down Expand Up @@ -761,6 +770,7 @@ mod test {
let mut raft_log = RaftLog::new(store, default_logger());
raft_log.restore(new_snapshot(unstablesnapi, 1));
assert_eq!(raft_log.committed, unstablesnapi);
assert_eq!(raft_log.persisted, storagesnapi);

let tests = vec![
// cannot get term from storage
Expand Down Expand Up @@ -1535,6 +1545,8 @@ mod test {

raft_log.restore(new_snapshot(205, 1));
assert_eq!(raft_log.committed, 205);
// persisted should reset to previous commit index(200)
assert_eq!(raft_log.persisted, 200);

// use smaller commit index, should panic
assert!(
Expand Down

0 comments on commit 9324e81

Please sign in to comment.