Skip to content

Commit

Permalink
Merge pull request #601 from drmingdrmer/30-progress
Browse files Browse the repository at this point in the history
Refactor: specify the default value when building Progress
  • Loading branch information
drmingdrmer authored Nov 8, 2022
2 parents d6ac640 + 6b8d43a commit 6cac90c
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 41 deletions.
2 changes: 2 additions & 0 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ impl ExampleStore {
fn store(&self) -> &ColumnFamily {
self.db.cf_handle("store").unwrap()
}

fn logs(&self) -> &ColumnFamily {
self.db.cf_handle("logs").unwrap()
}

fn get_last_purged_(&self) -> StorageResult<Option<LogId<u64>>> {
Ok(self
.db
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where

let learner_ids = em.learner_ids().collect::<Vec<_>>();

leader.progress = old_progress.upgrade_quorum_set(em.membership.to_quorum_set(), &learner_ids);
leader.progress = old_progress.upgrade_quorum_set(em.membership.to_quorum_set(), &learner_ids, None);
}

// A leader that is removed will be shut down when this membership log is committed.
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) struct Leader<NID: NodeId, QS: QuorumSet<NID>> {
pub(crate) vote_granted_by: BTreeSet<NID>,

/// Tracks the replication progress and committed index
pub(crate) progress: VecProgress<NID, Option<LogId<NID>>, QS>,
pub(crate) progress: VecProgress<NID, Option<LogId<NID>>, Option<LogId<NID>>, QS>,
}

impl<NID, QS> Leader<NID, QS>
Expand All @@ -37,7 +37,7 @@ where
pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator<Item = NID>) -> Self {
Self {
vote_granted_by: BTreeSet::new(),
progress: VecProgress::new(quorum_set, learner_ids),
progress: VecProgress::new(quorum_set, learner_ids, None),
}
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/progress/bench/vec_progress_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::quorum::Joint;
fn progress_update_01234_567(b: &mut Bencher) {
let membership: Vec<Vec<u64>> = vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7]];
let quorum_set = Joint::from(membership);
let mut progress = VecProgress::<u64, u64, _>::new(quorum_set, 0..=7);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, 0..=7, 0);

let mut id = 0u64;
let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7];
Expand Down
138 changes: 101 additions & 37 deletions openraft/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,40 @@
#[cfg(test)]
mod bench;

use std::borrow::Borrow;
use std::fmt::Debug;
use std::slice::Iter;

use crate::quorum::QuorumSet;

/// Track progress of several incremental values.
/// It calculates the committed value through a `QuorumSet`, when one of the value is updated.
pub(crate) trait Progress<ID, V, QS>
///
/// When one of the value is updated, it uses a `QuorumSet` to calculate the committed value.
/// `ID` is the identifier of every progress value.
/// `V` is type of a progress entry.
/// `P` is the progress data of `V`, a progress entry `V` could contain other user data.
/// `QS` is a quorum set implementation.
pub(crate) trait Progress<ID, V, P, QS>
where
ID: 'static,
V: Borrow<P>,
QS: QuorumSet<ID>,
{
/// Update one of the scalar value and re-calculate the committed value.
///
/// It returns Err(committed) if the id is not in this progress.
fn update(&mut self, id: &ID, value: V) -> Result<&V, &V>;
/// It returns Err(committed) if the `id` is not found.
fn update(&mut self, id: &ID, value: V) -> Result<&P, &P>;

/// Get the value by `id`.
// TODO: is `get_mut()` required ?
fn get(&self, id: &ID) -> &V;

/// Get the greatest value that is granted by a quorum defined in [`quorum_set()`].
///
/// In raft or other distributed consensus,
/// To commit a value, the value has to be **granted by a quorum** and has to be the greatest value
/// every proposed.
fn granted(&self) -> &V;
fn granted(&self) -> &P;

/// Returns the reference to the quorum set
fn quorum_set(&self) -> &QS;
Expand All @@ -42,7 +50,7 @@ where
fn iter(&self) -> Iter<(ID, V)>;

/// Build a new instance with the new quorum set, inheriting progress data from `self`.
fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID]) -> Self;
fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID], default_v: V) -> Self;

/// Return if the given id is a voter.
///
Expand All @@ -58,7 +66,7 @@ where
/// Suitable for small quorum set.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct VecProgress<ID, V, QS>
pub(crate) struct VecProgress<ID, V, P, QS>
where
ID: 'static,
QS: QuorumSet<ID>,
Expand All @@ -67,7 +75,7 @@ where
quorum_set: QS,

/// Currently already committed value.
granted: V,
granted: P,

/// Number of voters
voter_count: usize,
Expand All @@ -94,22 +102,24 @@ pub(crate) struct Stat {
is_quorum_count: u64,
}

impl<ID, V, QS> VecProgress<ID, V, QS>
impl<ID, V, P, QS> VecProgress<ID, V, P, QS>
where
ID: PartialEq + Copy + Debug + 'static,
V: PartialOrd + Ord + Default + 'static,
V: Copy + 'static,
V: Borrow<P>,
P: PartialOrd + Ord + Copy + 'static,
QS: QuorumSet<ID>,
{
pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator<Item = ID>) -> Self {
let mut vector = quorum_set.ids().map(|id| (id, V::default())).collect::<Vec<_>>();
pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator<Item = ID>, default_v: V) -> Self {
let mut vector = quorum_set.ids().map(|id| (id, default_v)).collect::<Vec<_>>();

let voter_count = vector.len();

vector.extend(learner_ids.map(|id| (id, V::default())));
vector.extend(learner_ids.map(|id| (id, default_v)));

Self {
quorum_set,
granted: V::default(),
granted: *default_v.borrow(),
voter_count,
vector,
stat: Default::default(),
Expand All @@ -133,7 +143,7 @@ where
fn move_up(&mut self, index: usize) -> usize {
self.stat.move_count += 1;
for i in (0..index).rev() {
if self.vector[i].1 < self.vector[i + 1].1 {
if self.vector[i].1.borrow() < self.vector[i + 1].1.borrow() {
self.vector.swap(i, i + 1);
} else {
return i + 1;
Expand All @@ -149,10 +159,12 @@ where
}
}

impl<ID, V, QS> Progress<ID, V, QS> for VecProgress<ID, V, QS>
impl<ID, V, P, QS> Progress<ID, V, P, QS> for VecProgress<ID, V, P, QS>
where
ID: PartialEq + Debug + Copy + 'static,
V: PartialOrd + Ord + Copy + Default + 'static,
V: Copy + 'static,
V: Borrow<P>,
P: PartialOrd + Ord + Copy + 'static,
QS: QuorumSet<ID> + 'static,
{
/// Update one of the scalar value and re-calculate the committed value.
Expand Down Expand Up @@ -187,7 +199,7 @@ where
/// ------------------------------
/// 1 3 5
/// ```
fn update(&mut self, id: &ID, value: V) -> Result<&V, &V> {
fn update(&mut self, id: &ID, value: V) -> Result<&P, &P> {
self.stat.update_count += 1;

let index = match self.index(id) {
Expand All @@ -200,12 +212,15 @@ where
let elt = &mut self.vector[index];
let prev = elt.1;

if prev == value {
let prev_progress = prev.borrow();
let new_progress = value.borrow();

if prev_progress == new_progress {
elt.1 = value; // no progress change, update the entire value.
return Ok(&self.granted);
}

debug_assert!(value > prev);

debug_assert!(new_progress > prev_progress);
elt.1 = value;

// Learner does not grant a value.
Expand All @@ -214,13 +229,17 @@ where
return Ok(&self.granted);
}

if prev <= self.granted && self.granted < value {
// Sort and find the greatest value granted by a quorum set.

if prev_progress <= &self.granted && &self.granted < new_progress {
let new_index = self.move_up(index);

// From high to low, find the max value that has constituted a quorum.
for i in new_index..self.vector.len() {
for i in new_index..self.voter_count {
let prog = self.vector[i].1.borrow();

// No need to re-calculate already committed value.
if self.vector[i].1 <= self.granted {
if prog <= &self.granted {
break;
}

Expand All @@ -230,7 +249,7 @@ where
self.stat.is_quorum_count += 1;

if self.quorum_set.is_quorum(it) {
self.granted = self.vector[i].1;
self.granted = *prog;
break;
}
}
Expand All @@ -244,7 +263,7 @@ where
&self.vector[index].1
}

fn granted(&self) -> &V {
fn granted(&self) -> &P {
&self.granted
}

Expand All @@ -256,8 +275,8 @@ where
self.vector.as_slice().iter()
}

fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID]) -> Self {
let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied());
fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self {
let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied(), default_v);

new_prog.stat = self.stat.clone();

Expand All @@ -275,14 +294,16 @@ where

#[cfg(test)]
mod t {
use std::borrow::Borrow;

use super::Progress;
use super::VecProgress;
use crate::quorum::Joint;

#[test]
fn vec_progress_new() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, _>::new(quorum_set, [6, 7].into_iter());
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);

assert_eq!(
vec![
Expand All @@ -305,7 +326,7 @@ mod t {
#[test]
fn vec_progress_iter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, _>::new(quorum_set, [6, 7].into_iter());
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);

let _ = progress.update(&7, 7);
let _ = progress.update(&3, 3);
Expand All @@ -332,7 +353,7 @@ mod t {
#[test]
fn vec_progress_move_up() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, _>::new(quorum_set, [6].into_iter());
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);

// initial: 0-0, 1-0, 2-0, 3-0, 4-0
let cases = vec![
Expand Down Expand Up @@ -364,7 +385,7 @@ mod t {
#[test]
fn vec_progress_update() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, _>::new(quorum_set, [6].into_iter());
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);

// initial: 0,0,0,0,0
let cases = vec![
Expand All @@ -387,10 +408,53 @@ mod t {
Ok(())
}

/// Progress entry for testing
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ProgressEntry {
progress: u64,
user_data: &'static str,
}

impl Borrow<u64> for ProgressEntry {
fn borrow(&self) -> &u64 {
&self.progress
}
}

#[test]
fn vec_progress_update_struct_value() -> anyhow::Result<()> {
let pv = |p, user_data| ProgressEntry { progress: p, user_data };

let quorum_set: Vec<u64> = vec![0, 1, 2];
let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3].into_iter(), pv(0, "foo"));

// initial: 0,0,0,0
let cases = vec![
(3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted
(1, pv(2, "b"), Ok(&0)), // 0,2,0,9
(2, pv(3, "c"), Ok(&2)), // 0,2,3,9
(1, pv(2, "d"), Ok(&2)), // 0,2,3,9 // No new granted, just update user data.
];

for (ith, (id, v, want_committed)) in cases.iter().enumerate() {
let got = progress.update(id, *v);
assert_eq!(want_committed.clone(), got, "{}-th case: id:{}, v:{:?}", ith, id, v);
}

// Check progress data

assert_eq!(pv(0, "foo"), *progress.get(&0),);
assert_eq!(pv(2, "d"), *progress.get(&1),);
assert_eq!(pv(3, "c"), *progress.get(&2),);
assert_eq!(pv(9, "a"), *progress.get(&3),);

Ok(())
}

#[test]
fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, _>::new(quorum_set, [6].into_iter());
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);

assert_eq!(Some(5), progress.index(&6));

Expand All @@ -410,7 +474,7 @@ mod t {

// Initially, committed is 5

let mut p012 = VecProgress::<u64, u64, _>::new(qs012, [5].into_iter());
let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5].into_iter(), 0);

let _ = p012.update(&0, 5);
let _ = p012.update(&1, 6);
Expand All @@ -419,7 +483,7 @@ mod t {

// After upgrading to a bigger quorum set, committed fall back to 0

let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6]);
let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6], 0);
assert_eq!(
&0,
p012_345.granted(),
Expand All @@ -433,7 +497,7 @@ mod t {
let _ = p012_345.update(&4, 8);
assert_eq!(&5, p012_345.granted());

let p345 = p012_345.upgrade_quorum_set(qs345, &[1]);
let p345 = p012_345.upgrade_quorum_set(qs345, &[1], 0);

assert_eq!(&8, p345.granted(), "shrink quorum set, greater value becomes committed");
assert_eq!(&6, p345.get(&1), "inherit voter progress");
Expand All @@ -444,7 +508,7 @@ mod t {
#[test]
fn vec_progress_is_voter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, _>::new(quorum_set, [6, 7].into_iter());
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);

assert_eq!(Some(true), progress.is_voter(&1));
assert_eq!(Some(true), progress.is_voter(&3));
Expand Down

0 comments on commit 6cac90c

Please sign in to comment.