Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Oct 14, 2019
1 parent a718cb3 commit 1517451
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 98 deletions.
16 changes: 8 additions & 8 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1938,7 +1938,7 @@ fn test_non_promotable_voter_which_check_quorum() {
.unwrap()
.set_randomized_election_timeout(b_election_timeout + 1);

nt.peers.get_mut(&2).unwrap().remove_node(2, 100).unwrap();
nt.peers.get_mut(&2).unwrap().remove_node(2).unwrap();
assert!(!nt.peers[&2].promotable());

for _ in 0..b_election_timeout {
Expand Down Expand Up @@ -3056,10 +3056,10 @@ fn test_add_node_check_quorum() -> Result<()> {
fn test_remove_node() -> Result<()> {
let l = default_logger();
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r.remove_node(2, 100)?;
r.remove_node(2)?;
assert_eq!(r.prs().voter_ids().iter().next().unwrap(), &1);
// remove all nodes from cluster
r.remove_node(1, 100)?;
r.remove_node(1)?;
assert!(r.prs().voter_ids().is_empty());

Ok(())
Expand All @@ -3069,7 +3069,7 @@ fn test_remove_node() -> Result<()> {
fn test_remove_node_itself() -> Result<()> {
let l = default_logger().new(o!("test" => "remove_node_itself"));
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(1, 100)?;
n1.remove_node(1)?;
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), &2);
assert!(n1.prs().voter_ids().is_empty());
Ok(())
Expand Down Expand Up @@ -3425,7 +3425,7 @@ fn test_leader_transfer_remove_node() -> Result<()> {
nt.send(vec![new_message(3, 1, MessageType::MsgTransferLeader, 0)]);
assert_eq!(nt.peers[&1].lead_transferee.unwrap(), 3);

nt.peers.get_mut(&1).unwrap().remove_node(3, 100)?;
nt.peers.get_mut(&1).unwrap().remove_node(3)?;

check_leader_transfer_state(&nt.peers[&1], StateRole::Leader, 1);

Expand Down Expand Up @@ -3871,7 +3871,7 @@ fn test_add_voter_peer_promotes_self_sets_is_learner() -> Result<()> {
n1.add_learner(1).ok();
assert!(n1.promotable());
assert!(n1.prs().voter_ids().contains(&1));
n1.remove_node(1, 100)?;
n1.remove_node(1)?;
n1.add_learner(1)?;
assert!(!n1.promotable());
assert!(n1.prs().learner_ids().contains(&1));
Expand All @@ -3885,11 +3885,11 @@ fn test_add_voter_peer_promotes_self_sets_is_learner() -> Result<()> {
fn test_remove_learner() -> Result<()> {
let l = default_logger();
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(2, 100)?;
n1.remove_node(2)?;
assert_eq!(n1.prs().voter_ids().iter().next().unwrap(), &1);
assert!(n1.prs().learner_ids().is_empty());

n1.remove_node(1, 100)?;
n1.remove_node(1)?;
assert!(n1.prs().voter_ids().is_empty());
assert_eq!(n1.prs().learner_ids().len(), 0);

Expand Down
87 changes: 20 additions & 67 deletions src/progress/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
// limitations under the License.

use std::cell::RefCell;
use std::mem;

use hashbrown::hash_map::DefaultHashBuilder;
use hashbrown::{HashMap, HashSet};
Expand Down Expand Up @@ -132,10 +131,6 @@ impl Configuration {
pub fn contains(&self, id: u64) -> bool {
self.voters.contains(&id) || self.learners.contains(&id)
}

fn iter(&self) -> impl Iterator<Item = &u64> {
self.voters.iter().chain(&self.learners)
}
}

/// The status of an election according to a Candidate node.
Expand Down Expand Up @@ -165,9 +160,6 @@ pub struct ProgressSet {
#[get = "pub"]
next_configuration: Option<Configuration>,

/// Uncommitted removed progresses with indices, which must be monotonically increasing.
pub(crate) uncommitted_removes: Vec<(u64, Vec<u64>)>,

// A preallocated buffer for sorting in the maximal_committed_index function.
// You should not depend on these values unless you just set them.
// We use a cell to avoid taking a `&mut self`.
Expand All @@ -191,7 +183,6 @@ impl ProgressSet {
sort_buffer: RefCell::from(Vec::with_capacity(voters)),
configuration: Configuration::with_capacity(voters, learners),
next_configuration: Option::default(),
uncommitted_removes: vec![],
logger,
}
}
Expand All @@ -200,7 +191,6 @@ impl ProgressSet {
self.progress.clear();
self.configuration.voters.clear();
self.configuration.learners.clear();
self.uncommitted_removes.clear();
self.next_configuration = None;
}

Expand Down Expand Up @@ -426,7 +416,7 @@ impl ProgressSet {
/// # Errors
///
/// * There is a pending membership change.
pub fn remove(&mut self, id: u64, index: u64) -> Result<()> {
pub fn remove(&mut self, id: u64) -> Result<()> {
debug!(self.logger, "Removing peer with id {id}", id = id);

if self.is_in_membership_change() {
Expand All @@ -436,12 +426,10 @@ impl ProgressSet {
}

if self.configuration.voters.remove(&id) {
self.uncommitted_removes.push((index, vec![id]));
self.assert_progress_and_configuration_consistent();
return Ok(());
}
if self.configuration.learners.remove(&id) {
self.uncommitted_removes.push((index, vec![id]));
self.assert_progress_and_configuration_consistent();
return Ok(());
}
Expand Down Expand Up @@ -662,56 +650,20 @@ impl ProgressSet {
///
/// This must be called only after calling `begin_membership_change` and after the majority
/// of peers in both the `current` and the `next` state have committed the changes.
pub(crate) fn finalize_membership_change(&mut self, index: u64) -> Result<()> {
let next = self.next_configuration.take();
match next {
None => Err(Error::NoPendingMembershipChange),
Some(next) => {
let removed = self
.configuration
.iter()
.filter(|&&id| !next.contains(id))
.cloned()
.collect();
self.configuration = next;
self.uncommitted_removes.push((index, removed));
debug!(
self.logger,
"Finalizing membership change";
"config" => ?self.configuration,
);
Ok(())
}
}
}

pub(crate) fn shrink_to(&mut self, committed_index: u64) {
if self.uncommitted_removes.is_empty() {
return;
}
let uncommitted_removes = mem::replace(&mut self.uncommitted_removes, vec![]);
for (offset, (index, removed)) in uncommitted_removes.iter().enumerate() {
if *index <= committed_index {
for id in removed {
self.progress.remove(id);
}
if self.progress.capacity() >= (self.progress.len() << 1) {
self.progress.shrink_to_fit();
}
} else {
self.uncommitted_removes
.extend_from_slice(&uncommitted_removes[offset..]);
break;
}
pub(crate) fn finalize_membership_change(&mut self) -> Result<()> {
if let Some(next) = self.next_configuration.take() {
self.configuration = next;
debug!(
self.logger,
"Finalizing membership change";
"config" => ?self.configuration,
);
return Ok(());
}
Err(Error::NoPendingMembershipChange)
}

pub(crate) fn revert(
&mut self,
conf: Configuration,
next_conf: Option<Configuration>,
index: u64,
) {
pub(crate) fn revert(&mut self, conf: Configuration, next_conf: Option<Configuration>) {
self.progress.retain(|id, _| {
conf.contains(*id) || next_conf.as_ref().map_or(false, |c| c.contains(*id))
});
Expand All @@ -721,13 +673,14 @@ impl ProgressSet {

self.configuration = conf;
self.next_configuration = next_conf;
}

// Only uncommitted configuration changes can be reverted, so that
// the current progress set must contains all peers after revert.
if !self.uncommitted_removes.is_empty() {
let x = mem::replace(&mut self.uncommitted_removes, vec![]).into_iter();
self.uncommitted_removes = x.take_while(|(i, _)| *i < index).collect();
}
pub(crate) fn gc(&mut self) {
let conf = &self.configuration;
let next_conf = &self.next_configuration;
self.progress.retain(|id, _| {
conf.contains(*id) || next_conf.as_ref().map_or(false, |c| c.contains(*id))
});
}
}

Expand Down Expand Up @@ -922,7 +875,7 @@ mod test_progress_set {
"Transition state learners inaccurate."
);

set.finalize_membership_change(100)?;
set.finalize_membership_change()?;
assert!(!set.is_in_membership_change());
assert_eq!(set.voter_ids(), end_voters, "End state voters inaccurate");
assert_eq!(
Expand Down
48 changes: 25 additions & 23 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,20 +652,8 @@ impl<T: Storage> Raft<T> {
///
/// So the new elected leader must have replicated the committed entry.
pub fn maybe_commit(&mut self) -> bool {
let pci = self.raft_log.committed;
let mci = self.prs().maximal_committed_index();
if self.raft_log.maybe_commit(mci, self.term) {
// There are at most 1 uncommitted configuration change.
let (last_cc_idx, in_membership_change) = {
let cs = self.conf_states.last().unwrap();
(cs.index, cs.in_membership_change)
};
if pci < last_cc_idx && last_cc_idx <= mci && in_membership_change {
self.append_finalize_conf_change_entry(last_cc_idx);
}
return true;
}
false
self.raft_log.maybe_commit(mci, self.term)
}

/// Commit that the Raft peer has applied up to the given index.
Expand Down Expand Up @@ -758,11 +746,11 @@ impl<T: Storage> Raft<T> {
let cs = Configuration::from_conf_state(cs);
let cs_next = &self.conf_states[conf_states_len - 1].conf_state;
let cs_next = Configuration::from_conf_state(cs_next);
prs.revert(cs, Some(cs_next), index);
prs.revert(cs, Some(cs_next));
} else {
let cs = &self.conf_states[conf_states_len - 1].conf_state;
let cs = Configuration::from_conf_state(cs);
prs.revert(cs, None, index);
prs.revert(cs, None);
}
self.prs = Some(prs);
}
Expand Down Expand Up @@ -804,7 +792,7 @@ impl<T: Storage> Raft<T> {
cs.conf_state = self.prs().configuration().to_conf_state();
}
ConfChangeType::RemoveNode => {
self.remove_node(cc.node_id, e.index)?;
self.remove_node(cc.node_id)?;
cs.conf_state = self.prs().configuration().to_conf_state();
}
ConfChangeType::BeginMembershipChange => {
Expand All @@ -815,7 +803,7 @@ impl<T: Storage> Raft<T> {
}
ConfChangeType::FinalizeMembershipChange => {
assert!(self.conf_states.last().unwrap().in_membership_change);
self.finalize_membership_change(e.index)?;
self.finalize_membership_change()?;
cs.conf_state = self.prs().configuration().to_conf_state();
}
}
Expand Down Expand Up @@ -1270,9 +1258,23 @@ impl<T: Storage> Raft<T> {
StateRole::Follower => self.step_follower(m),
StateRole::Leader => self.step_leader(m),
};
let curr_committed = self.raft_log.committed;
let mut curr_committed = self.raft_log.committed;
if curr_committed > prev_committed {
self.mut_prs().shrink_to(curr_committed);
// There are at most 1 uncommitted configuration change.
let (mut last_cc_idx, mut in_membership_change) = {
let cs = self.conf_states.last().unwrap();
(cs.index, cs.in_membership_change)
};
while prev_committed < last_cc_idx && last_cc_idx <= curr_committed {
if !in_membership_change {
self.mut_prs().gc();
break;
}
self.append_finalize_conf_change_entry(last_cc_idx);
curr_committed = self.raft_log.committed;
last_cc_idx = self.raft_log.last_index();
in_membership_change = false;
}
}
return res;
}
Expand Down Expand Up @@ -1335,11 +1337,11 @@ impl<T: Storage> Raft<T> {
///
/// * This Raft is not in a configuration change via `begin_membership_change`.
#[inline(always)]
fn finalize_membership_change(&mut self, index: u64) -> Result<()> {
fn finalize_membership_change(&mut self) -> Result<()> {
assert!(self.is_in_membership_change());
// Here we can't call `become_follower` because we need to bcast the entry later.
// Calling that function will cause wrong `next_idx`s.
self.mut_prs().finalize_membership_change(index)?;
self.mut_prs().finalize_membership_change()?;
self.promotable = self.prs().voter_ids().contains(&self.id);
Ok(())
}
Expand Down Expand Up @@ -2404,8 +2406,8 @@ impl<T: Storage> Raft<T> {
///
/// * `id` is not a voter or learner.
/// * There is a pending membership change. (See `is_in_membership_change()`)
pub fn remove_node(&mut self, id: u64, index: u64) -> Result<()> {
self.mut_prs().remove(id, index)?;
pub fn remove_node(&mut self, id: u64) -> Result<()> {
self.mut_prs().remove(id)?;
self.promotable = self.prs().voter_ids().contains(&self.id);

// do not try to commit or abort transferring if there are no voters in the cluster.
Expand Down

0 comments on commit 1517451

Please sign in to comment.