Skip to content

Commit

Permalink
Change the behavior of persisted index for snapshot (#410)
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi authored Dec 18, 2020
1 parent fc1ef2f commit a45c4a3
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 32 deletions.
2 changes: 2 additions & 0 deletions harness/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ impl Interface {
let snap = snapshot.clone();
self.raft_log.stable_snap();
let index = snap.get_metadata().index;
let term = snap.get_metadata().term;
self.mut_store().wl().apply_snapshot(snap).expect("");
self.on_persist_entries(index, term);
self.commit_apply(index);
}
let unstable = self.raft_log.unstable_entries().to_vec();
Expand Down
118 changes: 103 additions & 15 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,30 +889,58 @@ fn test_raw_node_entries_after_snapshot() {

let mut raw_node = new_raw_node(1, vec![1, 2], 10, 1, s.clone(), &l);

let snapshot = new_snapshot(10, 2, vec![1, 2]);
let mut entries = vec![];
for i in 2..20 {
entries.push(new_entry(2, i, Some("hello")));
}
let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec());
append_msg.set_term(2);
append_msg.set_index(1);
append_msg.set_log_term(1);
append_msg.set_commit(5);
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
must_cmp_ready(
&rd,
&Some(soft_state(2, StateRole::Follower)),
&Some(hard_state(2, 5, 0)),
&entries,
&[],
&None,
true,
true,
);
s.wl().set_hardstate(rd.hs().unwrap().clone());
s.wl().append(rd.entries()).unwrap();
let light_rd = raw_node.advance(rd);
assert_eq!(light_rd.commit_index(), None);
assert_eq!(light_rd.committed_entries().as_slice(), &entries[..4]);
assert!(!light_rd.messages().is_empty());

let snapshot = new_snapshot(10, 3, vec![1, 2]);
let mut snapshot_msg = new_message(2, 1, MessageType::MsgSnapshot, 0);
snapshot_msg.set_term(2);
snapshot_msg.set_term(3);
snapshot_msg.set_snapshot(snapshot.clone());
raw_node.step(snapshot_msg).unwrap();

let entries = [
new_entry(2, 11, Some("hello")),
new_entry(2, 12, Some("hello")),
new_entry(2, 13, Some("hello")),
];
let mut entries = vec![];
for i in 11..14 {
entries.push(new_entry(3, i, Some("hello")));
}
let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec());
append_msg.set_term(2);
append_msg.set_term(3);
append_msg.set_index(10);
append_msg.set_log_term(2);
append_msg.set_log_term(3);
append_msg.set_commit(12);
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
// If there is a snapshot, the committed entries should be empty.
must_cmp_ready(
&rd,
&Some(soft_state(2, StateRole::Follower)),
&Some(hard_state(2, 12, 0)),
&None,
&Some(hard_state(3, 12, 0)),
&entries,
&[],
&Some(snapshot),
Expand Down Expand Up @@ -1193,7 +1221,7 @@ fn test_async_ready_leader() {
}

/// Test if async ready process is expected when a follower receives
/// some append msg.
/// some append msg and snapshot.
#[test]
fn test_async_ready_follower() {
let l = default_logger();
Expand All @@ -1204,7 +1232,7 @@ fn test_async_ready_follower() {

let mut raw_node = new_raw_node(1, vec![1, 2], 10, 1, s.clone(), &l);
let mut first_index = 1;
let mut rd_number = 1;
let mut rd_number = 0;
for cnt in 0..3 {
for i in 0..10 {
let entries = [
Expand All @@ -1225,7 +1253,7 @@ fn test_async_ready_follower() {
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
assert_eq!(rd.number(), rd_number + i);
assert_eq!(rd.number(), rd_number + i + 1);
assert_eq!(rd.hs(), Some(&hard_state(2, first_index + i * 3 + 3, 0)));
assert_eq!(rd.entries(), &entries);
assert_eq!(rd.committed_entries().as_slice(), &[]);
Expand All @@ -1236,7 +1264,7 @@ fn test_async_ready_follower() {
raw_node.advance_append_async(rd);
}
// Unpersisted Ready number in range [1, 10]
raw_node.on_persist_ready(rd_number + 3);
raw_node.on_persist_ready(rd_number + 4);
let mut rd = raw_node.ready();
assert_eq!(rd.hs(), None);
assert_eq!(
Expand Down Expand Up @@ -1278,6 +1306,66 @@ fn test_async_ready_follower() {
first_index += 10 * 3;
rd_number += 11;
}

let snapshot = new_snapshot(first_index + 5, 2, vec![1, 2]);
let mut snapshot_msg = new_message(2, 1, MessageType::MsgSnapshot, 0);
snapshot_msg.set_term(2);
snapshot_msg.set_snapshot(snapshot.clone());
raw_node.step(snapshot_msg).unwrap();

let rd = raw_node.ready();
assert_eq!(rd.number(), rd_number + 1);
must_cmp_ready(
&rd,
&None,
&Some(hard_state(2, first_index + 5, 0)),
&[],
&[],
&Some(snapshot.clone()),
true,
true,
);

s.wl().set_hardstate(rd.hs().unwrap().clone());
s.wl().apply_snapshot(snapshot).unwrap();
s.wl().append(rd.entries()).unwrap();
raw_node.advance_append_async(rd);

let mut entries = vec![];
for i in 1..10 {
entries.push(new_entry(2, first_index + 5 + i, Some("hello")));
}
let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec());
append_msg.set_term(2);
append_msg.set_index(first_index + 5);
append_msg.set_log_term(2);
append_msg.set_commit(first_index + 5 + 3);
raw_node.step(append_msg).unwrap();

let rd = raw_node.ready();
assert_eq!(rd.number(), rd_number + 2);
must_cmp_ready(
&rd,
&None,
&Some(hard_state(2, first_index + 5 + 3, 0)),
&entries,
&[],
&None,
true,
true,
);
s.wl().set_hardstate(rd.hs().unwrap().clone());
s.wl().append(rd.entries()).unwrap();
raw_node.advance_append_async(rd);

raw_node.on_persist_ready(rd_number + 1);
assert_eq!(raw_node.raft.raft_log.persisted, first_index + 5);
raw_node.advance_apply_to(first_index + 5);

raw_node.on_persist_ready(rd_number + 2);

let rd = raw_node.ready();
must_cmp_ready(&rd, &None, &None, &[], &entries[..3], &None, false, false);
}

/// Test if a new leader immediately sends all messages recorded before without
Expand Down
20 changes: 16 additions & 4 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,14 +971,26 @@ impl<T: Storage> Raft<T> {
true
}

/// Notifies that these raft logs have been well persisted.
/// Notifies that these raft logs or snapshot have been persisted.
pub fn on_persist_entries(&mut self, index: u64, term: u64) {
let update = self.raft_log.maybe_persist(index, term);
if update && self.state == StateRole::Leader {
// If this node is a leader and the persisted index is updated, `term` must
// be equal to `self.term`: when a node becomes leader, its persisted index
// must already equal the last index of the entries from the previous leader
// (see the comments in `become_leader`), so any newly persisted entries
// must come from this leader.
if term != self.term {
error!(
self.logger,
"leader's persisted index changed but the term {} is not the same as {}",
term,
self.term
);
}
let self_id = self.id;
let pr = self.mut_prs().get_mut(self_id).unwrap();
pr.maybe_update(index);
if self.maybe_commit() && self.should_bcast_commit() {
if pr.maybe_update(index) && self.maybe_commit() && self.should_bcast_commit() {
self.bcast_append();
}
}
Expand Down Expand Up @@ -1125,7 +1137,7 @@ impl<T: Storage> Raft<T> {
self.state = StateRole::Leader;

let last_index = self.raft_log.last_index();
// If there is only one peer, it becomes leader after starting
// If there is only one peer, it becomes leader after campaigning
// so all logs must be persisted.
// If not, it becomes leader after sending RequestVote msg.
// Since all logs must be persisted before sending RequestVote
Expand Down
22 changes: 13 additions & 9 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,16 @@ impl<T: Storage> RaftLog<T> {
);
let index = snapshot.get_metadata().index;
assert!(index >= self.committed, "{} < {}", index, self.committed);
// If `persisted` is greater than `committed`, reset it to `committed`.
// Only the persisted entries whose index is not greater than `committed` can be
// considered the same as the data from the snapshot.
// Some persisted entries with a greater index may also be committed, but we can
// neither identify them nor need to care about them: these entries cannot be applied,
// so the invariant `applied` <= min(`persisted`, `committed`) is still satisfied.
if self.persisted > self.committed {
self.persisted = self.committed;
}
self.committed = index;
// persisted is just used for fetching committed entries.
// Here reset the persisted to index to satisfy its invariant which is
// persisted < unstable.offset and applied <= persisted.
self.persisted = index;
self.unstable.restore(snapshot);
}
}
Expand Down Expand Up @@ -765,7 +770,7 @@ mod test {
let mut raft_log = RaftLog::new(store, default_logger());
raft_log.restore(new_snapshot(unstablesnapi, 1));
assert_eq!(raft_log.committed, unstablesnapi);
assert_eq!(raft_log.persisted, unstablesnapi);
assert_eq!(raft_log.persisted, storagesnapi);

let tests = vec![
// cannot get term from storage
Expand Down Expand Up @@ -1521,7 +1526,7 @@ mod test {
assert_eq!(raft_log.persisted, 100);
raft_log.restore(new_snapshot(200, 1));
assert_eq!(raft_log.committed, 200);
assert_eq!(raft_log.persisted, 200);
assert_eq!(raft_log.persisted, 100);

for i in 201..210 {
raft_log.append(&[new_entry(i, 1)]);
Expand All @@ -1536,13 +1541,12 @@ mod test {
raft_log.stable_entries();
raft_log.mut_store().wl().append(&unstable).expect("");
raft_log.maybe_persist(209, 1);

assert_eq!(raft_log.persisted, 209);

raft_log.restore(new_snapshot(205, 1));
assert_eq!(raft_log.committed, 205);
// persisted should backward to 205
assert_eq!(raft_log.persisted, 205);
// `persisted` should be reset to the previous commit index (200)
assert_eq!(raft_log.persisted, 200);

// use smaller commit index, should panic
assert!(
Expand Down
17 changes: 13 additions & 4 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ struct ReadyRecord {
number: u64,
// (index, term) of the last entry from the entries in Ready
last_entry: Option<(u64, u64)>,
has_snapshot: bool,
// (index, term) of the snapshot in Ready
snapshot: Option<(u64, u64)>,
messages: Vec<Message>,
}

Expand Down Expand Up @@ -436,7 +437,7 @@ impl<T: Storage> RawNode<T> {
// avoid out of order.
for record in self.records.drain(..) {
assert_eq!(record.last_entry, None);
assert!(!record.has_snapshot);
assert_eq!(record.snapshot, None);
if !record.messages.is_empty() {
self.messages.push(record.messages);
}
Expand Down Expand Up @@ -472,7 +473,10 @@ impl<T: Storage> RawNode<T> {
"has snapshot but also has committed entries since {}",
self.commit_since_index
);
rd_record.has_snapshot = true;
rd_record.snapshot = Some((
rd.snapshot.get_metadata().index,
rd.snapshot.get_metadata().term,
));
rd.must_sync = true;
}

Expand Down Expand Up @@ -538,7 +542,7 @@ impl<T: Storage> RawNode<T> {
let rd_record = self.records.back().unwrap();
assert!(rd_record.number == rd.number);
let raft = &mut self.raft;
if rd_record.has_snapshot {
if rd_record.snapshot.is_some() {
raft.raft_log.stable_snap();
}
if rd_record.last_entry.is_some() {
Expand All @@ -565,6 +569,11 @@ impl<T: Storage> RawNode<T> {
}
let mut record = self.records.pop_front().unwrap();

if let Some((i, t)) = record.snapshot {
index = i;
term = t;
}

if let Some((i, t)) = record.last_entry {
index = i;
term = t;
Expand Down

0 comments on commit a45c4a3

Please sign in to comment.