From 966eb287ff9bc98112b7b5d69253cca2d86277f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 3 Mar 2022 10:29:22 +0800 Subject: [PATCH] Feature: use a version to track metics change - Add `Versioned` to track changes of an `Arc`. In openraft, some frequently updated object such metrics are wrapped in an `Arc`, and some modification is made in place: by storing an `AtomicU64`. Thus we can not tell whether an `Arc` is changed by comparing them with `==` any more. In order to determine whether to broadcast a metrics instance, we need an additional `version` to track the changes applied to the `Arc`. These are all included in the `Versioned`. - Add trait `Update` to define variant update operation to apply to `Versioned`. `Update` has to implement two methods: - `apply_in_place()` to apply a modification in place if possible - and `apply_mut()` if it has to apply modification to a cloned instance. `Update` will increment the `Versioned.version` by 1 after each update. - Reimplement `LeaderMetrics` with `Versioned`. - Change: type of `LeaderMetrics.replication` from HashMap to BTreeMap for easing test. commit-id:2432f7a0 --- openraft/src/core/admin.rs | 8 +- openraft/src/core/mod.rs | 9 +- openraft/src/core/replication.rs | 23 +-- openraft/src/leader_metrics.rs | 81 +++++++++++ openraft/src/leader_metrics_test.rs | 104 ++++++++++++++ openraft/src/lib.rs | 4 + openraft/src/metrics.rs | 28 +--- openraft/src/replication/mod.rs | 2 +- openraft/src/versioned/mod.rs | 7 + openraft/src/versioned/versioned.rs | 134 ++++++++++++++++++ .../t90_issue_216_stale_last_log_id.rs | 8 +- openraft/tests/metrics/t30_leader_metrics.rs | 12 +- 12 files changed, 357 insertions(+), 63 deletions(-) create mode 100644 openraft/src/leader_metrics.rs create mode 100644 openraft/src/leader_metrics_test.rs create mode 100644 openraft/src/versioned/mod.rs create mode 100644 openraft/src/versioned/versioned.rs diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index 1506bc1a7..a2158e872 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -17,12 +17,14 @@ use crate::error::InitializeError; use crate::error::LearnerIsLagging; use crate::error::LearnerNotFound; use crate::error::NodeIdNotInNodes; +use crate::leader_metrics::RemoveTarget; use crate::membership::EitherNodesOrIds; use crate::raft::AddLearnerResponse; use crate::raft::ClientWriteResponse; use crate::raft::EntryPayload; use crate::raft::RaftRespTx; use crate::raft_types::LogIdOptionExt; +use crate::versioned::Updatable; use crate::LogId; use crate::Membership; use crate::Node; @@ -337,9 +339,9 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS tracing::info!("removed replication to: {}", target); self.nodes.remove(&target); - let mut metrics_clone = self.leader_metrics.as_ref().clone(); - metrics_clone.replication.remove(&target); - self.leader_metrics = Arc::new(metrics_clone); + + self.leader_metrics.update(RemoveTarget { target }); + true } } diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 7cbe71054..48c5a246b 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -41,7 +41,7 @@ use crate::error::ExtractFatal; use crate::error::Fatal; use crate::error::ForwardToLeader; use crate::error::InitializeError; -use crate::metrics::LeaderMetrics; +use crate::leader_metrics::LeaderMetrics; use crate::metrics::RaftMetrics; use crate::raft::AddLearnerResponse; use crate::raft::Entry; @@ -53,6 +53,7 @@ use crate::raft_types::LogIdOptionExt; use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; use crate::storage::RaftSnapshotBuilder; +use crate::versioned::Versioned; use crate::vote::Vote; use crate::LeaderId; use crate::LogId; @@ -363,7 +364,7 @@ impl, S: RaftStorage> RaftCore>>>) { + fn report_metrics(&mut self, leader_metrics: Update>>>) { let leader_metrics = match leader_metrics { Update::Update(v) => v.cloned(), Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(), @@ -751,7 +752,7 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStora pub(super) nodes: BTreeMap>, /// The metrics about a leader - pub leader_metrics: Arc>, + pub leader_metrics: Versioned>, /// The stream of events coming from replication streams. pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent, Span)>, @@ -770,7 +771,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS Self { core, nodes: BTreeMap::new(), - leader_metrics: Arc::new(LeaderMetrics::default()), + leader_metrics: Versioned::new(LeaderMetrics::default()), replication_tx, replication_rx, awaiting_committed: Vec::new(), diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs index 0dbf282e1..5376f587d 100644 --- a/openraft/src/core/replication.rs +++ b/openraft/src/core/replication.rs @@ -1,7 +1,4 @@ use std::collections::BTreeMap; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::sync::Arc; use tokio::sync::oneshot; use tracing_futures::Instrument; @@ -12,6 +9,7 @@ use crate::core::ReplicationState; use crate::core::SnapshotState; use crate::core::State; use crate::error::AddLearnerError; +use crate::leader_metrics::UpdateMatchedLogId; use crate::raft::AddLearnerResponse; use crate::raft::RaftRespTx; use crate::replication::RaftEvent; @@ -19,12 +17,12 @@ use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; use crate::storage::Snapshot; use crate::summary::MessageSummary; +use crate::versioned::Updatable; use crate::vote::Vote; use crate::LogId; use crate::RaftNetworkFactory; use crate::RaftStorage; use crate::RaftTypeConfig; -use crate::ReplicationMetrics; use crate::StorageError; impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderState<'a, C, N, S> { @@ -181,23 +179,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS #[tracing::instrument(level = "trace", skip(self))] fn update_leader_metrics(&mut self, target: C::NodeId, matched: LogId) { tracing::debug!(%target, ?matched, "update_leader_metrics"); - let (matched_leader_id, matched_index) = (matched.leader_id, matched.index); - if let Some(target_metrics) = self.leader_metrics.replication.get(&target) { - if target_metrics.matched_leader_id == matched_leader_id { - // we can update the metrics in-place - target_metrics.matched_index.store(matched_index, Ordering::Relaxed); - return; - } - } - // either the record does not exist or the leader ID is different - // create a new object with updated metrics - let mut metrics_clone = self.leader_metrics.as_ref().clone(); - metrics_clone.replication.insert(target, ReplicationMetrics { - matched_leader_id, - matched_index: AtomicU64::new(matched_index), - }); - self.leader_metrics = Arc::new(metrics_clone); + self.leader_metrics.update(UpdateMatchedLogId { target, matched }); } #[tracing::instrument(level = "trace", skip(self))] diff --git a/openraft/src/leader_metrics.rs b/openraft/src/leader_metrics.rs new file mode 100644 index 000000000..d1d75641e --- /dev/null +++ b/openraft/src/leader_metrics.rs @@ -0,0 +1,81 @@ +use std::collections::BTreeMap; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use serde::Deserialize; +use serde::Serialize; + +use crate::versioned::Update; +use crate::versioned::UpdateError; +use crate::LogId; +use crate::MessageSummary; +use crate::NodeId; +use crate::RaftTypeConfig; +use crate::ReplicationMetrics; + +/// The metrics about the leader. It is Some() only when this node is leader. +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct LeaderMetrics { + /// Replication metrics of all known replication target: voters and learners + pub replication: BTreeMap>, +} + +impl MessageSummary for LeaderMetrics { + fn summary(&self) -> String { + let mut res = vec!["LeaderMetrics{".to_string()]; + for (i, (k, v)) in self.replication.iter().enumerate() { + if i > 0 { + res.push(", ".to_string()); + } + res.push(format!("{}:{}", k, v.summary())); + } + + res.push("}".to_string()); + res.join("") + } +} + +/// Update one replication metrics in `LeaderMetrics.replication`. +pub struct UpdateMatchedLogId { + pub target: NID, + pub matched: LogId, +} + +impl Update> for UpdateMatchedLogId { + /// If there is already a record for the target node. Just modify the atomic u64. + fn apply_in_place(&self, to: &Arc>) -> Result<(), UpdateError> { + let target_metrics = to.replication.get(&self.target).ok_or(UpdateError::CanNotUpdateInPlace)?; + + if target_metrics.matched_leader_id == self.matched.leader_id { + target_metrics.matched_index.store(self.matched.index, Ordering::Relaxed); + return Ok(()); + } + + Err(UpdateError::CanNotUpdateInPlace) + } + + /// To insert a new record always work. + fn apply_mut(&self, to: &mut LeaderMetrics) { + to.replication.insert(self.target, ReplicationMetrics { + matched_leader_id: self.matched.leader_id, + matched_index: AtomicU64::new(self.matched.index), + }); + } +} + +/// Remove one replication metrics in `LeaderMetrics.replication`. +pub struct RemoveTarget { + pub target: NID, +} + +impl Update> for RemoveTarget { + /// Removing can not be done in place + fn apply_in_place(&self, _to: &Arc>) -> Result<(), UpdateError> { + Err(UpdateError::CanNotUpdateInPlace) + } + + fn apply_mut(&self, to: &mut LeaderMetrics) { + to.replication.remove(&self.target); + } +} diff --git a/openraft/src/leader_metrics_test.rs b/openraft/src/leader_metrics_test.rs new file mode 100644 index 000000000..0777ac026 --- /dev/null +++ b/openraft/src/leader_metrics_test.rs @@ -0,0 +1,104 @@ +use crate::leader_metrics::LeaderMetrics; +use crate::leader_metrics::UpdateMatchedLogId; +use crate::testing::DummyConfig; +use crate::versioned::Updatable; +use crate::versioned::Versioned; +use crate::LeaderId; +use crate::LogId; +use crate::MessageSummary; + +#[test] +fn test_versioned() -> anyhow::Result<()> { + let mut a = Versioned::new(LeaderMetrics:: { + replication: Default::default(), + }); + + assert_eq!("{ver:0, LeaderMetrics{}}", a.summary()); + + // In place update + + a.update(UpdateMatchedLogId { + target: 1, + matched: LogId::new(LeaderId::new(1, 2), 3), + }); + + assert_eq!("{ver:1, LeaderMetrics{1:1-2-3}}", a.summary()); + + let mut b1 = a.clone(); + + // Two instances reference the same data. + // In place update applies to both instance. + + b1.update(UpdateMatchedLogId { + target: 1, + matched: LogId::new(LeaderId::new(1, 2), 5), + }); + assert_eq!("{ver:1, LeaderMetrics{1:1-2-5}}", a.summary()); + assert_eq!("{ver:2, LeaderMetrics{1:1-2-5}}", b1.summary()); + + // In place update is not possible. + // Fall back to cloned update + + b1.update(UpdateMatchedLogId { + target: 2, + matched: LogId::new(LeaderId::new(1, 2), 5), + }); + assert_eq!("{ver:1, LeaderMetrics{1:1-2-5}}", a.summary()); + assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", b1.summary()); + + // a and b1 have the same content but not equal, because they reference different data. + + a.update(UpdateMatchedLogId { + target: 1, + matched: LogId::new(LeaderId::new(1, 2), 5), + }); + a.update(UpdateMatchedLogId { + target: 2, + matched: LogId::new(LeaderId::new(1, 2), 5), + }); + assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", a.summary()); + assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-5}}", b1.summary()); + assert_ne!(a, b1); + + // b2 reference the same data as b1. + + let mut b2 = b1.clone(); + b2.update(UpdateMatchedLogId { + target: 2, + matched: LogId::new(LeaderId::new(1, 2), 9), + }); + assert_eq!("{ver:3, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b1.summary()); + assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b2.summary()); + assert_ne!(b1, b2); + + // Two Versioned are equal only when they reference the same data and have the same version. + + b1.update(UpdateMatchedLogId { + target: 2, + matched: LogId::new(LeaderId::new(1, 2), 9), + }); + assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b1.summary()); + assert_eq!("{ver:4, LeaderMetrics{1:1-2-5, 2:1-2-9}}", b2.summary()); + assert_eq!(b1, b2); + + Ok(()) +} + +#[test] +fn test_versioned_methods() -> anyhow::Result<()> { + let mut a = Versioned::new(LeaderMetrics:: { + replication: Default::default(), + }); + + a.update(UpdateMatchedLogId { + target: 1, + matched: LogId::new(LeaderId::new(1, 2), 3), + }); + + assert_eq!("{ver:1, LeaderMetrics{1:1-2-3}}", a.summary()); + + assert_eq!(1, a.version()); + assert_eq!("LeaderMetrics{1:1-2-3}", a.data().summary()); + + Ok(()) +} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 511d85879..f740dfbe2 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -4,6 +4,7 @@ mod config; mod core; mod defensive; +mod leader_metrics; mod membership; mod node; mod raft_types; @@ -20,7 +21,10 @@ pub mod network; pub mod raft; pub mod storage; pub mod testing; +pub mod versioned; +#[cfg(test)] +mod leader_metrics_test; #[cfg(test)] mod metrics_wait_test; diff --git a/openraft/src/metrics.rs b/openraft/src/metrics.rs index c4f2d913a..cccee5f95 100644 --- a/openraft/src/metrics.rs +++ b/openraft/src/metrics.rs @@ -8,7 +8,6 @@ //! return a stream of metrics. use std::collections::BTreeSet; -use std::collections::HashMap; use std::sync::Arc; use serde::Deserialize; @@ -21,12 +20,13 @@ use tokio::time::Instant; use crate::core::EffectiveMembership; use crate::core::State; use crate::error::Fatal; +use crate::leader_metrics::LeaderMetrics; use crate::raft_types::LogIdOptionExt; +use crate::versioned::Versioned; use crate::LogId; use crate::Membership; use crate::MessageSummary; use crate::RaftTypeConfig; -use crate::ReplicationMetrics; /// A set of metrics describing the current state of a Raft node. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -53,7 +53,7 @@ pub struct RaftMetrics { pub snapshot: Option>, /// The metrics about the leader. It is Some() only when this node is leader. - pub leader_metrics: Option>>, + pub leader_metrics: Option>>, } impl MessageSummary for RaftMetrics { @@ -72,28 +72,6 @@ impl MessageSummary for RaftMetrics { } } -/// The metrics about the leader. It is Some() only when this node is leader. -#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] -pub struct LeaderMetrics { - /// Replication metrics of all known replication target: voters and learners - pub replication: HashMap>, -} - -impl MessageSummary for LeaderMetrics { - fn summary(&self) -> String { - let mut res = vec!["LeaderMetrics{".to_string()]; - for (i, (k, v)) in self.replication.iter().enumerate() { - if i > 0 { - res.push(",".to_string()); - } - res.push(format!("{}:{}", k, v.summary())); - } - - res.push("}".to_string()); - res.join("") - } -} - impl RaftMetrics { pub(crate) fn new_initial(id: C::NodeId) -> Self { let membership_config = Membership::new_initial(id); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a4d45884b..69fb8c0e4 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -93,7 +93,7 @@ impl ReplicationMetrics { impl MessageSummary for ReplicationMetrics { fn summary(&self) -> String { - format!("{:?}", self.matched()) + format!("{}", self.matched()) } } diff --git a/openraft/src/versioned/mod.rs b/openraft/src/versioned/mod.rs new file mode 100644 index 000000000..f120782b2 --- /dev/null +++ b/openraft/src/versioned/mod.rs @@ -0,0 +1,7 @@ +#[allow(clippy::module_inception)] +mod versioned; + +pub use versioned::Updatable; +pub use versioned::Update; +pub use versioned::UpdateError; +pub use versioned::Versioned; diff --git a/openraft/src/versioned/versioned.rs b/openraft/src/versioned/versioned.rs new file mode 100644 index 000000000..b4e3de256 --- /dev/null +++ b/openraft/src/versioned/versioned.rs @@ -0,0 +1,134 @@ +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use serde::Deserialize; +use serde::Serialize; + +use crate::MessageSummary; + +/// Track data change by version. +/// +/// For variable with interior mutability, such as those contain `Arc>` or `AtomicU64`, +/// a `version` is used to track changes. +/// +/// Every update that is made to it increments the `version` by 1. +/// The inner `data` is an `Arc` thus `Clone` would be cheap enough. +/// +/// **Caveat**: an instance will see changes made on another clone, since they reference the same data, until an +/// update-by-replace is made. +#[derive(Clone, Serialize, Deserialize)] +pub struct Versioned +where Data: Clone +{ + /// A version number that indicates every change to `data`. + version: u64, + + /// The data. + /// + /// It may change in place(for `AtomicU64` fields) or be replace as a whole. + /// The `version` should be incremented in any case. + data: Arc, +} + +impl Versioned +where Data: Clone +{ + pub fn new(d: Data) -> Self { + Self { + version: 0, + data: Arc::new(d), + } + } + + pub fn version(&self) -> u64 { + self.version + } + + pub fn data(&self) -> &Arc { + &self.data + } +} + +impl PartialEq for Versioned +where Data: Clone +{ + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.data, &other.data) && self.version == other.version + } +} + +impl Eq for Versioned where Data: Clone {} + +impl Debug for Versioned +where Data: Clone + Debug +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Versioned").field("version", &self.version).field("data", &self.data).finish() + } +} + +impl MessageSummary for Versioned +where Data: Clone + MessageSummary +{ + fn summary(&self) -> String { + format!("{{ver:{}, {}}}", self.version, self.data.summary()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)] +pub enum UpdateError { + #[error("can not update in place")] + CanNotUpdateInPlace, +} + +/// Defines an update operation that can be applied to `Versioned` +/// +/// The update try to update `Versioned.data` in place if possible. +/// If in place update can not be done, it then update it by replacing `Versioned.data` with a new cloned instance. +/// +/// A user needs to implement two update methods: `update_in_place()` and `update_mut()`. +pub trait Update +where Data: Clone +{ + /// Try to apply an update `Versioned.data` in place if possible and return. + /// Or replace `Versioned.data` with a new one. + /// Finally it increments the `Versioned.version` by 1 to indicate the data has changed. + fn apply(&self, to: &mut Versioned) { + let res = self.apply_in_place(&to.data); + + if res.is_err() { + let x = Arc::make_mut(&mut to.data); + self.apply_mut(x); + } + + to.version += 1; + } + + /// Apply the update to the `Versioned.data` in place if possible. + /// + /// If it can not be done, it should return an error to inform it to update the `Versioned.data` by replacing it. + fn apply_in_place(&self, to: &Arc) -> Result<(), UpdateError>; + + /// Apply the update a cloned new instance of `Versioned.data`. + /// + /// After updating it, `to` will replace the `data` in [`Versioned`]. + fn apply_mut(&self, to: &mut Data); +} + +/// An object that can be updated by calling `self.update(arg)` +pub trait Updatable { + fn update(&mut self, arg: U); +} + +/// If `U` is an `Update`, i.e. `U.apply(Versioned)`, +/// then install a method `update()` to `Versioned`: `Versioned.update(U)`. +impl Updatable for Versioned +where + Data: Clone, + U: Update, +{ + fn update(&mut self, update: U) { + update.apply(self) + } +} diff --git a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs index 509986d96..9e613c637 100644 --- a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs +++ b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs @@ -53,10 +53,10 @@ async fn stale_last_log_id() -> Result<()> { log_index += n_ops as u64; } - router.wait(&1, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?; - router.wait(&2, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?; - router.wait(&3, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?; - router.wait(&4, Some(Duration::from_millis(500))).await?.log(Some(log_index), "").await?; + router.wait(&1, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?; + router.wait(&2, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?; + router.wait(&3, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?; + router.wait(&4, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?; Ok(()) } diff --git a/openraft/tests/metrics/t30_leader_metrics.rs b/openraft/tests/metrics/t30_leader_metrics.rs index 9fc7ec173..927bc8afd 100644 --- a/openraft/tests/metrics/t30_leader_metrics.rs +++ b/openraft/tests/metrics/t30_leader_metrics.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::Result; use futures::stream::StreamExt; +use maplit::btreemap; use maplit::btreeset; -use maplit::hashmap; use openraft::raft::VoteRequest; use openraft::Config; use openraft::LeaderId; @@ -68,7 +68,7 @@ async fn leader_metrics() -> Result<()> { &0, |x| { if let Some(ref q) = x.leader_metrics { - q.replication.is_empty() + q.data().replication.is_empty() } else { false } @@ -110,13 +110,13 @@ async fn leader_metrics() -> Result<()> { router.assert_stable_cluster(Some(1), Some(log_index)).await; // Still in term 1, so leader is still node 0. let ww = ReplicationMetrics::new(LogId::new(LeaderId::new(1, 0), log_index)); - let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone(), 4=>ww.clone(), }; + let want_repl = btreemap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone(), 4=>ww.clone(), }; router .wait_for_metrics( &0, |x| { if let Some(ref q) = x.leader_metrics { - q.replication == want_repl + q.data().replication == want_repl } else { false } @@ -159,13 +159,13 @@ async fn leader_metrics() -> Result<()> { tracing::info!("--- replication metrics should reflect the replication state"); { let ww = ReplicationMetrics::new(LogId::new(LeaderId::new(1, 0), log_index)); - let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone()}; + let want_repl = btreemap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone()}; router .wait_for_metrics( &0, |x| { if let Some(ref q) = x.leader_metrics { - q.replication == want_repl + q.data().replication == want_repl } else { false }