From 886ad5cd963d2092f31ccc3c0beec967fcf70745 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Wed, 25 Mar 2020 19:16:04 +0800 Subject: [PATCH 1/3] *: allow index starts from 0 Signed-off-by: Jay Lee --- examples/five_mem_node/main.rs | 12 +- harness/tests/failpoints_cases/mod.rs | 2 +- harness/tests/integration_cases/test_raft.rs | 390 ++++++++++-------- .../test_raft_flow_control.rs | 29 +- .../integration_cases/test_raft_paper.rs | 167 ++++---- .../tests/integration_cases/test_raft_snap.rs | 6 +- .../tests/integration_cases/test_raw_node.rs | 50 +-- src/raft.rs | 4 - src/storage.rs | 22 +- 9 files changed, 349 insertions(+), 333 deletions(-) diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index 18c5ef3e7..5af0264c7 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -11,7 +11,6 @@ use std::time::{Duration, Instant}; use std::{str, thread}; use protobuf::Message as PbMessage; -use raft::eraftpb::ConfState; use raft::storage::MemStorage; use raft::{prelude::*, StateRole}; use regex::Regex; @@ -175,8 +174,15 @@ impl Node { let mut cfg = example_config(); cfg.id = id; let logger = logger.new(o!("tag" => format!("peer_{}", id))); - - let storage = MemStorage::new_with_conf_state(ConfState::from((vec![id], vec![]))); + let mut s = Snapshot::default(); + // Because we don't use the same configuration to initialize every node, so we use + // a non-zero index to force new followers catch up logs by snapshot first, which will + // bring all nodes to the same initial state. + s.mut_metadata().index = 1; + s.mut_metadata().term = 1; + s.mut_metadata().mut_conf_state().voters = vec![1]; + let storage = MemStorage::new(); + storage.wl().apply_snapshot(s).unwrap(); let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap()); Node { raft_group, diff --git a/harness/tests/failpoints_cases/mod.rs b/harness/tests/failpoints_cases/mod.rs index 7908775ef..529862ee7 100644 --- a/harness/tests/failpoints_cases/mod.rs +++ b/harness/tests/failpoints_cases/mod.rs @@ -15,7 +15,7 @@ fn test_reject_stale_term_message() { let l = default_logger(); let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); fail::cfg("before_step", "panic").unwrap(); - r.load_state(&hard_state(2, 1, 0)); + r.load_state(&hard_state(2, 0, 0)); let mut m = new_message(0, 0, MessageType::MsgAppend, 0); m.term = r.term - 1; diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index c61c6f8a9..8ee3edb43 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -58,8 +58,7 @@ fn ents_with_config( let store = MemStorage::new_with_conf_state((peers.clone(), vec![])); for (i, term) in terms.iter().enumerate() { let mut e = Entry::default(); - // An additional `plus one` for initialized storage. - e.index = i as u64 + 1 + 1; + e.index = i as u64 + 1; e.term = *term; store.wl().append(&[e]).expect(""); } @@ -301,8 +300,7 @@ fn test_progress_leader() { let matched = raft.mut_prs().get_mut(1).unwrap().matched; let next_idx = raft.mut_prs().get_mut(1).unwrap().next_idx; - // An additional `+ 1` because the raft is initialized with index = 1. - assert_eq!(matched, i + 1 + 1); + assert_eq!(matched, i + 1); assert_eq!(next_idx, matched + 1); assert!(raft.step(prop_msg.clone()).is_ok()); @@ -368,27 +366,27 @@ fn test_leader_election_with_config(pre_vote: bool, l: &Logger) { ( Network::new_with_config(vec![None, None, None], &config, l), StateRole::Leader, - 2, + 1, ), ( Network::new_with_config(vec![None, None, NOP_STEPPER], &config, l), StateRole::Leader, - 2, + 1, ), ( Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER], &config, l), StateRole::Candidate, - 2, + 1, ), ( Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None], &config, l), StateRole::Candidate, - 2, + 1, ), ( Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None, None], &config, l), StateRole::Leader, - 2, + 1, ), // three logs further along than 0, but in the same term so rejection // are returned instead of the votes being ignored. @@ -396,10 +394,10 @@ fn test_leader_election_with_config(pre_vote: bool, l: &Logger) { Network::new_with_config( vec![ None, - Some(ents_with_config(&[2], pre_vote, 2, vec![1, 2, 3, 4, 5], l)), - Some(ents_with_config(&[2], pre_vote, 3, vec![1, 2, 3, 4, 5], l)), + Some(ents_with_config(&[1], pre_vote, 2, vec![1, 2, 3, 4, 5], l)), + Some(ents_with_config(&[1], pre_vote, 3, vec![1, 2, 3, 4, 5], l)), Some(ents_with_config( - &[2, 2], + &[1, 1], pre_vote, 4, vec![1, 2, 3, 4, 5], @@ -411,7 +409,7 @@ fn test_leader_election_with_config(pre_vote: bool, l: &Logger) { l, ), StateRole::Follower, - 2, + 1, ), ]; @@ -426,7 +424,7 @@ fn test_leader_election_with_config(pre_vote: bool, l: &Logger) { // In pre-vote mode, an election that fails to complete // leaves the node in pre-candidate state without advancing // the term. - (StateRole::PreCandidate, 1) + (StateRole::PreCandidate, 0) } else { (state, term) }; @@ -682,7 +680,7 @@ fn test_log_replicatioin() { ( Network::new(vec![None, None, None], &l), vec![new_message(1, 1, MessageType::MsgPropose, 1)], - 3, + 2, ), ( Network::new(vec![None, None, None], &l), @@ -691,7 +689,7 @@ fn test_log_replicatioin() { new_message(1, 2, MessageType::MsgHup, 0), new_message(1, 2, MessageType::MsgPropose, 1), ], - 5, + 4, ), ]; @@ -731,11 +729,10 @@ fn test_log_replicatioin() { fn test_single_node_commit() { let l = default_logger(); let mut tt = Network::new(vec![None], &l); - assert_eq!(tt.peers[&1].raft_log.first_index(), 2); tt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); - assert_eq!(tt.peers[&1].raft_log.committed, 4); + assert_eq!(tt.peers[&1].raft_log.committed, 3); } // test_cannot_commit_without_new_term_entry tests the entries cannot be committed @@ -745,9 +742,7 @@ fn test_single_node_commit() { fn test_cannot_commit_without_new_term_entry() { let l = default_logger(); let mut tt = Network::new(vec![None, None, None, None, None], &l); - assert_eq!(tt.peers[&1].raft_log.committed, 1); tt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); - assert_eq!(tt.peers[&1].raft_log.committed, 2); // Empty entry of the term. // 0 cannot reach 2, 3, 4 tt.cut(1, 3); @@ -757,7 +752,7 @@ fn test_cannot_commit_without_new_term_entry() { tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); - assert_eq!(tt.peers[&1].raft_log.committed, 2); + assert_eq!(tt.peers[&1].raft_log.committed, 1); // network recovery tt.recover(); @@ -768,7 +763,7 @@ fn test_cannot_commit_without_new_term_entry() { tt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]); // no log entries from previous term should be committed - assert_eq!(tt.peers[&2].raft_log.committed, 2); + assert_eq!(tt.peers[&2].raft_log.committed, 1); tt.recover(); // send heartbeat; reset wait @@ -776,7 +771,7 @@ fn test_cannot_commit_without_new_term_entry() { // append an entry at current term tt.send(vec![new_message(2, 2, MessageType::MsgPropose, 1)]); // expect the committed to be advanced - assert_eq!(tt.peers[&2].raft_log.committed, 6); + assert_eq!(tt.peers[&2].raft_log.committed, 5); } // test_commit_without_new_term_entry tests the entries could be committed @@ -795,7 +790,7 @@ fn test_commit_without_new_term_entry() { tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); - assert_eq!(tt.peers[&1].raft_log.committed, 2); + assert_eq!(tt.peers[&1].raft_log.committed, 1); // network recovery tt.recover(); @@ -805,7 +800,7 @@ fn test_commit_without_new_term_entry() { // should be committed tt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]); - assert_eq!(tt.peers[&1].raft_log.committed, 5); + assert_eq!(tt.peers[&1].raft_log.committed, 4); } #[test] @@ -835,20 +830,14 @@ fn test_dueling_candidates() { // enough log. nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); - let raft_logs = vec![ - // committed, applied, last index. - (2, 1, 2), - (2, 1, 2), - (1, 1, 1), - ]; - let tests = vec![ - (StateRole::Follower, 3), - (StateRole::Follower, 3), - (StateRole::Follower, 3), + // role, term, committed, applied, last index. + (StateRole::Follower, 2, (1, 0, 1)), + (StateRole::Follower, 2, (1, 0, 1)), + (StateRole::Follower, 2, (0, 0, 0)), ]; - for (i, &(state, term)) in tests.iter().enumerate() { + for (i, &(state, term, raft_log)) in tests.iter().enumerate() { let id = i as u64 + 1; if nt.peers[&id].state != state { panic!( @@ -861,7 +850,7 @@ fn test_dueling_candidates() { } let prefix = format!("#{}: ", i); - assert_raft_log(&prefix, &nt.peers[&id].raft_log, raft_logs[i]); + assert_raft_log(&prefix, &nt.peers[&id].raft_log, raft_log); } } @@ -892,15 +881,13 @@ fn test_dueling_pre_candidates() { // With pre-vote, it does not disrupt the leader. nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); - // 3 items in every tuple is committed index, applied index and last index. - let expects = vec![(2, 1, 2), (2, 1, 2), (1, 1, 1)]; - let tests = vec![ - (1, StateRole::Leader, 2), - (2, StateRole::Follower, 2), - (3, StateRole::Follower, 2), + // role, term, committed, applied, last index. + (1, StateRole::Leader, 1, (1, 0, 1)), + (2, StateRole::Follower, 1, (1, 0, 1)), + (3, StateRole::Follower, 1, (0, 0, 0)), ]; - for (i, &(id, state, term)) in tests.iter().enumerate() { + for (i, &(id, state, term, raft_log)) in tests.iter().enumerate() { if nt.peers[&id].state != state { panic!( "#{}: state = {:?}, want {:?}", @@ -911,7 +898,7 @@ fn test_dueling_pre_candidates() { panic!("#{}: term = {}, want {}", i, nt.peers[&id].term, term); } let prefix = format!("#{}: ", i); - assert_raft_log(&prefix, &nt.peers[&id].raft_log, expects[i]); + assert_raft_log(&prefix, &nt.peers[&id].raft_log, raft_log); } } @@ -938,12 +925,12 @@ fn test_candidate_concede() { tt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]); assert_eq!(tt.peers[&1].state, StateRole::Follower); - assert_eq!(tt.peers[&1].term, 2); + assert_eq!(tt.peers[&1].term, 1); for p in tt.peers.values() { - assert_eq!(p.raft_log.committed, 3); // All raft logs are committed. - assert_eq!(p.raft_log.applied, 1); // Raft logs are based on a snapshot with index 1. - assert_eq!(p.raft_log.last_index(), 3); + assert_eq!(p.raft_log.committed, 2); // All raft logs are committed. + assert_eq!(p.raft_log.applied, 0); // Raft logs are based on a snapshot with index 1. + assert_eq!(p.raft_log.last_index(), 2); } } @@ -985,9 +972,9 @@ fn test_old_messages() { for p in tt.peers.values() { let raft = p.raft.as_ref().unwrap(); - assert_eq!(raft.raft_log.committed, 5); - assert_eq!(raft.raft_log.applied, 1); - assert_eq!(raft.raft_log.last_index(), 5); + assert_eq!(raft.raft_log.committed, 4); + assert_eq!(raft.raft_log.applied, 0); + assert_eq!(raft.raft_log.last_index(), 4); } } @@ -1024,7 +1011,7 @@ fn test_proposal() { send(&mut nw, new_message(1, 1, MessageType::MsgPropose, 1)); // committed index, applied index and last index. - let want_log = if success { (3, 1, 3) } else { (1, 1, 1) }; + let want_log = if success { (2, 0, 2) } else { (0, 0, 0) }; for p in nw.peers.values() { if let Some(ref raft) = p.raft { @@ -1032,8 +1019,8 @@ fn test_proposal() { assert_raft_log(&prefix, &raft.raft_log, want_log); } } - if nw.peers[&1].term != 2 { - panic!("#{}: term = {}, want: {}", j, nw.peers[&1].term, 2); + if nw.peers[&1].term != 1 { + panic!("#{}: term = {}, want: {}", j, nw.peers[&1].term, 1); } } } @@ -1058,11 +1045,11 @@ fn test_proposal_by_proxy() { } if let Some(ref raft) = p.raft { let prefix = format!("#{}: ", j); - assert_raft_log(&prefix, &raft.raft_log, (3, 1, 3)); + assert_raft_log(&prefix, &raft.raft_log, (2, 0, 2)); } } - if tt.peers[&1].term != 2 { - panic!("#{}: term = {}, want {}", j, tt.peers[&1].term, 2); + if tt.peers[&1].term != 1 { + panic!("#{}: term = {}, want {}", j, tt.peers[&1].term, 1); } } } @@ -1072,38 +1059,87 @@ fn test_commit() { let l = default_logger(); let mut tests = vec![ // single - (vec![2], vec![empty_entry(2, 2)], 2, 2), + (vec![1], vec![empty_entry(1, 1)], 1, 1), + (vec![1], vec![empty_entry(1, 1)], 2, 0), + (vec![2], vec![empty_entry(1, 1), empty_entry(2, 2)], 2, 2), + (vec![1], vec![empty_entry(2, 1)], 2, 1), // odd - (vec![2, 1, 1], vec![empty_entry(2, 2)], 1, 1), - (vec![2, 1, 1], vec![empty_entry(1, 2)], 2, 1), - (vec![2, 1, 2], vec![empty_entry(2, 2)], 2, 2), - (vec![2, 1, 2], vec![empty_entry(1, 2)], 2, 1), + ( + vec![2, 1, 1], + vec![empty_entry(1, 1), empty_entry(2, 2)], + 1, + 1, + ), + ( + vec![2, 1, 1], + vec![empty_entry(1, 1), empty_entry(1, 2)], + 2, + 0, + ), + ( + vec![2, 1, 2], + vec![empty_entry(1, 1), empty_entry(2, 2)], + 2, + 2, + ), + ( + vec![2, 1, 2], + vec![empty_entry(1, 1), empty_entry(1, 2)], + 2, + 0, + ), // even - (vec![2, 1, 1, 1], vec![empty_entry(2, 2)], 1, 1), - (vec![2, 1, 1, 1], vec![empty_entry(1, 2)], 2, 1), - (vec![2, 1, 1, 2], vec![empty_entry(2, 2)], 1, 1), - (vec![2, 1, 1, 2], vec![empty_entry(1, 2)], 2, 1), - (vec![2, 1, 2, 2], vec![empty_entry(2, 2)], 2, 2), - (vec![2, 1, 2, 2], vec![empty_entry(1, 2)], 2, 1), + ( + vec![2, 1, 1, 1], + vec![empty_entry(1, 1), empty_entry(2, 2)], + 1, + 1, + ), + ( + vec![2, 1, 1, 1], + vec![empty_entry(1, 1), empty_entry(1, 2)], + 2, + 0, + ), + ( + vec![2, 1, 1, 2], + vec![empty_entry(1, 1), empty_entry(2, 2)], + 1, + 1, + ), + ( + vec![2, 1, 1, 2], + vec![empty_entry(1, 1), empty_entry(1, 2)], + 2, + 0, + ), + ( + vec![2, 1, 2, 2], + vec![empty_entry(1, 1), empty_entry(2, 2)], + 2, + 2, + ), + ( + vec![2, 1, 2, 2], + vec![empty_entry(1, 1), empty_entry(1, 2)], + 2, + 0, + ), ]; for (i, (matches, logs, sm_term, w)) in tests.drain(..).enumerate() { let store = MemStorage::new_with_conf_state((vec![1], vec![])); store.wl().append(&logs).unwrap(); - let cfg = new_test_config(1, 10, 1); - let mut sm = new_test_raft_with_config(&cfg, store, &l); let mut hs = HardState::default(); hs.term = sm_term; - sm.raft_log.store.wl().set_hardstate(hs); - sm.term = sm_term; + store.wl().set_hardstate(hs); + let cfg = new_test_config(1, 5, 1); + let mut sm = new_test_raft_with_config(&cfg, store, &l); - for (j, &v) in matches.iter().enumerate() { + for (j, v) in matches.iter().enumerate() { let id = j as u64 + 1; - if let Some(pr) = sm.mut_prs().get_mut(id) { - pr.matched = v; - pr.next_idx = v + 1; - } else { - sm.set_progress(id, v, v + 1, false); + if sm.mut_prs().get(id).is_none() { + sm.set_progress(id, *v, *v + 1, false); } } sm.maybe_commit(); @@ -1168,20 +1204,20 @@ fn test_handle_msg_append() { }; let mut tests = vec![ // Ensure 1 - (nm(2, 3, 3, 3, None), 3, 1, true), // previous log mismatch - (nm(2, 3, 4, 3, None), 3, 1, true), // previous log non-exist + (nm(2, 3, 2, 3, None), 2, 0, true), // previous log mismatch + (nm(2, 3, 3, 3, None), 2, 0, true), // previous log non-exist // Ensure 2 - (nm(2, 1, 2, 2, None), 3, 2, false), - (nm(2, 1, 1, 2, Some(vec![(2, 2)])), 2, 2, false), - (nm(2, 2, 3, 4, Some(vec![(4, 2), (5, 2)])), 5, 4, false), - (nm(2, 2, 3, 5, Some(vec![(4, 2)])), 4, 4, false), - (nm(2, 1, 2, 5, Some(vec![(3, 2)])), 3, 3, false), + (nm(2, 1, 1, 1, None), 2, 1, false), + (nm(2, 0, 0, 1, Some(vec![(1, 2)])), 1, 1, false), + (nm(2, 2, 2, 3, Some(vec![(3, 2), (4, 2)])), 4, 3, false), + (nm(2, 2, 2, 4, Some(vec![(3, 2)])), 3, 3, false), + (nm(2, 1, 1, 4, Some(vec![(2, 2)])), 2, 2, false), // Ensure 3 - (nm(1, 1, 2, 4, None), 3, 2, false), // match entry 1, commit up to last new entry 1 - (nm(1, 1, 2, 4, Some(vec![(3, 2)])), 3, 3, false), // match entry 1, commit up to last new + (nm(1, 1, 1, 3, None), 2, 1, false), // match entry 1, commit up to last new entry 1 + (nm(1, 1, 1, 3, Some(vec![(2, 2)])), 2, 2, false), // match entry 1, commit up to last new // entry 2 - (nm(2, 2, 3, 4, None), 3, 3, false), // match entry 2, commit up to last new entry 2 - (nm(2, 2, 3, 5, None), 3, 3, false), // commit up to log.last() + (nm(2, 2, 2, 3, None), 2, 2, false), // match entry 2, commit up to last new entry 2 + (nm(2, 2, 2, 4, None), 2, 2, false), // commit up to log.last() ]; for (j, (m, w_index, w_commit, w_reject)) in tests.drain(..).enumerate() { @@ -1191,7 +1227,7 @@ fn test_handle_msg_append() { 10, 1, MemStorage::new(), - &[empty_entry(1, 2), empty_entry(2, 3)], + &[empty_entry(1, 1), empty_entry(2, 2)], &l, ); @@ -1240,9 +1276,9 @@ fn test_handle_heartbeat() { let store = MemStorage::new_with_conf_state((vec![1, 2], vec![])); store .wl() - .append(&[empty_entry(1, 2), empty_entry(2, 3), empty_entry(3, 4)]) + .append(&[empty_entry(1, 1), empty_entry(2, 2), empty_entry(3, 3)]) .unwrap(); - let cfg = new_test_config(1, 10, 1); + let cfg = new_test_config(1, 5, 1); let mut sm = new_test_raft_with_config(&cfg, store, &l); sm.become_follower(2, 2); sm.raft_log.commit_to(commit); @@ -1368,9 +1404,9 @@ fn test_msg_append_response_wait_reset() { // Node 2 acks the first entry, making it committed. let mut m = new_message(2, 0, MessageType::MsgAppendResponse, 0); - m.index = 2; + m.index = 1; sm.step(m).expect(""); - assert_eq!(sm.raft_log.committed, 2); + assert_eq!(sm.raft_log.committed, 1); // Also consume the MsgApp messages that update Commit on the followers. sm.read_messages(); @@ -1386,7 +1422,7 @@ fn test_msg_append_response_wait_reset() { assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); assert_eq!(msgs[0].to, 2); assert_eq!(msgs[0].entries.len(), 1); - assert_eq!(msgs[0].entries[0].index, 3); + assert_eq!(msgs[0].entries[0].index, 2); // Now Node 3 acks the first entry. This releases the wait and entry 2 is sent. m = new_message(3, 0, MessageType::MsgAppendResponse, 0); @@ -1396,7 +1432,7 @@ fn test_msg_append_response_wait_reset() { assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); assert_eq!(msgs[0].to, 3); - assert_eq!(msgs[0].entries.len(), 2); + assert_eq!(msgs[0].entries.len(), 1); assert_eq!(msgs[0].entries[0].index, 2); } @@ -1408,28 +1444,32 @@ fn test_recv_msg_request_vote() { fn test_recv_msg_request_vote_for_type(msg_type: MessageType, l: &Logger) { let mut tests = vec![ + (StateRole::Follower, 0, 0, INVALID_ID, true), + (StateRole::Follower, 0, 1, INVALID_ID, true), + (StateRole::Follower, 0, 2, INVALID_ID, true), + (StateRole::Follower, 0, 3, INVALID_ID, false), + (StateRole::Follower, 1, 0, INVALID_ID, true), (StateRole::Follower, 1, 1, INVALID_ID, true), (StateRole::Follower, 1, 2, INVALID_ID, true), (StateRole::Follower, 1, 3, INVALID_ID, false), + (StateRole::Follower, 2, 0, INVALID_ID, true), (StateRole::Follower, 2, 1, INVALID_ID, true), - (StateRole::Follower, 2, 2, INVALID_ID, true), + (StateRole::Follower, 2, 2, INVALID_ID, false), (StateRole::Follower, 2, 3, INVALID_ID, false), + (StateRole::Follower, 3, 0, INVALID_ID, true), (StateRole::Follower, 3, 1, INVALID_ID, true), (StateRole::Follower, 3, 2, INVALID_ID, false), (StateRole::Follower, 3, 3, INVALID_ID, false), - (StateRole::Follower, 4, 1, INVALID_ID, true), - (StateRole::Follower, 4, 2, INVALID_ID, false), - (StateRole::Follower, 4, 3, INVALID_ID, false), - (StateRole::Follower, 4, 2, 2, false), - (StateRole::Follower, 4, 2, 1, true), - (StateRole::Leader, 4, 3, 1, true), - (StateRole::PreCandidate, 4, 3, 1, true), - (StateRole::Candidate, 4, 3, 1, true), + (StateRole::Follower, 3, 2, 2, false), + (StateRole::Follower, 3, 2, 1, true), + (StateRole::Leader, 3, 3, 1, true), + (StateRole::PreCandidate, 3, 3, 1, true), + (StateRole::Candidate, 3, 3, 1, true), ]; for (j, (state, index, log_term, vote_for, w_reject)) in tests.drain(..).enumerate() { let store = MemStorage::new_with_conf_state((vec![1], vec![])); - let ents = &[empty_entry(2, 2), empty_entry(2, 3)]; + let ents = &[empty_entry(2, 1), empty_entry(2, 2)]; store.wl().append(ents).unwrap(); let mut sm = new_test_raft(1, vec![1], 10, 1, store, &l); sm.state = state; @@ -1487,67 +1527,67 @@ fn test_state_transition() { StateRole::Follower, StateRole::PreCandidate, true, - 1, + 0, INVALID_ID, ), ( StateRole::Follower, StateRole::Candidate, true, - 2, + 1, INVALID_ID, ), - (StateRole::Follower, StateRole::Leader, false, 1, INVALID_ID), + (StateRole::Follower, StateRole::Leader, false, 0, INVALID_ID), ( StateRole::PreCandidate, StateRole::Follower, true, - 1, + 0, INVALID_ID, ), ( StateRole::PreCandidate, StateRole::PreCandidate, true, - 1, + 0, INVALID_ID, ), ( StateRole::PreCandidate, StateRole::Candidate, true, - 2, + 1, INVALID_ID, ), - (StateRole::PreCandidate, StateRole::Leader, true, 1, 1), + (StateRole::PreCandidate, StateRole::Leader, true, 0, 1), ( StateRole::Candidate, StateRole::Follower, true, - 1, + 0, INVALID_ID, ), ( StateRole::Candidate, StateRole::PreCandidate, true, - 1, + 0, INVALID_ID, ), ( StateRole::Candidate, StateRole::Candidate, true, - 2, + 1, INVALID_ID, ), - (StateRole::Candidate, StateRole::Leader, true, 1, 1), + (StateRole::Candidate, StateRole::Leader, true, 0, 1), (StateRole::Leader, StateRole::Follower, true, 1, INVALID_ID), ( StateRole::Leader, StateRole::PreCandidate, false, - 1, + 0, INVALID_ID, ), ( @@ -1557,7 +1597,7 @@ fn test_state_transition() { 1, INVALID_ID, ), - (StateRole::Leader, StateRole::Leader, true, 1, 1), + (StateRole::Leader, StateRole::Leader, true, 0, 1), ]; for (i, (from, to, wallow, wterm, wlead)) in tests.drain(..).enumerate() { let sm: &mut Raft = &mut new_test_raft(1, vec![1], 10, 1, new_storage(), &l); @@ -1590,10 +1630,10 @@ fn test_all_server_stepdown() { let l = default_logger(); let mut tests = vec![ // state, want_state, term, last_index, entry count. - (StateRole::Follower, StateRole::Follower, 3, 1, 0), - (StateRole::PreCandidate, StateRole::Follower, 3, 1, 0), - (StateRole::Candidate, StateRole::Follower, 3, 1, 0), - (StateRole::Leader, StateRole::Follower, 3, 2, 1), + (StateRole::Follower, StateRole::Follower, 3, 0, 0), + (StateRole::PreCandidate, StateRole::Follower, 3, 0, 0), + (StateRole::Candidate, StateRole::Follower, 3, 0, 0), + (StateRole::Leader, StateRole::Follower, 3, 1, 1), ]; let tmsg_types = vec![MessageType::MsgRequestVote, MessageType::MsgAppend]; @@ -2116,12 +2156,12 @@ fn test_read_only_option_safe() { assert_eq!(nt.peers[&1].state, StateRole::Leader); let mut tests = vec![ - (1, 10, 12, vec!["ctx1", "ctx11"], false), - (2, 10, 22, vec!["ctx2", "ctx22"], false), - (3, 10, 32, vec!["ctx3", "ctx33"], false), - (1, 10, 42, vec!["ctx4", "ctx44"], true), - (2, 10, 52, vec!["ctx5", "ctx55"], true), - (3, 10, 62, vec!["ctx6", "ctx66"], true), + (1, 10, 11, vec!["ctx1", "ctx11"], false), + (2, 10, 21, vec!["ctx2", "ctx22"], false), + (3, 10, 31, vec!["ctx3", "ctx33"], false), + (1, 10, 41, vec!["ctx4", "ctx44"], true), + (2, 10, 51, vec!["ctx5", "ctx55"], true), + (3, 10, 61, vec!["ctx6", "ctx66"], true), ]; for (i, (id, proposals, wri, wctx, pending)) in tests.drain(..).enumerate() { @@ -2207,10 +2247,10 @@ fn test_read_only_with_learner() { assert_eq!(nt.peers[&2].state, StateRole::Follower); let mut tests = vec![ - (1, 10, 12, "ctx1"), - (2, 10, 22, "ctx2"), - (1, 10, 32, "ctx3"), - (2, 10, 42, "ctx4"), + (1, 10, 11, "ctx1"), + (2, 10, 21, "ctx2"), + (1, 10, 31, "ctx3"), + (2, 10, 41, "ctx4"), ]; for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { @@ -2286,12 +2326,12 @@ fn test_read_only_option_lease() { assert_eq!(nt.peers[&1].state, StateRole::Leader); let mut tests = vec![ - (1, 10, 12, "ctx1"), - (2, 10, 22, "ctx2"), - (3, 10, 32, "ctx3"), - (1, 10, 42, "ctx4"), - (2, 10, 52, "ctx5"), - (3, 10, 62, "ctx6"), + (1, 10, 11, "ctx1"), + (2, 10, 21, "ctx2"), + (3, 10, 31, "ctx3"), + (1, 10, 41, "ctx4"), + (2, 10, 51, "ctx5"), + (3, 10, 61, "ctx6"), ]; for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { @@ -2356,7 +2396,7 @@ fn test_read_only_option_lease_without_check_quorum() { let read_states = &nt.peers[&2].read_states; assert!(!read_states.is_empty()); let rs = &read_states[0]; - assert_eq!(rs.index, 2); + assert_eq!(rs.index, 1); let vec_ctx = ctx.as_bytes().to_vec(); assert_eq!(rs.request_ctx, vec_ctx); } @@ -2367,13 +2407,13 @@ fn test_read_only_option_lease_without_check_quorum() { fn test_read_only_for_new_leader() { let l = default_logger(); let heartbeat_ticks = 1; - let node_configs = vec![(1, 2, 2, 1), (2, 3, 3, 3), (3, 3, 3, 3)]; + let node_configs = vec![(1, 1, 1, 0), (2, 2, 2, 2), (3, 2, 2, 2)]; let mut peers = vec![]; for (id, committed, applied, compact_index) in node_configs { let mut cfg = new_test_config(id, 10, heartbeat_ticks); cfg.applied = applied; let storage = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); - let entries = vec![empty_entry(1, 2), empty_entry(1, 3)]; + let entries = vec![empty_entry(1, 1), empty_entry(1, 2)]; storage.wl().append(&entries).unwrap(); let mut hs = HardState::default(); hs.term = 1; @@ -2395,7 +2435,7 @@ fn test_read_only_for_new_leader() { assert_eq!(nt.peers[&1].state, StateRole::Leader); // Ensure peer 1 drops read only request. - let windex = 5; + let windex = 4; let wctx = "ctx"; nt.send(vec![new_message_with_entries( 1, @@ -2412,7 +2452,7 @@ fn test_read_only_for_new_leader() { nt.peers.get_mut(&1).unwrap().tick(); } nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); - assert_eq!(nt.peers[&1].raft_log.committed, 5); + assert_eq!(nt.peers[&1].raft_log.committed, 4); assert_eq!( nt.peers[&1] .raft_log @@ -2461,14 +2501,14 @@ fn test_advance_commit_index_by_read_index_response() { // commit entries for leader but not node 2 tt.send(vec![new_message(3, 1, MessageType::MsgReadIndex, 1)]); - assert_eq!(tt.peers[&1].raft_log.committed, 4); - assert_eq!(tt.peers[&2].raft_log.committed, 2); + assert_eq!(tt.peers[&1].raft_log.committed, 3); + assert_eq!(tt.peers[&2].raft_log.committed, 1); tt.recover(); // use LeaseBased so leader won't send MsgHeartbeat to advance node 2's commit index tt.peers.get_mut(&1).unwrap().read_only.option = ReadOnlyOption::LeaseBased; tt.send(vec![new_message(2, 1, MessageType::MsgReadIndex, 1)]); - assert_eq!(tt.peers[&2].raft_log.committed, 4); + assert_eq!(tt.peers[&2].raft_log.committed, 3); } #[test] @@ -2477,12 +2517,12 @@ fn test_leader_append_response() { // Initial progress: match = 0, next = 4 on followers. let mut tests = vec![ // Stale resp; no replies. - (4, true, 0, 4, 0, 0, 0), + (3, true, 0, 3, 0, 0, 0), // Denied resp; decrease next and send probing message. - (3, true, 0, 3, 1, 2, 1), - // Accepted resp; leader commits to 3; broadcast with committed index. - (3, false, 3, 5, 2, 3, 3), - (0, false, 0, 4, 0, 0, 0), + (2, true, 0, 2, 1, 1, 0), + // Accepted resp; leader commits; broadcast with committed index. + (2, false, 2, 4, 2, 2, 2), + (0, false, 0, 3, 0, 0, 0), ]; for (i, (index, reject, wmatch, wnext, wmsg_num, windex, wcommitted)) in @@ -2490,7 +2530,7 @@ fn test_leader_append_response() { { // Initial raft logs: last index = 3, committed = 1. let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); - let ents = &[empty_entry(1, 2), empty_entry(2, 3)]; + let ents = &[empty_entry(0, 1), empty_entry(1, 2)]; store.wl().append(ents).unwrap(); let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store, &l); @@ -2543,14 +2583,13 @@ fn test_leader_append_response() { #[test] fn test_bcast_beat() { let l = default_logger(); - let store = new_storage(); - let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store, &l); - // make a state machine with log.offset = 1000 let offset = 1000u64; let s = new_snapshot(offset, 1, vec![1, 2, 3]); - sm.restore(s.clone()); - sm.raft_log.store.wl().apply_snapshot(s).unwrap(); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store, &l); + sm.term = 1; sm.become_candidate(); sm.become_leader(); @@ -2628,7 +2667,7 @@ fn test_recv_msg_beat() { for (i, (state, w_msg)) in tests.drain(..).enumerate() { let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); - let ents = &[empty_entry(1, 2), empty_entry(1, 3)]; + let ents = &[empty_entry(0, 1), empty_entry(1, 2)]; store.wl().append(ents).unwrap(); let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store, &l); @@ -2656,14 +2695,14 @@ fn test_recv_msg_beat() { #[test] fn test_leader_increase_next() { let l = default_logger(); - let previous_ents = vec![empty_entry(1, 2), empty_entry(1, 3), empty_entry(1, 4)]; + let previous_ents = vec![empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3)]; let mut tests = vec![ // state replicate; optimistically increase next - // previous entries + noop entry + propose + 2 + // previous entries + noop entry + propose + 1 ( ProgressState::Replicate, 2, - previous_ents.len() as u64 + 1 + 1 + 2, + previous_ents.len() as u64 + 1 + 1 + 1, ), // state probe, not optimistically increase next (ProgressState::Probe, 2, 2), @@ -2696,8 +2735,6 @@ fn test_send_append_for_progress_probe() { r.become_candidate(); r.become_leader(); r.read_messages(); - // Because on index 1 there is a snapshot. - r.mut_prs().get_mut(2).unwrap().maybe_update(2 - 1); r.mut_prs().get_mut(2).unwrap().become_probe(); // each round is a heartbeat @@ -2710,7 +2747,7 @@ fn test_send_append_for_progress_probe() { do_send_append(&mut r, 2); let msg = r.read_messages(); assert_eq!(msg.len(), 1); - assert_eq!(msg[0].index, 1); + assert_eq!(msg[0].index, 0); } assert!(r.prs().get(2).unwrap().paused); @@ -2738,7 +2775,7 @@ fn test_send_append_for_progress_probe() { .expect(""); let msg = r.read_messages(); assert_eq!(msg.len(), 1); - assert_eq!(msg[0].index, 1); + assert_eq!(msg[0].index, 0); assert!(r.prs().get(2).unwrap().paused); } @@ -2749,9 +2786,6 @@ fn test_send_append_for_progress_replicate() { r.become_candidate(); r.become_leader(); r.read_messages(); - // Suppose node 2 has received the snapshot, and becomes active. - r.mut_prs().get_mut(2).unwrap().next_idx = 2; - r.mut_prs().get_mut(2).unwrap().matched = 1; r.mut_prs().get_mut(2).unwrap().become_replicate(); for _ in 0..10 { @@ -2830,7 +2864,7 @@ fn test_restore_ignore_snapshot() { let l = default_logger(); let previous_ents = vec![empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3)]; let commit = 1u64; - let mut sm = new_test_raft(1, vec![], 10, 1, new_storage(), &l); + let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.raft_log.append(&previous_ents); sm.raft_log.commit_to(commit); @@ -2988,7 +3022,7 @@ fn test_step_ignore_config() { let index = r.raft_log.last_index(); let pending_conf_index = r.pending_conf_index; r.step(m).expect(""); - let mut we = empty_entry(2, 4); + let mut we = empty_entry(1, 3); we.set_entry_type(EntryType::EntryNormal); let wents = vec![we]; let entries = r.raft_log.entries(index + 1, None).expect(""); @@ -3001,7 +3035,7 @@ fn test_step_ignore_config() { #[test] fn test_new_leader_pending_config() { let l = default_logger(); - let mut tests = vec![(false, 1), (true, 2)]; + let mut tests = vec![(false, 0), (true, 1)]; for (i, (add_entry, wpending_index)) in tests.drain(..).enumerate() { let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); let mut e = Entry::default(); @@ -3299,7 +3333,7 @@ fn test_leader_transfer_to_slow_follower() { nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); nt.recover(); - assert_eq!(nt.peers[&1].prs().get(3).unwrap().matched, 2); + assert_eq!(nt.peers[&1].prs().get(3).unwrap().matched, 1); // Transfer leadership to 3 when node 3 is lack of log. nt.send(vec![new_message(3, 1, MessageType::MsgTransferLeader, 0)]); @@ -3327,7 +3361,7 @@ fn test_leader_transfer_after_snapshot() { .unwrap(); nt.recover(); - assert_eq!(nt.peers[&1].prs().get(3).unwrap().matched, 2); + assert_eq!(nt.peers[&1].prs().get(3).unwrap().matched, 1); // Transfer leadership to 3 when node 3 is lack of snapshot. nt.send(vec![new_message(3, 1, MessageType::MsgTransferLeader, 0)]); @@ -3429,7 +3463,7 @@ fn test_leader_transfer_ignore_proposal() { "should return drop proposal error while transferring" ); - assert_eq!(nt.peers[&1].prs().get(1).unwrap().matched, 2); + assert_eq!(nt.peers[&1].prs().get(1).unwrap().matched, 1); } #[test] @@ -4260,12 +4294,12 @@ fn test_batch_msg_append() { assert_eq!(raft.msgs.len(), 2); for msg in &raft.msgs { assert_eq!(msg.entries.len(), 10); - assert_eq!(msg.index, 2); + assert_eq!(msg.index, 1); } // if the append entry is not continuous, raft should not batch the RPC let mut reject_msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); reject_msg.reject = true; - reject_msg.index = 3; + reject_msg.index = 2; assert!(raft.step(reject_msg).is_ok()); assert_eq!(raft.msgs.len(), 3); } diff --git a/harness/tests/integration_cases/test_raft_flow_control.rs b/harness/tests/integration_cases/test_raft_flow_control.rs index d5171b391..fc7d84317 100644 --- a/harness/tests/integration_cases/test_raft_flow_control.rs +++ b/harness/tests/integration_cases/test_raft_flow_control.rs @@ -15,16 +15,7 @@ // limitations under the License. use crate::test_util::*; -use raft::{default_logger, eraftpb::*, Raft, Storage}; - -// Force progress `pr` to be in replicate state at `i`. -fn progress_become_replicate(r: &mut Raft, pr: u64, i: u64) -where - T: Storage, -{ - r.mut_prs().get_mut(pr).unwrap().maybe_update(i - 1); - r.mut_prs().get_mut(pr).unwrap().become_replicate(); -} +use raft::{default_logger, eraftpb::*}; // test_msg_app_flow_control_full ensures: // 1. msgApp can fill the sending window until full @@ -36,11 +27,8 @@ fn test_msg_app_flow_control_full() { r.become_candidate(); r.become_leader(); - // The configuration is initialized at 1 and the leader's empty entry is at 2. - assert_eq!(r.raft_log.last_index(), 2); - // force the progress to be in replicate state - progress_become_replicate(&mut r, 2, 2); + r.mut_prs().get_mut(2).unwrap().become_replicate(); // fill in the inflights window for i in 0..r.max_inflight { r.step(new_message(1, 1, MessageType::MsgPropose, 1)) @@ -76,11 +64,8 @@ fn test_msg_app_flow_control_move_forward() { r.become_candidate(); r.become_leader(); - // The configuration is initialized at 1 and the leader's empty entry is at 2. - assert_eq!(r.raft_log.last_index(), 2); - // force the progress to be in replicate state - progress_become_replicate(&mut r, 2, 2); + r.mut_prs().get_mut(2).unwrap().become_replicate(); // fill in the inflights window for _ in 0..r.max_inflight { r.step(new_message(1, 1, MessageType::MsgPropose, 1)) @@ -88,9 +73,9 @@ fn test_msg_app_flow_control_move_forward() { r.read_messages(); } - // 2 is noop, 3 is the first proposal we just sent. - // so we start with 3. - for tt in 3..r.max_inflight { + // 1 is noop, 2 is the first proposal we just sent. + // so we start with 2. + for tt in 2..r.max_inflight { // move forward the window let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0); m.index = tt as u64; @@ -134,7 +119,7 @@ fn test_msg_app_flow_control_recv_heartbeat() { r.become_leader(); // force the progress to be in replicate state - progress_become_replicate(&mut r, 2, 2); + r.mut_prs().get_mut(2).unwrap().become_replicate(); // fill in the inflights window for _ in 0..r.max_inflight { r.step(new_message(1, 1, MessageType::MsgPropose, 1)) diff --git a/harness/tests/integration_cases/test_raft_paper.rs b/harness/tests/integration_cases/test_raft_paper.rs index 62063a35a..75e0056b2 100644 --- a/harness/tests/integration_cases/test_raft_paper.rs +++ b/harness/tests/integration_cases/test_raft_paper.rs @@ -87,10 +87,10 @@ fn test_update_term_from_message(state: StateRole, l: &Logger) { } let mut m = new_message(0, 0, MessageType::MsgAppend, 0); - m.term = 3; + m.term = 2; r.step(m).expect(""); - assert_eq!(r.term, 3); + assert_eq!(r.term, 2); assert_eq!(r.state, StateRole::Follower); } @@ -128,7 +128,7 @@ fn test_leader_bcast_beat() { let new_message_ext = |f, to| { let mut m = new_message(f, to, MessageType::MsgHeartbeat, 0); - m.term = 2; + m.term = 1; m.commit = 0; m }; @@ -164,7 +164,7 @@ fn test_nonleader_start_election(state: StateRole, l: &Logger) { let et = 10; let mut r = new_test_raft(1, vec![1, 2, 3], et, 1, new_storage(), l); match state { - StateRole::Follower => r.become_follower(2, 2), + StateRole::Follower => r.become_follower(1, 2), StateRole::Candidate => r.become_candidate(), _ => panic!("Only non-leader role is accepted."), } @@ -173,16 +173,14 @@ fn test_nonleader_start_election(state: StateRole, l: &Logger) { r.tick(); } - assert_eq!(r.term, 3); + assert_eq!(r.term, 2); assert_eq!(r.state, StateRole::Candidate); assert!(r.votes[&r.id]); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); let new_message_ext = |f, to| { let mut m = new_message(f, to, MessageType::MsgRequestVote, 0); - m.term = 3; - m.log_term = 1; - m.index = 1; + m.term = 2; m }; let expect_msgs = vec![new_message_ext(1, 2), new_message_ext(1, 3)]; @@ -243,8 +241,8 @@ fn test_leader_election_in_one_round_rpc() { if r.state != state { panic!("#{}: state = {:?}, want {:?}", i, r.state, state); } - if r.term != 2 { - panic!("#{}: term = {}, want {}", i, r.term, 2); + if r.term != 1 { + panic!("#{}: term = {}, want {}", i, r.term, 1); } } } @@ -266,12 +264,10 @@ fn test_follower_vote() { for (i, (vote, nvote, wreject)) in tests.drain(..).enumerate() { let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); - r.load_state(&hard_state(1, 1, vote)); + r.load_state(&hard_state(1, 0, vote)); let mut m = new_message(nvote, 1, MessageType::MsgRequestVote, 0); m.term = 1; - m.log_term = 1; - m.index = 1; r.step(m).expect(""); let msgs = r.read_messages(); @@ -440,12 +436,12 @@ fn test_leader_start_replication() { assert_eq!(r.raft_log.committed, li); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); - let wents = vec![new_entry(2, li + 1, SOME_DATA)]; + let wents = vec![new_entry(1, li + 1, SOME_DATA)]; let new_message_ext = |f, to, ents| { let mut m = new_message(f, to, MessageType::MsgAppend, 0); - m.term = 2; + m.term = 1; m.index = li; - m.log_term = 2; + m.log_term = 1; m.commit = li; m.entries = ents; m @@ -482,7 +478,7 @@ fn test_leader_commit_entry() { } assert_eq!(r.raft_log.committed, li + 1); - let wents = vec![new_entry(2, li + 1, SOME_DATA)]; + let wents = vec![new_entry(1, li + 1, SOME_DATA)]; assert_eq!(r.raft_log.next_entries(), Some(wents)); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); @@ -543,9 +539,9 @@ fn test_leader_commit_preceding_entries() { let l = default_logger(); let mut tests = vec![ vec![], - vec![empty_entry(2, 2)], - vec![empty_entry(1, 2), empty_entry(2, 3)], - vec![empty_entry(1, 2)], + vec![empty_entry(2, 1)], + vec![empty_entry(1, 1), empty_entry(2, 2)], + vec![empty_entry(1, 1)], ]; for (i, mut tt) in tests.drain(..).enumerate() { @@ -555,7 +551,7 @@ fn test_leader_commit_preceding_entries() { let cfg = new_test_config(1, 10, 1); new_test_raft_with_config(&cfg, store, &l) }; - r.load_state(&hard_state(2, 1, 0)); + r.load_state(&hard_state(2, 0, 0)); r.become_candidate(); r.become_leader(); @@ -568,8 +564,8 @@ fn test_leader_commit_preceding_entries() { let li = tt.len() as u64; tt.append(&mut vec![ - empty_entry(3, li + 2), - new_entry(3, li + 3, SOME_DATA), + empty_entry(3, li + 1), + new_entry(3, li + 2, SOME_DATA), ]); let g = r.raft_log.next_entries(); let wg = Some(tt); @@ -586,27 +582,27 @@ fn test_leader_commit_preceding_entries() { fn test_follower_commit_entry() { let l = default_logger(); let mut tests = vec![ - (vec![new_entry(1, 2, SOME_DATA)], 2), + (vec![new_entry(1, 1, SOME_DATA)], 1), ( vec![ - new_entry(1, 2, SOME_DATA), - new_entry(1, 3, Some("somedata2")), + new_entry(1, 1, SOME_DATA), + new_entry(1, 2, Some("somedata2")), ], - 3, + 2, ), ( vec![ - new_entry(1, 2, Some("somedata2")), - new_entry(1, 3, SOME_DATA), + new_entry(1, 1, Some("somedata2")), + new_entry(1, 2, SOME_DATA), ], - 3, + 2, ), ( vec![ - new_entry(1, 2, SOME_DATA), - new_entry(1, 3, Some("somedata2")), + new_entry(1, 1, SOME_DATA), + new_entry(1, 2, Some("somedata2")), ], - 2, + 1, ), ]; @@ -616,8 +612,6 @@ fn test_follower_commit_entry() { let mut m = new_message(2, 1, MessageType::MsgAppend, 0); m.term = 1; - m.log_term = 1; - m.index = 1; m.commit = commit; m.entries = ents.clone().into(); r.step(m).expect(""); @@ -628,7 +622,7 @@ fn test_follower_commit_entry() { i, r.raft_log.committed, commit ); } - let wents = Some(ents[..commit as usize - 1].to_vec()); + let wents = Some(ents[..commit as usize].to_vec()); let g = r.raft_log.next_entries(); if g != wents { panic!("#{}: next_ents = {:?}, want {:?}", i, g, wents); @@ -644,22 +638,22 @@ fn test_follower_commit_entry() { #[test] fn test_follower_check_msg_append() { let l = default_logger(); - let ents = vec![empty_entry(1, 2), empty_entry(2, 3)]; + let ents = vec![empty_entry(1, 1), empty_entry(2, 2)]; let mut tests = vec![ // match with committed entries - (1, 2, 2, false, 0), - (ents[0].term, ents[0].index, 2, false, 0), + (0, 0, 1, false, 0), + (ents[0].term, ents[0].index, 1, false, 0), // match with uncommitted entries - (ents[1].term, ents[1].index, 3, false, 0), + (ents[1].term, ents[1].index, 2, false, 0), // unmatch with existing entry - (ents[0].term, ents[1].index, ents[1].index, true, 3), + (ents[0].term, ents[1].index, ents[1].index, true, 2), // unexisting entry ( ents[1].term + 1, ents[1].index + 1, ents[1].index + 1, true, - 3, + 2, ), ]; for (i, (term, index, windex, wreject, wreject_hint)) in tests.drain(..).enumerate() { @@ -669,7 +663,7 @@ fn test_follower_check_msg_append() { let cfg = new_test_config(1, 10, 1); new_test_raft_with_config(&cfg, store, &l) }; - r.load_state(&hard_state(1, 1, 0)); + r.load_state(&hard_state(0, 1, 0)); r.become_follower(2, 2); let mut m = new_message(2, 1, MessageType::MsgAppend, 0); @@ -703,32 +697,32 @@ fn test_follower_append_entries() { let l = default_logger(); let mut tests = vec![ ( - 3, 2, - vec![empty_entry(3, 4)], - vec![empty_entry(1, 2), empty_entry(2, 3), empty_entry(3, 4)], - vec![empty_entry(3, 4)], - ), - ( 2, - 1, - vec![empty_entry(3, 3), empty_entry(4, 4)], - vec![empty_entry(1, 2), empty_entry(3, 3), empty_entry(4, 4)], - vec![empty_entry(3, 3), empty_entry(4, 4)], + vec![empty_entry(3, 3)], + vec![empty_entry(1, 1), empty_entry(2, 2), empty_entry(3, 3)], + vec![empty_entry(3, 3)], ), ( 1, 1, - vec![empty_entry(1, 2)], - vec![empty_entry(1, 2), empty_entry(2, 3)], + vec![empty_entry(3, 2), empty_entry(4, 3)], + vec![empty_entry(1, 1), empty_entry(3, 2), empty_entry(4, 3)], + vec![empty_entry(3, 2), empty_entry(4, 3)], + ), + ( + 0, + 0, + vec![empty_entry(1, 1)], + vec![empty_entry(1, 1), empty_entry(2, 2)], vec![], ), ( - 1, - 1, - vec![empty_entry(3, 2)], - vec![empty_entry(3, 2)], - vec![empty_entry(3, 2)], + 0, + 0, + vec![empty_entry(3, 1)], + vec![empty_entry(3, 1)], + vec![empty_entry(3, 1)], ), ]; for (i, (index, term, ents, wents, wunstable)) in tests.drain(..).enumerate() { @@ -736,7 +730,7 @@ fn test_follower_append_entries() { let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); store .wl() - .append(&[empty_entry(1, 2), empty_entry(2, 3)]) + .append(&[empty_entry(1, 1), empty_entry(2, 2)]) .unwrap(); let cfg = new_test_config(1, 10, 1); new_test_raft_with_config(&cfg, store, &l) @@ -773,6 +767,7 @@ fn test_follower_append_entries() { fn test_leader_sync_follower_log() { let l = default_logger(); let ents = vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4), @@ -786,6 +781,7 @@ fn test_leader_sync_follower_log() { let term = 8u64; let mut tests = vec![ vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4), @@ -795,8 +791,14 @@ fn test_leader_sync_follower_log() { empty_entry(6, 8), empty_entry(6, 9), ], - vec![empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4)], vec![ + empty_entry(1, 1), + empty_entry(1, 2), + empty_entry(1, 3), + empty_entry(4, 4), + ], + vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4), @@ -809,6 +811,7 @@ fn test_leader_sync_follower_log() { empty_entry(6, 11), ], vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4), @@ -822,6 +825,7 @@ fn test_leader_sync_follower_log() { empty_entry(7, 12), ], vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(4, 4), @@ -830,6 +834,7 @@ fn test_leader_sync_follower_log() { empty_entry(4, 7), ], vec![ + empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3), empty_entry(2, 4), @@ -858,7 +863,7 @@ fn test_leader_sync_follower_log() { let cfg = new_test_config(2, 10, 1); new_test_raft_with_config(&cfg, store, &l) }; - follower.load_state(&hard_state(term - 1, 1, 0)); + follower.load_state(&hard_state(term - 1, 0, 0)); // It is necessary to have a three-node cluster. // The second may have more up-to-date log than the first one, so the @@ -892,15 +897,15 @@ fn test_leader_sync_follower_log() { fn test_vote_request() { let l = default_logger(); let mut tests = vec![ - (vec![empty_entry(1, 2)], 2), - (vec![empty_entry(1, 2), empty_entry(2, 3)], 3), + (vec![empty_entry(1, 1)], 2), + (vec![empty_entry(1, 1), empty_entry(2, 2)], 3), ]; for (j, (ents, wterm)) in tests.drain(..).enumerate() { let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); let mut m = new_message(2, 1, MessageType::MsgAppend, 0); m.term = wterm - 1; - m.log_term = 1; // log-term must be greater than 0. - m.index = 1; + m.log_term = 0; + m.index = 0; m.entries = ents.clone().into(); r.step(m).expect(""); r.read_messages(); @@ -950,17 +955,17 @@ fn test_voter() { let l = default_logger(); let mut tests = vec![ // same logterm - (vec![empty_entry(1, 2)], 1, 2, false), - (vec![empty_entry(1, 2)], 1, 3, false), - (vec![empty_entry(1, 2), empty_entry(1, 3)], 1, 1, true), + (vec![empty_entry(1, 1)], 1, 1, false), + (vec![empty_entry(1, 1)], 1, 2, false), + (vec![empty_entry(1, 1), empty_entry(1, 2)], 1, 1, true), // candidate higher logterm - (vec![empty_entry(1, 2)], 2, 2, false), - (vec![empty_entry(1, 2)], 2, 3, false), - (vec![empty_entry(1, 2), empty_entry(1, 3)], 2, 2, false), + (vec![empty_entry(1, 1)], 2, 1, false), + (vec![empty_entry(1, 1)], 2, 2, false), + (vec![empty_entry(1, 1), empty_entry(1, 2)], 2, 1, false), // voter higher logterm - (vec![empty_entry(2, 2)], 1, 2, true), - (vec![empty_entry(2, 2)], 1, 3, true), - (vec![empty_entry(2, 2), empty_entry(1, 3)], 1, 2, true), + (vec![empty_entry(2, 1)], 1, 1, true), + (vec![empty_entry(2, 1)], 1, 2, true), + (vec![empty_entry(2, 1), empty_entry(1, 2)], 1, 1, true), ]; for (i, (ents, log_term, index, wreject)) in tests.drain(..).enumerate() { let s = MemStorage::new_with_conf_state((vec![1, 2], vec![])); @@ -998,13 +1003,13 @@ fn test_voter() { #[test] fn test_leader_only_commits_log_from_current_term() { let l = default_logger(); - let ents = vec![empty_entry(1, 2), empty_entry(2, 3)]; + let ents = vec![empty_entry(1, 1), empty_entry(2, 2)]; let mut tests = vec![ // do not commit log entries in previous terms - (1, 1), - (2, 1), + (1, 0), + (2, 0), // commit log in current term - (4, 4), + (3, 3), ]; for (i, (index, wcommit)) in tests.drain(..).enumerate() { let mut r = { @@ -1013,7 +1018,7 @@ fn test_leader_only_commits_log_from_current_term() { let cfg = new_test_config(1, 10, 1); new_test_raft_with_config(&cfg, store, &l) }; - r.load_state(&hard_state(2, 1, 0)); + r.load_state(&hard_state(2, 0, 0)); // become leader at term 3 r.become_candidate(); diff --git a/harness/tests/integration_cases/test_raft_snap.rs b/harness/tests/integration_cases/test_raft_snap.rs index c76cab9e0..38e584ad8 100644 --- a/harness/tests/integration_cases/test_raft_snap.rs +++ b/harness/tests/integration_cases/test_raft_snap.rs @@ -130,7 +130,11 @@ fn test_snapshot_abort() { fn test_snapshot_with_min_term() { let l = default_logger(); let do_test = |pre_vote: bool| { - let n1 = new_test_raft_with_prevote(1, vec![1, 2], 10, 1, new_storage(), pre_vote, &l); + let s = new_storage(); + s.wl() + .apply_snapshot(new_snapshot(1, 1, vec![1, 2])) + .unwrap(); + let n1 = new_test_raft_with_prevote(1, vec![1, 2], 10, 1, s, pre_vote, &l); let n2 = new_test_raft_with_prevote(2, vec![], 10, 1, new_storage(), pre_vote, &l); let mut nt = Network::new(vec![Some(n1), Some(n2)], &l); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index cbc93e319..864c096ee 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -57,11 +57,14 @@ fn new_raw_node( logger: &Logger, ) -> RawNode { let config = new_test_config(id, election, heartbeat); - if storage.initial_state().unwrap().initialized() && peers.is_empty() { + if storage.initial_state().unwrap().initialized() && !peers.is_empty() { panic!("new_raw_node with empty peers on initialized store"); } if !peers.is_empty() && !storage.initial_state().unwrap().initialized() { - storage.initialize_with_conf_state((peers, vec![])); + storage + .wl() + .apply_snapshot(new_snapshot(1, 1, peers)) + .unwrap(); } RawNode::new(&config, storage, logger).unwrap() } @@ -71,19 +74,6 @@ fn new_raw_node( fn test_raw_node_step() { let l = default_logger(); for msg_t in MessageType::values() { - if vec![ - // Vote messages with term 0 will cause panics. - MessageType::MsgRequestVote, - MessageType::MsgRequestPreVote, - // MsgAppend and MsgSnapshot with log term 0 will cause test code panics. - MessageType::MsgAppend, - MessageType::MsgSnapshot, - ] - .contains(&msg_t) - { - continue; - } - let mut raw_node = new_raw_node(1, vec![1], 10, 1, new_storage(), &l); let res = raw_node.step(new_message(0, 0, *msg_t, 0)); // local msg should be ignored. @@ -190,7 +180,7 @@ fn test_raw_node_propose_and_conf_change() { raw_node.advance(rd); // Exit when we have 3 entries: one initial configuration, one no-op for the election - // and proposed ConfChange. + // our proposed command and proposed ConfChange. last_index = s.last_index().unwrap(); if last_index >= 3 { break; @@ -432,33 +422,33 @@ fn test_skip_bcast_commit() { test_entries.data = b"testdata".to_vec(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg.clone()]); - assert_eq!(nt.peers[&1].raft_log.committed, 3); - assert_eq!(nt.peers[&2].raft_log.committed, 2); - assert_eq!(nt.peers[&3].raft_log.committed, 2); + assert_eq!(nt.peers[&1].raft_log.committed, 2); + assert_eq!(nt.peers[&2].raft_log.committed, 1); + assert_eq!(nt.peers[&3].raft_log.committed, 1); // After bcast heartbeat, followers will be informed the actual commit index. for _ in 0..nt.peers[&1].randomized_election_timeout() { nt.peers.get_mut(&1).unwrap().tick(); } nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); - assert_eq!(nt.peers[&2].raft_log.committed, 3); - assert_eq!(nt.peers[&3].raft_log.committed, 3); + assert_eq!(nt.peers[&2].raft_log.committed, 2); + assert_eq!(nt.peers[&3].raft_log.committed, 2); // The feature should be able to be adjusted at run time. nt.peers.get_mut(&1).unwrap().skip_bcast_commit(false); nt.send(vec![msg.clone()]); - assert_eq!(nt.peers[&1].raft_log.committed, 4); - assert_eq!(nt.peers[&2].raft_log.committed, 4); - assert_eq!(nt.peers[&3].raft_log.committed, 4); + assert_eq!(nt.peers[&1].raft_log.committed, 3); + assert_eq!(nt.peers[&2].raft_log.committed, 3); + assert_eq!(nt.peers[&3].raft_log.committed, 3); nt.peers.get_mut(&1).unwrap().skip_bcast_commit(true); // Later proposal should commit former proposal. nt.send(vec![msg.clone()]); nt.send(vec![msg]); - assert_eq!(nt.peers[&1].raft_log.committed, 6); - assert_eq!(nt.peers[&2].raft_log.committed, 5); - assert_eq!(nt.peers[&3].raft_log.committed, 5); + assert_eq!(nt.peers[&1].raft_log.committed, 5); + assert_eq!(nt.peers[&2].raft_log.committed, 4); + assert_eq!(nt.peers[&3].raft_log.committed, 4); // When committing conf change, leader should always bcast commit. let mut cc = ConfChange::default(); @@ -478,7 +468,7 @@ fn test_skip_bcast_commit() { assert!(nt.peers[&2].should_bcast_commit()); assert!(nt.peers[&3].should_bcast_commit()); - assert_eq!(nt.peers[&1].raft_log.committed, 7); - assert_eq!(nt.peers[&2].raft_log.committed, 7); - assert_eq!(nt.peers[&3].raft_log.committed, 7); + assert_eq!(nt.peers[&1].raft_log.committed, 6); + assert_eq!(nt.peers[&2].raft_log.committed, 6); + assert_eq!(nt.peers[&3].raft_log.committed, 6); } diff --git a/src/raft.rs b/src/raft.rs index 1d919fc16..36aed5269 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1087,8 +1087,6 @@ impl Raft { match m.get_msg_type() { MessageType::MsgHup => self.hup(false), MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { - debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); - // We can vote if this is a repeat of a vote we've already cast... let can_vote = (self.vote == m.from) || // ...we haven't voted and we don't think there's a leader yet in this term... @@ -1886,7 +1884,6 @@ impl Raft { self.send(to_send); return; } - debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); let mut to_send = Message::default(); to_send.to = m.from; @@ -1932,7 +1929,6 @@ impl Raft { } fn handle_snapshot(&mut self, mut m: Message) { - debug_assert!(m.term != 0, "{:?} term can't be 0", m); let metadata = m.get_snapshot().get_metadata(); let (sindex, sterm) = (metadata.index, metadata.term); if self.restore(m.take_snapshot()) { diff --git a/src/storage.rs b/src/storage.rs index 8e991cdb6..1be5a1eef 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -322,6 +322,8 @@ impl MemStorage { /// Create a new `MemStorage` with a given `Config`. The given `Config` will be used to /// initialize the storage. + /// + /// You should use the same input to initialize all nodes. pub fn new_with_conf_state(conf_state: T) -> MemStorage where ConfState: From, @@ -332,26 +334,20 @@ impl MemStorage { } /// Initialize a `MemStorage` with a given `Config`. + /// + /// You should use the same input to initialize all nodes. pub fn initialize_with_conf_state(&self, conf_state: T) where ConfState: From, { assert!(!self.initial_state().unwrap().initialized()); let mut core = self.wl(); - // Set index to 1 to make `first_index` greater than 1 so that there will be a gap between - // uninitialized followers and the leader. And then followers can catch up the initial - // configuration by snapshots. - // - // And, set term to 1 because in term 0 there is no leader exactly. + // Setting initial state is very important to build a correct raft, as raft algorithm + // itself only guarantees logs consistency. Typically, you need to ensure either all start + // states are the same on all nodes, or new nodes always catch up logs by snapshot first. // - // An another alternative is appending some conf-change entries here to construct the - // initial configuration so that followers can catch up it by raft logs. However the entry - // count depends on how many peers in the initial configuration, which makes some indices - // not predictable. So we choose snapshot instead of raft logs here. - core.snapshot_metadata.index = 1; - core.snapshot_metadata.term = 1; - core.raft_state.hard_state.commit = 1; - core.raft_state.hard_state.term = 1; + // In practice, we choose the second way by assigning non-zero index to first index. Here + // we choose the first way for historical reason and easier to write tests. core.raft_state.conf_state = ConfState::from(conf_state); } From fc244b66c9b6e394808995e545b26df67200b4b1 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 26 Mar 2020 00:30:55 +0800 Subject: [PATCH 2/3] fix bench Signed-off-by: Jay Lee --- benches/suites/raw_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/suites/raw_node.rs b/benches/suites/raw_node.rs index 83e69225a..ca104cc38 100644 --- a/benches/suites/raw_node.rs +++ b/benches/suites/raw_node.rs @@ -110,7 +110,7 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { let mut e = Entry::default(); e.data = vec![0; 32 * 1024]; e.context = vec![]; - e.index = i + 1; + e.index = i; e.term = 1; entries.push(e); } From 5ea59dd76542f7a598d1f20567d8980fa25a9249 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Thu, 26 Mar 2020 15:45:18 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: Jay Lee --- harness/tests/integration_cases/test_raw_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index 864c096ee..384b23b39 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -57,7 +57,7 @@ fn new_raw_node( logger: &Logger, ) -> RawNode { let config = new_test_config(id, election, heartbeat); - if storage.initial_state().unwrap().initialized() && !peers.is_empty() { + if storage.initial_state().unwrap().initialized() && peers.is_empty() { panic!("new_raw_node with empty peers on initialized store"); } if !peers.is_empty() && !storage.initial_state().unwrap().initialized() {