Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a bug about snapshot and adjust some test cases #213

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn main() {
add_all_followers(proposals.as_ref());

// Put 100 key-value pairs.
println!("We get a 5 nodes Raft cluster now, now propose 100 proposals");
(0..100u16)
.filter(|i| {
let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned());
Expand All @@ -113,6 +114,9 @@ fn main() {
})
.count();

println!("Propose 100 proposals success!");

// FIXME: the program will be blocked here forever. Need to exit gracefully.
for th in handles {
th.join().unwrap();
}
Expand Down Expand Up @@ -207,6 +211,15 @@ fn on_ready(
return;
}

// Apply the snashot. It's also necessary with same reason as above.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Apply the snashot. It's also necessary with same reason as above.
// Apply the snapshot. It's also necessary with same reason as above.

Also, which reason? That the program will be blocked forever?

if *ready.snapshot() != Snapshot::new() {
let s = ready.snapshot().clone();
if let Err(e) = raft_group.raft.raft_log.store.wl().apply_snapshot(s) {
error!("apply snapshot fail: {:?}, need to retry or panic", e);
return;
}
}

// Send out the messages come from the node.
for msg in ready.messages.drain(..) {
let to = msg.get_to();
Expand Down
26 changes: 1 addition & 25 deletions harness/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use raft::{eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result};
use raft::{eraftpb::Message, storage::MemStorage, Raft, Result};
use std::ops::{Deref, DerefMut};

/// A simulated Raft façade for testing.
Expand Down Expand Up @@ -63,30 +63,6 @@ impl Interface {
None => vec![],
}
}

/// Initialize a raft with the given ID and peer set.
pub fn initial(&mut self, id: u64, ids: &[u64]) {
if self.raft.is_some() {
self.id = id;
let prs = self.take_prs();
self.set_prs(ProgressSet::with_capacity(
ids.len(),
prs.learner_ids().len(),
));
for id in ids {
let progress = Progress::new(0, 256);
if prs.learner_ids().contains(id) {
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
let term = self.term;
self.reset(term);
}
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
Expand Down
54 changes: 31 additions & 23 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,39 @@ pub struct Network {
}

impl Network {
/// Initializes a network from peers.
/// Get a base config. Calling `Network::new` will initialize peers with this config.
pub fn base_config() -> Config {
Copy link
Contributor

@Hoverbear Hoverbear Apr 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn base_config() -> Config {
pub fn default_config() -> Config {

Config {
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
..Default::default()
}
}

/// Initializes a network from `peers`.
///
/// Nodes will recieve their ID based on their index in the vector, starting with 1.
///
/// A `None` node will be replaced with a new Raft node.
/// A `None` node will be replaced with a new Raft node, and its configuration will
/// be `peers`.
pub fn new(peers: Vec<Option<Interface>>) -> Network {
Network::new_with_config(peers, false)
let config = Network::base_config();
Network::new_with_config(peers, &config)
}

/// Explicitly set the pre_vote option on newly created rafts.
///
/// **TODO:** Make this accept any config.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, pre_vote: bool) -> Network {
let size = peers.len();
let peer_addrs: Vec<u64> = (1..=size as u64).collect();
/// Initialize a network from `peers` with explicitly specified `config`.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, config: &Config) -> Network {
let peer_addrs: Vec<u64> = (1..=peers.len() as u64).collect();
for (i, id) in peer_addrs.iter().enumerate() {
if let Some(ref peer) = peers[i] {
if peer.raft.as_ref().map_or(false, |r| r.id != *id) {
panic!("peer {} in peers has a wrong position", peer.id);
}
}
}

let mut nstorage = HashMap::new();
let mut npeers = HashMap::new();
for (p, id) in peers.drain(..).zip(peer_addrs.clone()) {
Expand All @@ -84,23 +102,13 @@ impl Network {
let conf_state = ConfState::from((peer_addrs.clone(), vec![]));
let store = MemStorage::new_with_conf_state(conf_state);
nstorage.insert(id, store.clone());
let config = Config {
id,
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
pre_vote,
tag: format!("{}", id),
..Default::default()
};
let mut config = config.clone();
config.id = id;
config.tag = format!("{}", id);
let r = Raft::new(&config, store).unwrap().into();
npeers.insert(id, r);
}
Some(mut p) => {
p.initial(id, &peer_addrs);
npeers.insert(id, p);
}
Some(r) => drop(npeers.insert(id, r)),
}
}
Network {
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::{
};

/// Config contains the parameters to start a raft.
#[derive(Clone)]
pub struct Config {
/// The identity of the local raft. It cannot be 0, and must be unique in the group.
pub id: u64,
Expand Down
5 changes: 4 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2040,7 +2040,10 @@ impl<T: Storage> Raft<T> {

let next_idx = self.raft_log.last_index() + 1;
let mut prs = ProgressSet::restore_snapmeta(meta, next_idx, self.max_inflight);
prs.get_mut(self.id).unwrap().matched = next_idx - 1;
if let Some(pr) = prs.get_mut(self.id) {
// The snapshot could be too old so that it doesn't contain the peer.
pr.matched = next_idx - 1;
}
if self.is_learner && prs.configuration().voters().contains(&self.id) {
self.is_learner = false;
}
Expand Down
7 changes: 6 additions & 1 deletion src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ impl<T: Storage> RaftLog<T> {

/// Answers the question: Does this index belong to this term?
pub fn match_term(&self, idx: u64, term: u64) -> bool {
self.term(idx).map(|t| t == term).unwrap_or(false)
match self.term(idx) {
// For uninitialized storage, should return false.
Ok(0) => term == 0 && self.last_index() > 0,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it need to add some logics that exceeds etcd-raft.

Ok(t) => t == term,
_ => false,
}
}

/// Returns None if the entries cannot be appended. Otherwise,
Expand Down
5 changes: 5 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ impl MemStorageCore {
&self.raft_state.hard_state
}

/// Get the mut hard state.
pub fn mut_hard_state(&mut self) -> &mut HardState {
&mut self.raft_state.hard_state
}

/// Commit to an index.
///
/// # Panics
Expand Down
72 changes: 42 additions & 30 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ fn read_messages<T: Storage>(raft: &mut Raft<T>) -> Vec<Message> {
raft.msgs.drain(..).collect()
}

fn ents_with_config(terms: &[u64], pre_vote: bool) -> Interface {
let store = MemStorage::new();
fn ents_with_config(terms: &[u64], pre_vote: bool, id: u64, peers: Vec<u64>) -> Interface {
let store = MemStorage::new_with_conf_state((peers.clone(), vec![]));
for (i, term) in terms.iter().enumerate() {
let mut e = Entry::new();
e.set_index(i as u64 + 1);
// An additional `plus one` for initialized storage.
e.set_index(i as u64 + 1 + 1);
e.set_term(*term);
store.wl().append(&[e]).expect("");
}
let mut raft = new_test_raft_with_prevote(1, vec![], 5, 1, store, pre_vote);
let mut raft = new_test_raft_with_prevote(id, peers, 5, 1, store, pre_vote);
raft.reset(terms[terms.len() - 1]);
raft
}
Expand Down Expand Up @@ -101,13 +102,11 @@ fn assert_raft_log(
// voted_with_config creates a raft state machine with vote and term set
// to the given value but no log entries (indicating that it voted in
// the given term but has not receive any logs).
fn voted_with_config(vote: u64, term: u64, pre_vote: bool) -> Interface {
let mut hard_state = HardState::new();
hard_state.set_vote(vote);
hard_state.set_term(term);
let store = MemStorage::new();
store.wl().set_hardstate(hard_state);
let mut raft = new_test_raft_with_prevote(1, vec![], 5, 1, store, pre_vote);
fn voted_with_config(vote: u64, term: u64, pre_vote: bool, id: u64, peers: Vec<u64>) -> Interface {
let store = MemStorage::new_with_conf_state((peers.clone(), vec![]));
store.wl().mut_hard_state().set_vote(vote);
store.wl().mut_hard_state().set_term(term);
let mut raft = new_test_raft_with_prevote(id, peers, 5, 1, store, pre_vote);
raft.reset(term);
raft
}
Expand Down Expand Up @@ -382,29 +381,31 @@ fn test_leader_election_pre_vote() {
}

fn test_leader_election_with_config(pre_vote: bool) {
let mut config = Network::base_config();
config.pre_vote = pre_vote;
let mut tests = vec![
(
Network::new_with_config(vec![None, None, None], pre_vote),
Network::new_with_config(vec![None, None, None], &config),
StateRole::Leader,
1,
),
(
Network::new_with_config(vec![None, None, NOP_STEPPER], pre_vote),
Network::new_with_config(vec![None, None, NOP_STEPPER], &config),
StateRole::Leader,
1,
),
(
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER], pre_vote),
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER], &config),
StateRole::Candidate,
1,
),
(
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None], pre_vote),
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None], &config),
StateRole::Candidate,
1,
),
(
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None, None], pre_vote),
Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None, None], &config),
StateRole::Leader,
1,
),
Expand All @@ -414,12 +415,12 @@ fn test_leader_election_with_config(pre_vote: bool) {
Network::new_with_config(
vec![
None,
Some(ents_with_config(&[1], pre_vote)),
Some(ents_with_config(&[1], pre_vote)),
Some(ents_with_config(&[1, 1], pre_vote)),
Some(ents_with_config(&[1], pre_vote, 2, vec![1, 2, 3, 4, 5])),
Some(ents_with_config(&[1], pre_vote, 3, vec![1, 2, 3, 4, 5])),
Some(ents_with_config(&[1, 1], pre_vote, 4, vec![1, 2, 3, 4, 5])),
None,
],
pre_vote,
&config,
),
StateRole::Follower,
1,
Expand Down Expand Up @@ -467,7 +468,9 @@ fn test_leader_cycle_pre_vote() {
// pre-vote) work when not starting from a clean state (as they do in
// test_leader_election)
fn test_leader_cycle_with_config(pre_vote: bool) {
let mut network = Network::new_with_config(vec![None, None, None], pre_vote);
let mut config = Network::base_config();
config.pre_vote = pre_vote;
let mut network = Network::new_with_config(vec![None, None, None], &config);
for campaigner_id in 1..4 {
network.send(vec![new_message(
campaigner_id,
Expand Down Expand Up @@ -524,15 +527,18 @@ fn test_leader_election_overwrite_newer_logs_with_config(pre_vote: bool) {
// entry overwrites the loser's. (test_leader_sync_follower_log tests
// the case where older log entries are overwritten, so this test
// focuses on the case where the newer entries are lost).
let peers = vec![1, 2, 3, 4, 5];
let mut config = Network::base_config();
config.pre_vote = pre_vote;
let mut network = Network::new_with_config(
vec![
Some(ents_with_config(&[1], pre_vote)), // Node 1: Won first election
Some(ents_with_config(&[1], pre_vote)), // Node 2: Get logs from node 1
Some(ents_with_config(&[2], pre_vote)), // Node 3: Won second election
Some(voted_with_config(3, 2, pre_vote)), // Node 4: Voted but didn't get logs
Some(voted_with_config(3, 2, pre_vote)), // Node 5: Voted but didn't get logs
Some(ents_with_config(&[1], pre_vote, 1, peers.clone())), // Node 1: Won first election
Some(ents_with_config(&[1], pre_vote, 2, peers.clone())), // Node 2: Get logs from node 1
Some(ents_with_config(&[2], pre_vote, 3, peers.clone())), // Node 3: Won second election
Some(voted_with_config(3, 2, pre_vote, 4, peers.clone())), // Node 4: Voted but didn't get logs
Some(voted_with_config(3, 2, pre_vote, 5, peers.clone())), // Node 5: Voted but didn't get logs
],
pre_vote,
&config,
);

// Node 1 campaigns. The election fails because a quorum of nodes
Expand Down Expand Up @@ -892,7 +898,9 @@ fn test_dueling_pre_candidates() {
let b = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true);
let c = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), true);

let mut nt = Network::new_with_config(vec![Some(a), Some(b), Some(c)], true);
let mut config = Network::base_config();
config.pre_vote = true;
let mut nt = Network::new_with_config(vec![Some(a), Some(b), Some(c)], &config);
nt.cut(1, 3);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
Expand Down Expand Up @@ -977,7 +985,9 @@ fn test_single_node_candidate() {
#[test]
fn test_sinle_node_pre_candidate() {
setup_for_test();
let mut tt = Network::new_with_config(vec![None], true);
let mut config = Network::base_config();
config.pre_vote = true;
let mut tt = Network::new_with_config(vec![None], &config);
tt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

assert_eq!(tt.peers[&1].state, StateRole::Leader);
Expand Down Expand Up @@ -3492,7 +3502,9 @@ fn test_node_with_smaller_term_can_complete_election() {
n3.become_follower(1, INVALID_ID);

// cause a network partition to isolate node 3
let mut nt = Network::new_with_config(vec![Some(n1), Some(n2), Some(n3)], true);
let mut config = Network::base_config();
config.pre_vote = true;
let mut nt = Network::new_with_config(vec![Some(n1), Some(n2), Some(n3)], &config);
nt.cut(1, 3);
nt.cut(2, 3);

Expand Down
16 changes: 15 additions & 1 deletion tests/integration_cases/test_raft_snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// limitations under the License.

use crate::test_util::*;
use harness::setup_for_test;
use harness::{setup_for_test, Network};
use raft::eraftpb::*;

fn testing_snap() -> Snapshot {
Expand Down Expand Up @@ -134,3 +134,17 @@ fn test_snapshot_abort() {
assert_eq!(sm.prs().get(2).unwrap().pending_snapshot, 0);
assert_eq!(sm.prs().get(2).unwrap().next_idx, 12);
}

#[test]
fn test_snapshot_with_term_0() {
setup_for_test();
let n1 = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
let n2 = new_test_raft(2, vec![], 10, 1, new_storage());
let mut nt = Network::new(vec![Some(n1), Some(n2)]);
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
let messages = nt.read_messages();
nt.send(messages);
// 1 will be elected as leader, and then send a snapshot and an empty entry to 2.
assert_eq!(nt.peers[&2].raft_log.first_index(), 2);
assert_eq!(nt.peers[&2].raft_log.last_index(), 2);
}