Skip to content

Commit

Permalink
Merge pull request #237 from drmingdrmer/simpl-node-id
Browse files Browse the repository at this point in the history
Refactor: replace generic type RaftTypeConfig with NodeId if possible
  • Loading branch information
drmingdrmer authored Mar 5, 2022
2 parents 88a5a60 + c4e5c06 commit 0951160
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 25 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self, leader_metrics: Update<Option<&Versioned<LeaderMetrics<C>>>>) {
fn report_metrics(&mut self, leader_metrics: Update<Option<&Versioned<LeaderMetrics<C::NodeId>>>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v.cloned(),
Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(),
Expand Down Expand Up @@ -752,7 +752,7 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
pub(super) nodes: BTreeMap<C::NodeId, ReplicationState<C>>,

/// The metrics about a leader
pub leader_metrics: Versioned<LeaderMetrics<C>>,
pub leader_metrics: Versioned<LeaderMetrics<C::NodeId>>,

/// The stream of events coming from replication streams.
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<C, S::SnapshotData>, Span)>,
Expand Down
20 changes: 10 additions & 10 deletions openraft/src/leader_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ use crate::versioned::UpdateError;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::ReplicationMetrics;

/// The metrics about the leader. It is Some() only when this node is leader.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeaderMetrics<C: RaftTypeConfig> {
#[serde(bound = "")]
pub struct LeaderMetrics<NID: NodeId> {
/// Replication metrics of all known replication target: voters and learners
pub replication: BTreeMap<C::NodeId, ReplicationMetrics<C>>,
pub replication: BTreeMap<NID, ReplicationMetrics<NID>>,
}

impl<C: RaftTypeConfig> MessageSummary for LeaderMetrics<C> {
impl<NID: NodeId> MessageSummary for LeaderMetrics<NID> {
fn summary(&self) -> String {
let mut res = vec!["LeaderMetrics{".to_string()];
for (i, (k, v)) in self.replication.iter().enumerate() {
Expand All @@ -42,9 +42,9 @@ pub struct UpdateMatchedLogId<NID: NodeId> {
pub matched: LogId<NID>,
}

impl<C: RaftTypeConfig> Update<LeaderMetrics<C>> for UpdateMatchedLogId<C::NodeId> {
impl<NID: NodeId> Update<LeaderMetrics<NID>> for UpdateMatchedLogId<NID> {
/// If there is already a record for the target node. Just modify the atomic u64.
fn apply_in_place(&self, to: &Arc<LeaderMetrics<C>>) -> Result<(), UpdateError> {
fn apply_in_place(&self, to: &Arc<LeaderMetrics<NID>>) -> Result<(), UpdateError> {
let target_metrics = to.replication.get(&self.target).ok_or(UpdateError::CanNotUpdateInPlace)?;

if target_metrics.matched_leader_id == self.matched.leader_id {
Expand All @@ -56,7 +56,7 @@ impl<C: RaftTypeConfig> Update<LeaderMetrics<C>> for UpdateMatchedLogId<C::NodeI
}

/// To insert a new record always work.
fn apply_mut(&self, to: &mut LeaderMetrics<C>) {
fn apply_mut(&self, to: &mut LeaderMetrics<NID>) {
to.replication.insert(self.target, ReplicationMetrics {
matched_leader_id: self.matched.leader_id,
matched_index: AtomicU64::new(self.matched.index),
Expand All @@ -69,13 +69,13 @@ pub struct RemoveTarget<NID: NodeId> {
pub target: NID,
}

impl<C: RaftTypeConfig> Update<LeaderMetrics<C>> for RemoveTarget<C::NodeId> {
impl<NID: NodeId> Update<LeaderMetrics<NID>> for RemoveTarget<NID> {
/// Removing can not be done in place
fn apply_in_place(&self, _to: &Arc<LeaderMetrics<C>>) -> Result<(), UpdateError> {
fn apply_in_place(&self, _to: &Arc<LeaderMetrics<NID>>) -> Result<(), UpdateError> {
Err(UpdateError::CanNotUpdateInPlace)
}

fn apply_mut(&self, to: &mut LeaderMetrics<C>) {
fn apply_mut(&self, to: &mut LeaderMetrics<NID>) {
to.replication.remove(&self.target);
}
}
5 changes: 2 additions & 3 deletions openraft/src/leader_metrics_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::leader_metrics::LeaderMetrics;
use crate::leader_metrics::UpdateMatchedLogId;
use crate::testing::DummyConfig;
use crate::versioned::Updatable;
use crate::versioned::Versioned;
use crate::LeaderId;
Expand All @@ -9,7 +8,7 @@ use crate::MessageSummary;

#[test]
fn test_versioned() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<DummyConfig> {
let mut a = Versioned::new(LeaderMetrics::<u64> {
replication: Default::default(),
});

Expand Down Expand Up @@ -86,7 +85,7 @@ fn test_versioned() -> anyhow::Result<()> {

#[test]
fn test_versioned_methods() -> anyhow::Result<()> {
let mut a = Versioned::new(LeaderMetrics::<DummyConfig> {
let mut a = Versioned::new(LeaderMetrics::<u64> {
replication: Default::default(),
});

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub snapshot: Option<LogId<C::NodeId>>,

/// The metrics about the leader. It is Some() only when this node is leader.
pub leader_metrics: Option<Versioned<LeaderMetrics<C>>>,
pub leader_metrics: Option<Versioned<LeaderMetrics<C::NodeId>>>,
}

impl<C: RaftTypeConfig> MessageSummary for RaftMetrics<C> {
Expand Down
20 changes: 11 additions & 9 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::LeaderId;
use crate::LogId;
use crate::MessageSummary;
use crate::Node;
use crate::NodeId;
use crate::RPCTypes;
use crate::RaftNetwork;
use crate::RaftNetworkFactory;
Expand All @@ -52,12 +53,13 @@ use crate::ToStorageResult;
use crate::Vote;

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct ReplicationMetrics<C: RaftTypeConfig> {
pub(crate) matched_leader_id: LeaderId<C::NodeId>,
#[serde(bound = "")]
pub struct ReplicationMetrics<NID: NodeId> {
pub(crate) matched_leader_id: LeaderId<NID>,
pub(crate) matched_index: AtomicU64,
}

impl<C: RaftTypeConfig> Clone for ReplicationMetrics<C> {
impl<NID: NodeId> Clone for ReplicationMetrics<NID> {
fn clone(&self) -> Self {
Self {
matched_leader_id: self.matched_leader_id,
Expand All @@ -66,24 +68,24 @@ impl<C: RaftTypeConfig> Clone for ReplicationMetrics<C> {
}
}

impl<C: RaftTypeConfig> PartialEq for ReplicationMetrics<C> {
impl<NID: NodeId> PartialEq for ReplicationMetrics<NID> {
fn eq(&self, other: &Self) -> bool {
self.matched_leader_id == other.matched_leader_id
&& self.matched_index.load(Ordering::Relaxed) == other.matched_index.load(Ordering::Relaxed)
}
}

impl<C: RaftTypeConfig> Eq for ReplicationMetrics<C> {}
impl<NID: NodeId> Eq for ReplicationMetrics<NID> {}

impl<C: RaftTypeConfig> ReplicationMetrics<C> {
pub fn new(log_id: LogId<C::NodeId>) -> Self {
impl<NID: NodeId> ReplicationMetrics<NID> {
pub fn new(log_id: LogId<NID>) -> Self {
Self {
matched_leader_id: log_id.leader_id,
matched_index: AtomicU64::new(log_id.index),
}
}

pub fn matched(&self) -> LogId<C::NodeId> {
pub fn matched(&self) -> LogId<NID> {
let index = self.matched_index.load(Ordering::Relaxed);
LogId {
leader_id: self.matched_leader_id,
Expand All @@ -92,7 +94,7 @@ impl<C: RaftTypeConfig> ReplicationMetrics<C> {
}
}

impl<C: RaftTypeConfig> MessageSummary for ReplicationMetrics<C> {
impl<NID: NodeId> MessageSummary for ReplicationMetrics<NID> {
fn summary(&self) -> String {
format!("{}", self.matched())
}
Expand Down

0 comments on commit 0951160

Please sign in to comment.