From d36c828cdd19388ebf269b53fec04b2c6e766ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 7 Nov 2022 21:45:26 +0800 Subject: [PATCH 1/3] Refactor: specify the default value when building Progress This is the first step to extend Progress to hold complete replication state: the `matching` log id and the `max_possible_matching` log index. I.e., progress should be `Progress, Option), _>`. Because `max_possible_matching` is decided by the leader, the default value of a progress entry must be specified when it is created. --- examples/raft-kv-rocksdb/src/store.rs | 2 ++ openraft/src/engine/engine_impl.rs | 2 +- openraft/src/leader/leader.rs | 2 +- .../src/progress/bench/vec_progress_update.rs | 2 +- openraft/src/progress/mod.rs | 36 +++++++++---------- 5 files changed, 23 insertions(+), 21 deletions(-) diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 8aa161f23..9a1e6c0ba 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -221,9 +221,11 @@ impl ExampleStore { fn store(&self) -> &ColumnFamily { self.db.cf_handle("store").unwrap() } + fn logs(&self) -> &ColumnFamily { self.db.cf_handle("logs").unwrap() } + fn get_last_purged_(&self) -> StorageResult>> { Ok(self .db diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 95177ef99..511f822e3 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -664,7 +664,7 @@ where let learner_ids = em.learner_ids().collect::>(); - leader.progress = old_progress.upgrade_quorum_set(em.membership.to_quorum_set(), &learner_ids); + leader.progress = old_progress.upgrade_quorum_set(em.membership.to_quorum_set(), &learner_ids, None); } // A leader that is removed will be shut down when this membership log is committed. 
diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index 9e3ad710b..5a2aa1391 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -37,7 +37,7 @@ where pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator) -> Self { Self { vote_granted_by: BTreeSet::new(), - progress: VecProgress::new(quorum_set, learner_ids), + progress: VecProgress::new(quorum_set, learner_ids, None), } } diff --git a/openraft/src/progress/bench/vec_progress_update.rs b/openraft/src/progress/bench/vec_progress_update.rs index 9a1062204..d9d3c0309 100644 --- a/openraft/src/progress/bench/vec_progress_update.rs +++ b/openraft/src/progress/bench/vec_progress_update.rs @@ -11,7 +11,7 @@ use crate::quorum::Joint; fn progress_update_01234_567(b: &mut Bencher) { let membership: Vec> = vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7]]; let quorum_set = Joint::from(membership); - let mut progress = VecProgress::::new(quorum_set, 0..=7); + let mut progress = VecProgress::::new(quorum_set, 0..=7, 0); let mut id = 0u64; let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7]; diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index e8a86c5ec..cae91111c 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -42,7 +42,7 @@ where fn iter(&self) -> Iter<(ID, V)>; /// Build a new instance with the new quorum set, inheriting progress data from `self`. - fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID]) -> Self; + fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID], default_v: V) -> Self; /// Return if the given id is a voter. 
/// @@ -97,19 +97,19 @@ pub(crate) struct Stat { impl VecProgress where ID: PartialEq + Copy + Debug + 'static, - V: PartialOrd + Ord + Default + 'static, + V: PartialOrd + Ord + Copy + 'static, QS: QuorumSet, { - pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator) -> Self { - let mut vector = quorum_set.ids().map(|id| (id, V::default())).collect::>(); + pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator, default_v: V) -> Self { + let mut vector = quorum_set.ids().map(|id| (id, default_v)).collect::>(); let voter_count = vector.len(); - vector.extend(learner_ids.map(|id| (id, V::default()))); + vector.extend(learner_ids.map(|id| (id, default_v))); Self { quorum_set, - granted: V::default(), + granted: default_v, voter_count, vector, stat: Default::default(), @@ -152,7 +152,7 @@ where impl Progress for VecProgress where ID: PartialEq + Debug + Copy + 'static, - V: PartialOrd + Ord + Copy + Default + 'static, + V: PartialOrd + Ord + Copy + 'static, QS: QuorumSet + 'static, { /// Update one of the scalar value and re-calculate the committed value. 
@@ -256,8 +256,8 @@ where self.vector.as_slice().iter() } - fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID]) -> Self { - let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied()); + fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self { + let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied(), default_v); new_prog.stat = self.stat.clone(); @@ -282,7 +282,7 @@ mod t { #[test] fn vec_progress_new() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter()); + let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); assert_eq!( vec![ @@ -305,7 +305,7 @@ mod t { #[test] fn vec_progress_iter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter()); + let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); let _ = progress.update(&7, 7); let _ = progress.update(&3, 3); @@ -332,7 +332,7 @@ mod t { #[test] fn vec_progress_move_up() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter()); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); // initial: 0-0, 1-0, 2-0, 3-0, 4-0 let cases = vec![ @@ -364,7 +364,7 @@ mod t { #[test] fn vec_progress_update() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter()); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); // initial: 0,0,0,0,0 let cases = vec![ @@ -390,7 +390,7 @@ mod t { #[test] fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter()); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); assert_eq!(Some(5), 
progress.index(&6)); @@ -410,7 +410,7 @@ mod t { // Initially, committed is 5 - let mut p012 = VecProgress::::new(qs012, [5].into_iter()); + let mut p012 = VecProgress::::new(qs012, [5].into_iter(), 0); let _ = p012.update(&0, 5); let _ = p012.update(&1, 6); @@ -419,7 +419,7 @@ mod t { // After upgrading to a bigger quorum set, committed fall back to 0 - let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6]); + let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6], 0); assert_eq!( &0, p012_345.granted(), @@ -433,7 +433,7 @@ mod t { let _ = p012_345.update(&4, 8); assert_eq!(&5, p012_345.granted()); - let p345 = p012_345.upgrade_quorum_set(qs345, &[1]); + let p345 = p012_345.upgrade_quorum_set(qs345, &[1], 0); assert_eq!(&8, p345.granted(), "shrink quorum set, greater value becomes committed"); assert_eq!(&6, p345.get(&1), "inherit voter progress"); @@ -444,7 +444,7 @@ mod t { #[test] fn vec_progress_is_voter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter()); + let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); assert_eq!(Some(true), progress.is_voter(&1)); assert_eq!(Some(true), progress.is_voter(&3)); From 9b0136c897107f383dfd563394e5962debe1743f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 8 Nov 2022 11:51:18 +0800 Subject: [PATCH 2/3] Refactor: add comment and fix typo --- openraft/src/progress/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index cae91111c..2b14ab7dc 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -214,11 +214,13 @@ where return Ok(&self.granted); } + // Sort and find the greatest value granted by a quorum set. + if prev <= self.granted && self.granted < value { let new_index = self.move_up(index); // From high to low, find the max value that has constituted a quorum. 
- for i in new_index..self.vector.len() { + for i in new_index..self.voter_count { // No need to re-calculate already committed value. if self.vector[i].1 <= self.granted { break; From 6b8d43a5b5512774de6fdb5b028ff4608cac88fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 8 Nov 2022 18:02:08 +0800 Subject: [PATCH 3/3] Refactor: let Progress store more info than just replication progress The leader must keep track of additional information such as the `max_possible_matching` index, in order to give `Engine` full control over replication. --- openraft/src/leader/leader.rs | 2 +- .../src/progress/bench/vec_progress_update.rs | 2 +- openraft/src/progress/mod.rs | 118 +++++++++++++----- 3 files changed, 92 insertions(+), 30 deletions(-) diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index 5a2aa1391..98fa15c1f 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -26,7 +26,7 @@ pub(crate) struct Leader> { pub(crate) vote_granted_by: BTreeSet, /// Tracks the replication progress and committed index - pub(crate) progress: VecProgress>, QS>, + pub(crate) progress: VecProgress>, Option>, QS>, } impl Leader diff --git a/openraft/src/progress/bench/vec_progress_update.rs b/openraft/src/progress/bench/vec_progress_update.rs index d9d3c0309..6d23127ed 100644 --- a/openraft/src/progress/bench/vec_progress_update.rs +++ b/openraft/src/progress/bench/vec_progress_update.rs @@ -11,7 +11,7 @@ use crate::quorum::Joint; fn progress_update_01234_567(b: &mut Bencher) { let membership: Vec> = vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7]]; let quorum_set = Joint::from(membership); - let mut progress = VecProgress::::new(quorum_set, 0..=7, 0); + let mut progress = VecProgress::::new(quorum_set, 0..=7, 0); let mut id = 0u64; let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7]; diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index 2b14ab7dc..f9a725a40 100644 --- 
a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -8,24 +8,32 @@ #[cfg(test)] mod bench; +use std::borrow::Borrow; use std::fmt::Debug; use std::slice::Iter; use crate::quorum::QuorumSet; /// Track progress of several incremental values. -/// It calculates the committed value through a `QuorumSet`, when one of the value is updated. -pub(crate) trait Progress +/// +/// When one of the value is updated, it uses a `QuorumSet` to calculate the committed value. +/// `ID` is the identifier of every progress value. +/// `V` is type of a progress entry. +/// `P` is the progress data of `V`, a progress entry `V` could contain other user data. +/// `QS` is a quorum set implementation. +pub(crate) trait Progress where ID: 'static, + V: Borrow

, QS: QuorumSet, { /// Update one of the scalar value and re-calculate the committed value. /// - /// It returns Err(committed) if the id is not in this progress. - fn update(&mut self, id: &ID, value: V) -> Result<&V, &V>; + /// It returns Err(committed) if the `id` is not found. + fn update(&mut self, id: &ID, value: V) -> Result<&P, &P>; /// Get the value by `id`. + // TODO: is `get_mut()` required ? fn get(&self, id: &ID) -> &V; /// Get the greatest value that is granted by a quorum defined in [`quorum_set()`]. @@ -33,7 +41,7 @@ where /// In raft or other distributed consensus, /// To commit a value, the value has to be **granted by a quorum** and has to be the greatest value /// every proposed. - fn granted(&self) -> &V; + fn granted(&self) -> &P; /// Returns the reference to the quorum set fn quorum_set(&self) -> &QS; @@ -58,7 +66,7 @@ where /// Suitable for small quorum set. #[derive(Clone, Debug)] #[derive(PartialEq, Eq)] -pub(crate) struct VecProgress +pub(crate) struct VecProgress where ID: 'static, QS: QuorumSet, @@ -67,7 +75,7 @@ where quorum_set: QS, /// Currently already committed value. - granted: V, + granted: P, /// Number of voters voter_count: usize, @@ -94,10 +102,12 @@ pub(crate) struct Stat { is_quorum_count: u64, } -impl VecProgress +impl VecProgress where ID: PartialEq + Copy + Debug + 'static, - V: PartialOrd + Ord + Copy + 'static, + V: Copy + 'static, + V: Borrow

, + P: PartialOrd + Ord + Copy + 'static, QS: QuorumSet, { pub(crate) fn new(quorum_set: QS, learner_ids: impl Iterator, default_v: V) -> Self { @@ -109,7 +119,7 @@ where Self { quorum_set, - granted: default_v, + granted: *default_v.borrow(), voter_count, vector, stat: Default::default(), @@ -133,7 +143,7 @@ where fn move_up(&mut self, index: usize) -> usize { self.stat.move_count += 1; for i in (0..index).rev() { - if self.vector[i].1 < self.vector[i + 1].1 { + if self.vector[i].1.borrow() < self.vector[i + 1].1.borrow() { self.vector.swap(i, i + 1); } else { return i + 1; @@ -149,10 +159,12 @@ where } } -impl Progress for VecProgress +impl Progress for VecProgress where ID: PartialEq + Debug + Copy + 'static, - V: PartialOrd + Ord + Copy + 'static, + V: Copy + 'static, + V: Borrow

, + P: PartialOrd + Ord + Copy + 'static, QS: QuorumSet + 'static, { /// Update one of the scalar value and re-calculate the committed value. @@ -187,7 +199,7 @@ where /// ------------------------------ /// 1 3 5 /// ``` - fn update(&mut self, id: &ID, value: V) -> Result<&V, &V> { + fn update(&mut self, id: &ID, value: V) -> Result<&P, &P> { self.stat.update_count += 1; let index = match self.index(id) { @@ -200,12 +212,15 @@ where let elt = &mut self.vector[index]; let prev = elt.1; - if prev == value { + let prev_progress = prev.borrow(); + let new_progress = value.borrow(); + + if prev_progress == new_progress { + elt.1 = value; // no progress change, update the entire value. return Ok(&self.granted); } - debug_assert!(value > prev); - + debug_assert!(new_progress > prev_progress); elt.1 = value; // Learner does not grant a value. @@ -216,13 +231,15 @@ where // Sort and find the greatest value granted by a quorum set. - if prev <= self.granted && self.granted < value { + if prev_progress <= &self.granted && &self.granted < new_progress { let new_index = self.move_up(index); // From high to low, find the max value that has constituted a quorum. for i in new_index..self.voter_count { + let prog = self.vector[i].1.borrow(); + // No need to re-calculate already committed value. 
- if self.vector[i].1 <= self.granted { + if prog <= &self.granted { break; } @@ -232,7 +249,7 @@ where self.stat.is_quorum_count += 1; if self.quorum_set.is_quorum(it) { - self.granted = self.vector[i].1; + self.granted = *prog; break; } } @@ -246,7 +263,7 @@ where &self.vector[index].1 } - fn granted(&self) -> &V { + fn granted(&self) -> &P { &self.granted } @@ -277,6 +294,8 @@ where #[cfg(test)] mod t { + use std::borrow::Borrow; + use super::Progress; use super::VecProgress; use crate::quorum::Joint; @@ -284,7 +303,7 @@ mod t { #[test] fn vec_progress_new() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); assert_eq!( vec![ @@ -307,7 +326,7 @@ mod t { #[test] fn vec_progress_iter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); let _ = progress.update(&7, 7); let _ = progress.update(&3, 3); @@ -334,7 +353,7 @@ mod t { #[test] fn vec_progress_move_up() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); // initial: 0-0, 1-0, 2-0, 3-0, 4-0 let cases = vec![ @@ -366,7 +385,7 @@ mod t { #[test] fn vec_progress_update() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); // initial: 0,0,0,0,0 let cases = vec![ @@ -389,10 +408,53 @@ mod t { Ok(()) } + /// Progress entry for testing + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + struct ProgressEntry { + progress: u64, + user_data: &'static str, + } + + impl Borrow for 
ProgressEntry { + fn borrow(&self) -> &u64 { + &self.progress + } + } + + #[test] + fn vec_progress_update_struct_value() -> anyhow::Result<()> { + let pv = |p, user_data| ProgressEntry { progress: p, user_data }; + + let quorum_set: Vec = vec![0, 1, 2]; + let mut progress = VecProgress::::new(quorum_set, [3].into_iter(), pv(0, "foo")); + + // initial: 0,0,0,0 + let cases = vec![ + (3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted + (1, pv(2, "b"), Ok(&0)), // 0,2,0,9 + (2, pv(3, "c"), Ok(&2)), // 0,2,3,9 + (1, pv(2, "d"), Ok(&2)), // 0,2,3,9 // No new granted, just update user data. + ]; + + for (ith, (id, v, want_committed)) in cases.iter().enumerate() { + let got = progress.update(id, *v); + assert_eq!(want_committed.clone(), got, "{}-th case: id:{}, v:{:?}", ith, id, v); + } + + // Check progress data + + assert_eq!(pv(0, "foo"), *progress.get(&0),); + assert_eq!(pv(2, "d"), *progress.get(&1),); + assert_eq!(pv(3, "c"), *progress.get(&2),); + assert_eq!(pv(9, "a"), *progress.get(&3),); + + Ok(()) + } + #[test] fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); assert_eq!(Some(5), progress.index(&6)); @@ -412,7 +474,7 @@ mod t { // Initially, committed is 5 - let mut p012 = VecProgress::::new(qs012, [5].into_iter(), 0); + let mut p012 = VecProgress::::new(qs012, [5].into_iter(), 0); let _ = p012.update(&0, 5); let _ = p012.update(&1, 6); @@ -446,7 +508,7 @@ mod t { #[test] fn vec_progress_is_voter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); assert_eq!(Some(true), progress.is_voter(&1)); assert_eq!(Some(true), progress.is_voter(&3));