diff --git a/narwhal/primary/src/consensus/leader_schedule.rs b/narwhal/primary/src/consensus/leader_schedule.rs
new file mode 100644
index 0000000000000..035032437a25e
--- /dev/null
+++ b/narwhal/primary/src/consensus/leader_schedule.rs
@@ -0,0 +1,290 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{
+    collections::HashMap,
+    fmt::{Debug, Formatter},
+    sync::Arc,
+};
+
+use config::{Authority, AuthorityIdentifier, Committee, Stake};
+use parking_lot::RwLock;
+use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
+use storage::ConsensusStore;
+use sui_protocol_config::ProtocolConfig;
+use tracing::{debug, trace};
+use types::{Certificate, ReputationScores, Round};
+
+use super::Dag;
+
+#[cfg(test)]
+#[path = "tests/leader_schedule_tests.rs"]
+mod leader_schedule_tests;
+
+#[derive(Default, Clone)]
+pub struct LeaderSwapTable {
+    /// The round on which the leader swap table gets into effect.
+    round: Round,
+    /// The list of `f` (by stake) authorities with the best scores, as defined by the provided `ReputationScores`.
+    /// Those authorities will be used in place of the `bad_nodes` on the final leader schedule.
+    good_nodes: Vec<Authority>,
+    /// The set of `f` (by stake) authorities with the worst scores, as defined by the provided `ReputationScores`.
+    /// Every time such an authority is elected as leader on the schedule, it will be swapped with one
+    /// of the authorities of the `good_nodes`.
+    bad_nodes: HashMap<AuthorityIdentifier, Authority>,
+}
+
+impl Debug for LeaderSwapTable {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(&format!(
+            "LeaderSwapTable round:{}, good_nodes:{:?} with stake:{}, bad_nodes:{:?} with stake:{}",
+            self.round,
+            self.good_nodes
+                .iter()
+                .map(|a| a.id())
+                .collect::<Vec<AuthorityIdentifier>>(),
+            self.good_nodes.iter().map(|a| a.stake()).sum::<Stake>(),
+            self.bad_nodes
+                .iter()
+                .map(|a| *a.0)
+                .collect::<Vec<AuthorityIdentifier>>(),
+            self.bad_nodes.iter().map(|a| a.1.stake()).sum::<Stake>(),
+        ))
+    }
+}
+
+impl LeaderSwapTable {
+    // Constructs a new table based on the provided reputation scores. The `bad_nodes_stake_threshold` designates the
+    // total (by stake) nodes that will be considered "bad" based on their scores and will be replaced by good nodes.
+    // The `bad_nodes_stake_threshold` should be in the range [0 - 33].
+    pub fn new(
+        committee: &Committee,
+        round: Round,
+        reputation_scores: &ReputationScores,
+        bad_nodes_stake_threshold: u64,
+    ) -> Self {
+        assert!((0..=33).contains(&bad_nodes_stake_threshold), "The bad_nodes_stake_threshold should be in range [0 - 33], out of bounds parameter detected");
+        assert!(reputation_scores.final_of_schedule, "Only reputation scores that have been calculated on the end of a schedule are accepted");
+
+        // calculating the good nodes
+        let good_nodes = Self::retrieve_first_nodes(
+            committee,
+            reputation_scores.authorities_by_score_desc().into_iter(),
+            bad_nodes_stake_threshold,
+        );
+
+        // calculating the bad nodes
+        // we reverse the sorted authorities to score ascending so we get the lowest scorers first,
+        // up to the dictated stake threshold.
+        let bad_nodes = Self::retrieve_first_nodes(
+            committee,
+            reputation_scores
+                .authorities_by_score_desc()
+                .into_iter()
+                .rev(),
+            bad_nodes_stake_threshold,
+        )
+        .into_iter()
+        .map(|authority| (authority.id(), authority))
+        .collect::<HashMap<AuthorityIdentifier, Authority>>();
+
+        good_nodes.iter().for_each(|good_node| {
+            debug!(
+                "Good node on round {}: {} -> {}",
+                round,
+                good_node.hostname(),
+                reputation_scores
+                    .scores_per_authority
+                    .get(&good_node.id())
+                    .unwrap()
+            );
+        });
+
+        bad_nodes.iter().for_each(|(_id, bad_node)| {
+            debug!(
+                "Bad node on round {}: {} -> {}",
+                round,
+                bad_node.hostname(),
+                reputation_scores
+                    .scores_per_authority
+                    .get(&bad_node.id())
+                    .unwrap()
+            );
+        });
+
+        debug!("Reputation scores on round {round}: {reputation_scores:?}");
+
+        Self {
+            round,
+            good_nodes,
+            bad_nodes,
+        }
+    }
+
+    /// Checks whether the provided leader is a bad performer and needs to be swapped in the schedule
+    /// with a good performer. If not, the method returns None; otherwise the leader to swap with is
+    /// returned. The `leader_round` represents the DAG round for which the provided AuthorityIdentifier
+    /// is the leader, and is used as a seed for the random function that picks the good node to swap in
+    /// for that round. We intentionally do not use weighted randomness, as we want to give all the good
+    /// nodes an equal opportunity to be swapped in for bad nodes, rather than having one node with
+    /// enough stake end up replacing bad nodes more frequently than the others in the final schedule.
+    pub fn swap(&self, leader: &AuthorityIdentifier, leader_round: Round) -> Option<Authority> {
+        if self.bad_nodes.contains_key(leader) {
+            let mut seed_bytes = [0u8; 32];
+            seed_bytes[32 - 8..].copy_from_slice(&leader_round.to_le_bytes());
+            let mut rng = StdRng::from_seed(seed_bytes);
+
+            let good_node = self
+                .good_nodes
+                .choose(&mut rng)
+                .expect("There should be at least one good node available");
+
+            trace!(
+                "Swapping bad leader {} -> {} for round {}",
+                leader,
+                good_node.id(),
+                leader_round
+            );
+
+            return Some(good_node.to_owned());
+        }
+        None
+    }
+
+    // Retrieves the first nodes provided by the iterator `authorities` until the `stake_threshold` has been
+    // reached. The `stake_threshold` should be in the range [0, 100] and expresses the percentage of stake
+    // that is considered the cutoff. Basically, we keep adding authorities to the result until the sum of
+    // their stake reaches the `stake_threshold`. It is the caller's responsibility to ensure that the
+    // elements of the `authorities` input are already sorted.
+    fn retrieve_first_nodes(
+        committee: &Committee,
+        authorities: impl Iterator<Item = (AuthorityIdentifier, u64)>,
+        stake_threshold: u64,
+    ) -> Vec<Authority> {
+        let mut filtered_authorities = Vec::new();
+
+        let mut stake = 0;
+        for (authority_id, _score) in authorities {
+            stake += committee.stake_by_id(authority_id);
+
+            // if the total accumulated stake has surpassed the stake threshold then we omit this
+            // last authority and exit the loop.
+            if stake > (stake_threshold * committee.total_stake()) / 100 as Stake {
+                break;
+            }
+            filtered_authorities.push(committee.authority_safe(&authority_id).to_owned());
+        }
+
+        filtered_authorities
+    }
+}
+
+/// The LeaderSchedule is responsible for producing the leader schedule across an epoch. It provides
+/// methods to derive the leader of a round based on the provided leader swap table. This struct can
+/// be cloned and shared freely as the internal parts are atomically updated.
+#[derive(Clone)]
+pub struct LeaderSchedule {
+    pub committee: Committee,
+    pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
+}
+
+impl LeaderSchedule {
+    pub fn new(committee: Committee, table: LeaderSwapTable) -> Self {
+        Self {
+            committee,
+            leader_swap_table: Arc::new(RwLock::new(table)),
+        }
+    }
+
+    /// Restores the LeaderSchedule from storage. It will attempt to retrieve the last committed
+    /// "final" ReputationScores and use them to build a LeaderSwapTable for the LeaderSchedule.
+    pub fn from_store(
+        committee: Committee,
+        store: Arc<ConsensusStore>,
+        protocol_config: ProtocolConfig,
+    ) -> Self {
+        let table = store
+            .read_latest_commit_with_final_reputation_scores()
+            .map_or(LeaderSwapTable::default(), |commit| {
+                LeaderSwapTable::new(
+                    &committee,
+                    commit.leader_round(),
+                    &commit.reputation_score(),
+                    protocol_config.consensus_bad_nodes_stake_threshold(),
+                )
+            });
+        // create the schedule
+        Self::new(committee, table)
+    }
+
+    /// Atomically updates the leader swap table with the newly provided one. Any leader queried from
+    /// now on will be calculated according to this swap table until a new one is provided again.
+    pub fn update_leader_swap_table(&self, table: LeaderSwapTable) {
+        trace!("Updating swap table {:?}", table);
+
+        let mut write = self.leader_swap_table.write();
+        *write = table;
+    }
+
+    /// Returns the leader for the provided round. Keep in mind that this method will return a leader
+    /// according to the current internal LeaderSwapTable. Installing a different table can potentially
+    /// produce a different leader for the same round.
+    pub fn leader(&self, round: Round) -> Authority {
+        assert_eq!(
+            round % 2,
+            0,
+            "We should never attempt to do a leader election for odd rounds"
+        );
+
+        // TODO: split the leader election logic for testing from the production code.
+        cfg_if::cfg_if! {
+            if #[cfg(test)] {
+                // We apply round robin in leader election. Since we expect the round to be an even number
+                // (2, 4, 6, 8...), it can't be used directly for leader election as we would skip leaders.
+                // Thus we divide by 2 to get a monotonically increasing sequence (2/2 = 1, 4/2 = 2,
+                // 6/2 = 3, 8/2 = 4, etc.) and then subtract 1 so we always start from zero.
+                let next_leader = (round/2 + self.committee.size() as u64 - 1) as usize % self.committee.size();
+                let authorities = self.committee.authorities().collect::<Vec<_>>();
+
+                let leader: Authority = (*authorities.get(next_leader).unwrap()).clone();
+                let table = self.leader_swap_table.read();
+
+                table.swap(&leader.id(), round).unwrap_or(leader)
+            } else {
+                // Elect the leader in a stake-weighted choice seeded by the round
+                let leader = self.committee.leader(round);
+
+                let table = self.leader_swap_table.read();
+                table.swap(&leader.id(), round).unwrap_or(leader)
+            }
+        }
+    }
+
+    /// Returns the certificate originated by the leader of the specified round (if any). The Authority
+    /// leader of the round is always returned, irrespective of whether the certificate exists, as the
+    /// leader is deterministically determined. The internal `leader_swap_table` is used to apply any
+    /// overrides to the original schedule.
+    pub fn leader_certificate<'a>(
+        &self,
+        round: Round,
+        dag: &'a Dag,
+    ) -> (Authority, Option<&'a Certificate>) {
+        // Note: this function is often called with even rounds only. While we do not aim at random selection
+        // yet (see issue https://github.com/MystenLabs/sui/issues/5182), repeated calls to this function
+        // should still pick from the whole roster of leaders.
+        let leader = self.leader(round);
+
+        // Return its certificate and the certificate's digest.
+        match dag.get(&round).and_then(|x| x.get(&leader.id())) {
+            None => (leader, None),
+            Some((_, certificate)) => (leader, Some(certificate)),
+        }
+    }
+
+    pub fn num_of_bad_nodes(&self) -> usize {
+        let read = self.leader_swap_table.read();
+        read.bad_nodes.len()
+    }
+}
diff --git a/narwhal/primary/src/consensus/mod.rs b/narwhal/primary/src/consensus/mod.rs
index 5121310e51815..4cbc12bfad4a4 100644
--- a/narwhal/primary/src/consensus/mod.rs
+++ b/narwhal/primary/src/consensus/mod.rs
@@ -5,6 +5,7 @@ mod bullshark;
 #[cfg(test)]
 #[path = "tests/consensus_utils.rs"]
 mod consensus_utils;
+mod leader_schedule;
 mod metrics;
 mod state;
 mod utils;
@@ -14,10 +15,9 @@ pub use crate::consensus::bullshark::Bullshark;
 use crate::consensus::consensus_utils::{
     make_certificate_store, make_consensus_store, NUM_SUB_DAGS_PER_SCHEDULE,
 };
+pub use crate::consensus::leader_schedule::{LeaderSchedule, LeaderSwapTable};
 pub use crate::consensus::metrics::{ChannelMetrics, ConsensusMetrics};
-pub use crate::consensus::state::{
-    Consensus, ConsensusRound, ConsensusState, Dag, LeaderSchedule, LeaderSwapTable,
-};
+pub use crate::consensus::state::{Consensus, ConsensusRound, ConsensusState, Dag};
 pub use crate::consensus::utils::gc_round;
 
 use store::StoreError;
diff --git a/narwhal/primary/src/consensus/state.rs b/narwhal/primary/src/consensus/state.rs
index afcd7db6d3f22..91f7facc5a8c5 100644
--- a/narwhal/primary/src/consensus/state.rs
+++ b/narwhal/primary/src/consensus/state.rs
@@ -7,311 +7,31 @@ use crate::consensus::bullshark::Bullshark;
 use crate::consensus::utils::gc_round;
 use crate::consensus::{metrics::ConsensusMetrics, ConsensusError};
-use config::{Authority, AuthorityIdentifier, Committee, Stake};
+use config::{AuthorityIdentifier, Committee};
 use fastcrypto::hash::Hash;
 use mysten_metrics::metered_channel;
 use mysten_metrics::spawn_logged_monitored_task;
-use parking_lot::RwLock;
-use rand::prelude::SliceRandom;
-use rand::rngs::StdRng;
-use rand::SeedableRng;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
 use std::{
     cmp::{max, Ordering},
     collections::{BTreeMap, BTreeSet, HashMap},
     sync::Arc,
 };
 use storage::{CertificateStore, ConsensusStore};
-use sui_protocol_config::ProtocolConfig;
 use tokio::{sync::watch, task::JoinHandle};
-use tracing::{debug, info, instrument, trace};
+use tracing::{debug, info, instrument};
 use types::{
     Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver,
-    ConsensusCommit, HeaderAPI, ReputationScores, Round, SequenceNumber, Timestamp,
+    ConsensusCommit, HeaderAPI, Round, SequenceNumber, Timestamp,
 };
 
 #[cfg(test)]
 #[path = "tests/consensus_tests.rs"]
-pub mod consensus_tests;
+mod consensus_tests;
 
 /// The representation of the DAG in memory.
 pub type Dag = BTreeMap<Round, HashMap<AuthorityIdentifier, (CertificateDigest, Certificate)>>;
 
-#[derive(Default, Clone)]
-pub struct LeaderSwapTable {
-    /// The round on which the leader swap table get into effect.
-    round: Round,
-    /// The list of `f` (by stake) authorities with best scores as those defined by the provided `ReputationScores`.
-    /// Those authorities will be used in the position of the `bad_nodes` on the final leader schedule.
- good_nodes: Vec, - /// The set of `f` (by stake) authorities with the worst scores as those defined by the provided `ReputationScores`. - /// Every time where such authority is elected as leader on the schedule, it will swapped by one - /// of the authorities of the `good_nodes`. - bad_nodes: HashMap, -} - -impl Debug for LeaderSwapTable { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(&format!( - "LeaderSwapTable round:{}, good_nodes:{:?} with stake:{}, bad_nodes:{:?} with stake:{}", - self.round, - self.good_nodes - .iter() - .map(|a| a.id()) - .collect::>(), - self.good_nodes.iter().map(|a| a.stake()).sum::(), - self.bad_nodes - .iter() - .map(|a| *a.0) - .collect::>(), - self.bad_nodes.iter().map(|a| a.1.stake()).sum::(), - )) - } -} - -impl LeaderSwapTable { - // constructs a new table based on the provided reputation scores. The `bad_nodes_stake_threshold` designates the - // total (by stake) nodes that will be considered as "bad" based on their scores and will be replaced by good nodes. - // The `bad_nodes_stake_threshold` should be in the range of [0 - 33]. - pub fn new( - committee: &Committee, - round: Round, - reputation_scores: &ReputationScores, - bad_nodes_stake_threshold: u64, - ) -> Self { - assert!((0..=33).contains(&bad_nodes_stake_threshold), "The bad_nodes_stake_threshold should be in range [0 - 33], out of bounds parameter detected"); - assert!(reputation_scores.final_of_schedule, "Only reputation scores that have been calculated on the end of a schedule are accepted"); - - // calculating the good nodes - let good_nodes = Self::retrieve_first_nodes( - committee, - reputation_scores.authorities_by_score_desc().into_iter(), - bad_nodes_stake_threshold, - ); - - // calculating the bad nodes - // we revert the sorted authorities to score ascending so we get the first low scorers - // up to the dictated stake threshold. - let bad_nodes = Self::retrieve_first_nodes( - committee, - reputation_scores - .authorities_by_score_desc() - .into_iter() - .rev(), - bad_nodes_stake_threshold, - ) - .into_iter() - .map(|authority| (authority.id(), authority)) - .collect::>(); - - good_nodes.iter().for_each(|good_node| { - debug!( - "Good node on round {}: {} -> {}", - round, - good_node.hostname(), - reputation_scores - .scores_per_authority - .get(&good_node.id()) - .unwrap() - ); - }); - - bad_nodes.iter().for_each(|(_id, bad_node)| { - debug!( - "Bad node on round {}: {} -> {}", - round, - bad_node.hostname(), - reputation_scores - .scores_per_authority - .get(&bad_node.id()) - .unwrap() - ); - }); - - debug!("Reputation scores on round {round}: {reputation_scores:?}"); - - Self { - round, - good_nodes, - bad_nodes, - } - } - - /// Checks whether the provided leader is a bad performer and needs to be swapped in the schedule - /// with a good performer. If not, then the method returns None. Otherwise the leader to swap with - /// is returned instead. The `leader_round` represents the DAG round on which the provided AuthorityIdentifier - /// is a leader on and is used as a seed to random function in order to calculate the good node that - /// will swap in that round with the bad node. We are intentionally not doing weighted randomness as - /// we want to give to all the good nodes equal opportunity to get swapped with bad nodes and not - /// have one node with enough stake end up swapping bad nodes more frequently than the others on - /// the final schedule. 
- pub fn swap(&self, leader: &AuthorityIdentifier, leader_round: Round) -> Option { - if self.bad_nodes.contains_key(leader) { - let mut seed_bytes = [0u8; 32]; - seed_bytes[32 - 8..].copy_from_slice(&leader_round.to_le_bytes()); - let mut rng = StdRng::from_seed(seed_bytes); - - let good_node = self - .good_nodes - .choose(&mut rng) - .expect("There should be at least one good node available"); - - trace!( - "Swapping bad leader {} -> {} for round {}", - leader, - good_node.id(), - leader_round - ); - - return Some(good_node.to_owned()); - } - None - } - - // Retrieves the first nodes provided by the iterator `authorities` until the `stake_threshold` has been - // reached. The `stake_threshold` should be between [0, 100] and expresses the percentage of stake that is - // considered the cutoff. Basically we keep adding to the response authorities until the sum of the stake - // reaches the `stake_threshold`. It's the caller's responsibility to ensure that the elements of the `authorities` - // input is already sorted. - fn retrieve_first_nodes( - committee: &Committee, - authorities: impl Iterator, - stake_threshold: u64, - ) -> Vec { - let mut filtered_authorities = Vec::new(); - - let mut stake = 0; - for (authority_id, _score) in authorities { - stake += committee.stake_by_id(authority_id); - - // if the total accumulated stake has surpassed the stake threshold then we omit this - // last authority and we exit the loop. - if stake > (stake_threshold * committee.total_stake()) / 100 as Stake { - break; - } - filtered_authorities.push(committee.authority_safe(&authority_id).to_owned()); - } - - filtered_authorities - } -} - -/// The LeaderSchedule is responsible for producing the leader schedule across an epoch. It provides -/// methods to derive the leader of a round based on the provided leader swap table. This struct can -/// be cloned and shared freely as the internal parts are atomically updated. -#[derive(Clone)] -pub struct LeaderSchedule { - pub committee: Committee, - pub leader_swap_table: Arc>, -} - -impl LeaderSchedule { - pub fn new(committee: Committee, table: LeaderSwapTable) -> Self { - Self { - committee, - leader_swap_table: Arc::new(RwLock::new(table)), - } - } - - /// Restores the LeaderSchedule by using the storage. It will attempt to retrieve the last committed - /// "final" ReputationScores and use them to create build a LeaderSwapTable to use for the LeaderSchedule. - pub fn from_store( - committee: Committee, - store: Arc, - protocol_config: ProtocolConfig, - ) -> Self { - // Only try to restore when the new leader election schedule is enabled, otherwise fallback to - // default swap table, which basically means there will be no swaps. - let table = if protocol_config.narwhal_new_leader_election_schedule() { - store - .read_latest_commit_with_final_reputation_scores() - .map_or(LeaderSwapTable::default(), |commit| { - LeaderSwapTable::new( - &committee, - commit.leader_round(), - &commit.reputation_score(), - protocol_config.consensus_bad_nodes_stake_threshold(), - ) - }) - } else { - LeaderSwapTable::default() - }; - - // create the schedule - Self::new(committee, table) - } - - /// Atomically updates the leader swap table with the new provided one. Any leader queried from - /// now on will get calculated according to this swap table until a new one is provided again. 
- pub fn update_leader_swap_table(&self, table: LeaderSwapTable) { - trace!("Updating swap table {:?}", table); - - let mut write = self.leader_swap_table.write(); - *write = table; - } - - /// Returns the leader for the provided round. Keep in mind that this method will return a leader - /// according to the provided LeaderSwapTable. Providing a different table can potentially produce - /// a different leader for the same round. - pub fn leader(&self, round: Round) -> Authority { - assert_eq!( - round % 2, - 0, - "We should never attempt to do a leader election for odd rounds" - ); - - // TODO: split the leader election logic for testing from the production code. - cfg_if::cfg_if! { - if #[cfg(test)] { - // We apply round robin in leader election. Since we expect round to be an even number, - // 2, 4, 6, 8... it can't work well for leader election as we'll omit leaders. Thus - // we can always divide by 2 to get a monotonically incremented sequence, - // 2/2 = 1, 4/2 = 2, 6/2 = 3, 8/2 = 4 etc, and then do minus 1 so we can always - // start with base zero 0. - let next_leader = (round/2 + self.committee.size() as u64 - 1) as usize % self.committee.size(); - let authorities = self.committee.authorities().collect::>(); - - let leader: Authority = (*authorities.get(next_leader).unwrap()).clone(); - let table = self.leader_swap_table.read(); - - table.swap(&leader.id(), round).unwrap_or(leader) - } else { - // Elect the leader in a stake-weighted choice seeded by the round - let leader = self.committee.leader(round); - - let table = self.leader_swap_table.read(); - table.swap(&leader.id(), round).unwrap_or(leader) - } - } - } - - /// Returns the certificate originated by the leader of the specified round (if any). The Authority - /// leader of the round is always returned and that's irrespective of whether the certificate exists - /// as that's deterministically determined. The provided `leader_swap_table` is being used to determine - /// any overrides that need to be performed to the original schedule. - pub fn leader_certificate<'a>( - &self, - round: Round, - dag: &'a Dag, - ) -> (Authority, Option<&'a Certificate>) { - // Note: this function is often called with even rounds only. While we do not aim at random selection - // yet (see issue https://github.com/MystenLabs/sui/issues/5182), repeated calls to this function - // should still pick from the whole roster of leaders. - let leader = self.leader(round); - - // Return its certificate and the certificate's digest. - match dag.get(&round).and_then(|x| x.get(&leader.id())) { - None => (leader, None), - Some((_, certificate)) => (leader, Some(certificate)), - } - } - - pub fn num_of_bad_nodes(&self) -> usize { - let read = self.leader_swap_table.read(); - read.bad_nodes.len() - } -} - /// The state that needs to be persisted for crash-recovery. pub struct ConsensusState { /// The information about the last committed round and corresponding GC round. diff --git a/narwhal/primary/src/consensus/tests/consensus_tests.rs b/narwhal/primary/src/consensus/tests/consensus_tests.rs index 11e19abe9e7a6..3cb3ceff48baa 100644 --- a/narwhal/primary/src/consensus/tests/consensus_tests.rs +++ b/narwhal/primary/src/consensus/tests/consensus_tests.rs @@ -1,27 +1,22 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -#![allow(clippy::mutable_key_type)] +use std::{collections::BTreeSet, sync::Arc}; -use config::AuthorityIdentifier; use fastcrypto::hash::Hash; use prometheus::Registry; -use std::collections::{BTreeSet, HashMap}; -use std::num::NonZeroUsize; -use std::sync::Arc; -use storage::{ConsensusStore, NodeStorage}; +use storage::NodeStorage; use sui_protocol_config::ProtocolConfig; use telemetry_subscribers::TelemetryGuards; -use test_utils::{get_protocol_config, latest_protocol_version, mock_certificate}; +use test_utils::{get_protocol_config, latest_protocol_version}; use test_utils::{temp_dir, CommitteeFixture}; use tokio::sync::watch; use types::{ - Certificate, CertificateAPI, CommittedSubDag, HeaderAPI, PreSubscribedBroadcastSender, - ReputationScores, Round, + Certificate, CertificateAPI, HeaderAPI, PreSubscribedBroadcastSender, ReputationScores, }; use crate::consensus::{ - Bullshark, Consensus, ConsensusMetrics, ConsensusRound, Dag, LeaderSchedule, LeaderSwapTable, + Bullshark, Consensus, ConsensusMetrics, ConsensusRound, LeaderSchedule, LeaderSwapTable, }; use crate::NUM_SHUTDOWN_RECEIVERS; @@ -330,215 +325,6 @@ async fn test_consensus_recovery_with_bullshark_with_config(config: ProtocolConf ); } -#[tokio::test] -async fn test_leader_swap_table() { - // GIVEN - let fixture = CommitteeFixture::builder().build(); - let committee = fixture.committee(); - let mut protocol_config: ProtocolConfig = latest_protocol_version(); - protocol_config.set_consensus_bad_nodes_stake_threshold(33); - - // the authority ids - let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); - - // Adding some scores - let mut scores = ReputationScores::new(&committee); - scores.final_of_schedule = true; - for (score, id) in authority_ids.iter().enumerate() { - scores.add_score(*id, score as u64); - } - - let table = LeaderSwapTable::new( - &committee, - 2, - &scores, - protocol_config.consensus_bad_nodes_stake_threshold(), - ); - - // Only one bad authority should be calculated since all have equal stake - assert_eq!(table.bad_nodes.len(), 1); - - // now first three should be swapped, whereas the others should not return anything - for (index, id) in authority_ids.iter().enumerate() { - if index < 1 { - let s = table.swap(id, index as Round).unwrap(); - - // make sure that the returned node is amongst the good nodes - assert!(table.good_nodes.iter().any(|n| *n == s)); - } else { - assert!(table.swap(id, index as Round).is_none()); - } - } - - // Now we create a larger committee with more score variation - still all the authorities have - // equal stake. 
- let fixture = CommitteeFixture::builder() - .committee_size(NonZeroUsize::new(10).unwrap()) - .build(); - let committee = fixture.committee(); - - // the authority ids - let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); - - // Adding some scores - let mut scores = ReputationScores::new(&committee); - scores.final_of_schedule = true; - for (score, id) in authority_ids.iter().enumerate() { - scores.add_score(*id, score as u64); - } - - // We expect the first 3 authorities (f) to be amongst the bad nodes - let table = LeaderSwapTable::new( - &committee, - 2, - &scores, - protocol_config.consensus_bad_nodes_stake_threshold(), - ); - - assert_eq!(table.bad_nodes.len(), 3); - assert!(table.bad_nodes.contains_key(&authority_ids[0])); - assert!(table.bad_nodes.contains_key(&authority_ids[1])); - assert!(table.bad_nodes.contains_key(&authority_ids[2])); - - // now first three should be swapped, whereas the others should not return anything - for (index, id) in authority_ids.iter().enumerate() { - if index < 3 { - let s = table.swap(id, index as Round).unwrap(); - - // make sure that the returned node is amongst the good nodes - assert!(table.good_nodes.iter().any(|n| *n == s)); - } else { - assert!(table.swap(id, index as Round).is_none()); - } - } -} - -#[tokio::test] -async fn test_leader_schedule() { - // GIVEN - let fixture = CommitteeFixture::builder().build(); - let committee = fixture.committee(); - let mut protocol_config: ProtocolConfig = latest_protocol_version(); - protocol_config.set_consensus_bad_nodes_stake_threshold(33); - - // the authority ids - let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); - - // Create a leader schedule with a default swap table, so no authority will be swapped. - let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()); - - // Call the leader for round 2. It should give us the validator of position 0 - let original_leader = authority_ids[0]; - let leader_2 = schedule.leader(2); - - assert_eq!(leader_2.id(), original_leader); - - // Now update the scores to consider the authority of position 0 as slow - let mut scores = ReputationScores::new(&committee); - scores.final_of_schedule = true; - for (score, id) in authority_ids.iter().enumerate() { - scores.add_score(*id, score as u64); - } - - // Update the schedule - let table = LeaderSwapTable::new( - &committee, - 2, - &scores, - protocol_config.consensus_bad_nodes_stake_threshold(), - ); - schedule.update_leader_swap_table(table.clone()); - - // Now call the leader for round 2 again. It should be swapped with another node - let leader_2 = schedule.leader(2); - - // The returned leader should not be the one of position 0 - assert_ne!(leader_2.id(), original_leader); - - // The returned leader should be the one returned by the swap table when using the updated leader scores. 
- let swapped_leader = table.swap(&original_leader, 2).unwrap().id(); - assert_eq!(leader_2.id(), table.swap(&original_leader, 2).unwrap().id()); - - // Now create an empty DAG - let mut dag = Dag::new(); - - // Now try to retrieve the leader's certificate - let (leader_authority, leader_certificate) = schedule.leader_certificate(2, &dag); - assert_eq!(leader_authority.id(), swapped_leader); - assert!(leader_certificate.is_none()); - - // Populate the leader's certificate and try again - let (digest, certificate) = mock_certificate( - &committee, - &latest_protocol_version(), - leader_authority.id(), - 2, - BTreeSet::new(), - ); - dag.entry(2) - .or_default() - .insert(leader_authority.id(), (digest, certificate.clone())); - - let (leader_authority, leader_certificate_result) = schedule.leader_certificate(2, &dag); - assert_eq!(leader_authority.id(), swapped_leader); - assert_eq!(certificate, leader_certificate_result.unwrap().clone()); -} - -#[tokio::test] -async fn test_leader_schedule_from_store() { - // GIVEN - let fixture = CommitteeFixture::builder().build(); - let committee = fixture.committee(); - let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); - let store = Arc::new(ConsensusStore::new_for_tests()); - - // Create a leader schedule with a default swap table, so no authority will be swapped and find the leader at - // position 2. We expect the leader of round 2 to be the authority of position 0 , since round robin is used - // in tests. - let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()); - let leader_2 = schedule.leader(2); - assert_eq!(leader_2.id(), authority_ids[0]); - - // AND we add some a commit with a final score where the validator 0 is expected to be the lowest score one. - let mut scores = ReputationScores::new(&committee); - scores.final_of_schedule = true; - for (score, id) in fixture.authorities().map(|a| a.id()).enumerate() { - scores.add_score(id, score as u64); - } - - let sub_dag = CommittedSubDag::new( - vec![], - Certificate::default(&latest_protocol_version()), - 0, - scores, - None, - ); - - store - .write_consensus_state(&HashMap::new(), &sub_dag) - .unwrap(); - - // WHEN flag is disabled for the new schedule algorithm - let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE(); - let schedule = LeaderSchedule::from_store(committee.clone(), store.clone(), protocol_config); - - // THEN the default should be returned. In this case we detect since good/bad nodes will be empty - assert!(schedule.leader_swap_table.read().good_nodes.is_empty()); - assert!(schedule.leader_swap_table.read().bad_nodes.is_empty()); - - // WHEN flag is enabled for the new schedule algorithm - let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE(); - protocol_config.set_narwhal_new_leader_election_schedule(true); - protocol_config.set_consensus_bad_nodes_stake_threshold(33); - let schedule = LeaderSchedule::from_store(committee, store, protocol_config); - - // THEN the stored schedule should be returned and eventually the low score leader should be - // swapped with a high score one. 
- let new_leader_2 = schedule.leader(2); - - assert_ne!(leader_2.id(), new_leader_2.id()); -} - fn setup_tracing() -> TelemetryGuards { // Setup tracing let tracing_level = "debug"; diff --git a/narwhal/primary/src/consensus/tests/leader_schedule_tests.rs b/narwhal/primary/src/consensus/tests/leader_schedule_tests.rs new file mode 100644 index 0000000000000..5e445f591b783 --- /dev/null +++ b/narwhal/primary/src/consensus/tests/leader_schedule_tests.rs @@ -0,0 +1,225 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::{BTreeSet, HashMap}, + num::NonZeroUsize, + sync::Arc, +}; + +use config::AuthorityIdentifier; +use storage::ConsensusStore; +use sui_protocol_config::ProtocolConfig; +use test_utils::{latest_protocol_version, mock_certificate, CommitteeFixture}; +use types::{Certificate, CommittedSubDag, ReputationScores, Round}; + +use crate::consensus::{Dag, LeaderSchedule, LeaderSwapTable}; + +#[tokio::test] +async fn test_leader_swap_table() { + // GIVEN + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let mut protocol_config = latest_protocol_version(); + protocol_config.set_consensus_bad_nodes_stake_threshold(33); + + // the authority ids + let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); + + // Adding some scores + let mut scores = ReputationScores::new(&committee); + scores.final_of_schedule = true; + for (score, id) in authority_ids.iter().enumerate() { + scores.add_score(*id, score as u64); + } + + let table = LeaderSwapTable::new( + &committee, + 2, + &scores, + protocol_config.consensus_bad_nodes_stake_threshold(), + ); + + // Only one bad authority should be calculated since all have equal stake + assert_eq!(table.bad_nodes.len(), 1); + + // now first three should be swapped, whereas the others should not return anything + for (index, id) in authority_ids.iter().enumerate() { + if index < 1 { + let s = table.swap(id, index as Round).unwrap(); + + // make sure that the returned node is amongst the good nodes + assert!(table.good_nodes.iter().any(|n| *n == s)); + } else { + assert!(table.swap(id, index as Round).is_none()); + } + } + + // Now we create a larger committee with more score variation - still all the authorities have + // equal stake. 
+ let fixture = CommitteeFixture::builder() + .committee_size(NonZeroUsize::new(10).unwrap()) + .build(); + let committee = fixture.committee(); + + // the authority ids + let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); + + // Adding some scores + let mut scores = ReputationScores::new(&committee); + scores.final_of_schedule = true; + for (score, id) in authority_ids.iter().enumerate() { + scores.add_score(*id, score as u64); + } + + // We expect the first 3 authorities (f) to be amongst the bad nodes + let table = LeaderSwapTable::new( + &committee, + 2, + &scores, + protocol_config.consensus_bad_nodes_stake_threshold(), + ); + + assert_eq!(table.bad_nodes.len(), 3); + assert!(table.bad_nodes.contains_key(&authority_ids[0])); + assert!(table.bad_nodes.contains_key(&authority_ids[1])); + assert!(table.bad_nodes.contains_key(&authority_ids[2])); + + // now first three should be swapped, whereas the others should not return anything + for (index, id) in authority_ids.iter().enumerate() { + if index < 3 { + let s = table.swap(id, index as Round).unwrap(); + + // make sure that the returned node is amongst the good nodes + assert!(table.good_nodes.iter().any(|n| *n == s)); + } else { + assert!(table.swap(id, index as Round).is_none()); + } + } +} + +#[tokio::test] +async fn test_leader_schedule() { + // GIVEN + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let mut protocol_config = latest_protocol_version(); + protocol_config.set_consensus_bad_nodes_stake_threshold(33); + + // the authority ids + let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); + + // Create a leader schedule with a default swap table, so no authority will be swapped. + let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()); + + // Call the leader for round 2. It should give us the validator of position 0 + let original_leader = authority_ids[0]; + let leader_2 = schedule.leader(2); + + assert_eq!(leader_2.id(), original_leader); + + // Now update the scores to consider the authority of position 0 as slow + let mut scores = ReputationScores::new(&committee); + scores.final_of_schedule = true; + for (score, id) in authority_ids.iter().enumerate() { + scores.add_score(*id, score as u64); + } + + // Update the schedule + let table = LeaderSwapTable::new( + &committee, + 2, + &scores, + protocol_config.consensus_bad_nodes_stake_threshold(), + ); + schedule.update_leader_swap_table(table.clone()); + + // Now call the leader for round 2 again. It should be swapped with another node + let leader_2 = schedule.leader(2); + + // The returned leader should not be the one of position 0 + assert_ne!(leader_2.id(), original_leader); + + // The returned leader should be the one returned by the swap table when using the updated leader scores. 
+ let swapped_leader = table.swap(&original_leader, 2).unwrap().id(); + assert_eq!(leader_2.id(), table.swap(&original_leader, 2).unwrap().id()); + + // Now create an empty DAG + let mut dag = Dag::new(); + + // Now try to retrieve the leader's certificate + let (leader_authority, leader_certificate) = schedule.leader_certificate(2, &dag); + assert_eq!(leader_authority.id(), swapped_leader); + assert!(leader_certificate.is_none()); + + // Populate the leader's certificate and try again + let (digest, certificate) = mock_certificate( + &committee, + &latest_protocol_version(), + leader_authority.id(), + 2, + BTreeSet::new(), + ); + dag.entry(2) + .or_default() + .insert(leader_authority.id(), (digest, certificate.clone())); + + let (leader_authority, leader_certificate_result) = schedule.leader_certificate(2, &dag); + assert_eq!(leader_authority.id(), swapped_leader); + assert_eq!(certificate, leader_certificate_result.unwrap().clone()); +} + +#[tokio::test] +async fn test_leader_schedule_from_store() { + // GIVEN + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let authority_ids: Vec = fixture.authorities().map(|a| a.id()).collect(); + let store = Arc::new(ConsensusStore::new_for_tests()); + + // Create a leader schedule with a default swap table, so no authority will be swapped and find the leader at + // position 2. We expect the leader of round 2 to be the authority of position 0 , since round robin is used + // in tests. + let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()); + let leader_2 = schedule.leader(2); + assert_eq!(leader_2.id(), authority_ids[0]); + + // AND we add some a commit with a final score where the validator 0 is expected to be the lowest score one. + let mut scores = ReputationScores::new(&committee); + scores.final_of_schedule = true; + for (score, id) in fixture.authorities().map(|a| a.id()).enumerate() { + scores.add_score(id, score as u64); + } + + let sub_dag = CommittedSubDag::new( + vec![], + Certificate::default(&latest_protocol_version()), + 0, + scores, + None, + ); + + store + .write_consensus_state(&HashMap::new(), &sub_dag) + .unwrap(); + + // WHEN flag is disabled for the new schedule algorithm + let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE(); + let schedule = LeaderSchedule::from_store(committee.clone(), store.clone(), protocol_config); + + // THEN the default should be returned. In this case we detect since good/bad nodes will be empty + assert!(schedule.leader_swap_table.read().good_nodes.is_empty()); + assert!(schedule.leader_swap_table.read().bad_nodes.is_empty()); + + // WHEN flag is enabled for the new schedule algorithm + let mut protocol_config = ProtocolConfig::get_for_max_version_UNSAFE(); + protocol_config.set_narwhal_new_leader_election_schedule(true); + protocol_config.set_consensus_bad_nodes_stake_threshold(33); + let schedule = LeaderSchedule::from_store(committee, store, protocol_config); + + // THEN the stored schedule should be returned and eventually the low score leader should be + // swapped with a high score one. + let new_leader_2 = schedule.leader(2); + + assert_ne!(leader_2.id(), new_leader_2.id()); +}
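
Not part of the patch: a minimal usage sketch of the exported `LeaderSchedule` / `LeaderSwapTable` API, assuming the `CommitteeFixture` and protocol-config helpers from `test_utils` that this PR's own tests use. It illustrates the intended flow: start from a default swap table (no swaps), then build a table from a schedule's final `ReputationScores` and install it atomically so subsequent leader queries reflect it.

```rust
use std::num::NonZeroUsize;

use test_utils::{latest_protocol_version, CommitteeFixture};
use types::ReputationScores;

use crate::consensus::{LeaderSchedule, LeaderSwapTable};

fn example_schedule_update() {
    // Build a committee fixture, as the tests in this PR do.
    let fixture = CommitteeFixture::builder()
        .committee_size(NonZeroUsize::new(10).unwrap())
        .build();
    let committee = fixture.committee();

    // Start with the default swap table: no authority is ever swapped.
    let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default());
    let _initial_leader = schedule.leader(2);

    // Once the final reputation scores of a schedule are known, build a swap table from them.
    let mut scores = ReputationScores::new(&committee);
    scores.final_of_schedule = true;
    for (score, authority) in fixture.authorities().enumerate() {
        scores.add_score(authority.id(), score as u64);
    }

    let mut protocol_config = latest_protocol_version();
    protocol_config.set_consensus_bad_nodes_stake_threshold(33);

    let table = LeaderSwapTable::new(
        &committee,
        2,
        &scores,
        protocol_config.consensus_bad_nodes_stake_threshold(),
    );

    // Atomically install the new table; leader queries from now on reflect it,
    // so a low-scoring leader for round 2 is replaced by one of the good nodes.
    schedule.update_leader_swap_table(table);
    let _leader_after_update = schedule.leader(2);
}
```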
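
Also illustrative, and standalone (only the `rand` crate): a sketch of the seed derivation used by `LeaderSwapTable::swap`, showing why the good-node choice is deterministic per leader round. Every validator that derives the same seed from the same round picks the same replacement, so the swapped schedule stays consistent across the network; the node names here are hypothetical.

```rust
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

fn pick_for_round(good_nodes: &[String], leader_round: u64) -> String {
    // Same seed derivation as in swap(): the round's little-endian bytes
    // fill the tail of a zeroed 32-byte seed.
    let mut seed_bytes = [0u8; 32];
    seed_bytes[32 - 8..].copy_from_slice(&leader_round.to_le_bytes());
    let mut rng = StdRng::from_seed(seed_bytes);

    // Unweighted choice among the good nodes, as the swap table intends.
    good_nodes.choose(&mut rng).expect("non-empty").clone()
}

fn main() {
    let good_nodes = vec!["alice".to_string(), "bob".to_string(), "carol".to_string()];

    // Two independent calls with the same round always agree; different rounds may differ.
    assert_eq!(pick_for_round(&good_nodes, 10), pick_for_round(&good_nodes, 10));
    println!("round 10 -> {}", pick_for_round(&good_nodes, 10));
    println!("round 12 -> {}", pick_for_round(&good_nodes, 12));
}
```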