From b0a8604f54964e68f127ce15161ada6d98d1c105 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Sun, 11 Feb 2024 17:01:08 -0800 Subject: [PATCH] [dag] smoke tests --- consensus/src/network.rs | 3 + .../consensus/consensus_fault_tolerance.rs | 6 +- .../src/consensus/dag/dag_fault_tolerance.rs | 249 ++++++++++++++++++ testsuite/smoke-test/src/consensus/dag/mod.rs | 3 + testsuite/smoke-test/src/consensus/mod.rs | 1 + 5 files changed, 259 insertions(+), 3 deletions(-) create mode 100644 testsuite/smoke-test/src/consensus/dag/dag_fault_tolerance.rs create mode 100644 testsuite/smoke-test/src/consensus/dag/mod.rs diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 7ef5d4ac1b9108..d2cec7449544c4 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -255,6 +255,9 @@ impl NetworkSender { msg: ConsensusMsg, timeout_duration: Duration, ) -> anyhow::Result { + fail_point!("consensus::send::any", |_| { + Err(anyhow::anyhow!("Injected error in send_rpc")) + }); counters::CONSENSUS_SENT_MSGS .with_label_values(&[msg.name()]) .inc(); diff --git a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs index 4f8825b56dc5e1..309dc20d0acbd1 100644 --- a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs +++ b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs @@ -60,8 +60,8 @@ pub async fn create_swarm(num_nodes: usize, max_block_txns: u64) -> LocalSwarm { swarm } -struct ActiveTrafficGuard { - finish_traffic: Arc, +pub struct ActiveTrafficGuard { + pub finish_traffic: Arc, } impl Drop for ActiveTrafficGuard { @@ -70,7 +70,7 @@ impl Drop for ActiveTrafficGuard { } } -async fn start_traffic(num_accounts: usize, tps: f32, swarm: &mut dyn Swarm) -> ActiveTrafficGuard { +pub async fn start_traffic(num_accounts: usize, tps: f32, swarm: &mut dyn Swarm) -> ActiveTrafficGuard { let validator_clients = swarm.get_all_nodes_clients_with_names(); let finish = Arc::new(AtomicBool::new(false)); diff --git a/testsuite/smoke-test/src/consensus/dag/dag_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/dag/dag_fault_tolerance.rs new file mode 100644 index 00000000000000..aa230873d875bc --- /dev/null +++ b/testsuite/smoke-test/src/consensus/dag/dag_fault_tolerance.rs @@ -0,0 +1,249 @@ +// Copyright © Aptos Foundation + +use crate::{consensus::consensus_fault_tolerance::{start_traffic, ActiveTrafficGuard}, smoke_test_environment::SwarmBuilder}; +use aptos_config::config::DagFetcherConfig; +use aptos_forge::{ + test_utils::consensus_utils::{no_failure_injection, test_consensus_fault_tolerance, FailPointFailureInjection, NodeState}, + LocalSwarm, +}; +use aptos_types::on_chain_config::{ + ConsensusAlgorithmConfig, DagConsensusConfigV1, OnChainConsensusConfig, ValidatorTxnConfig, +}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; +use std::sync::{atomic::AtomicBool, Arc}; + +pub async fn create_dag_swarm(num_nodes: usize, max_block_txns: u64) -> LocalSwarm { + let swarm = SwarmBuilder::new_local(num_nodes) + .with_init_config(Arc::new(move |_, config, _| { + config.api.failpoints_enabled = true; + config + .state_sync + .state_sync_driver + .enable_auto_bootstrapping = true; + config + .state_sync + .state_sync_driver + .max_connection_deadline_secs = 3; + config.dag_consensus.fetcher_config = DagFetcherConfig { + retry_interval_ms: 30, + rpc_timeout_ms: 500, + min_concurrent_responders: 2, + max_concurrent_responders: 7, + } + })) + .with_init_genesis_config(Arc::new(move |genesis_config| { + let onchain_consensus_config = OnChainConsensusConfig::V3 { + alg: ConsensusAlgorithmConfig::DAG(DagConsensusConfigV1::default()), + vtxn: ValidatorTxnConfig::default_for_genesis(), + }; + + genesis_config.consensus_config = onchain_consensus_config; + })) + .build() + .await; + + println!( + "Validators {:?}", + swarm.validators().map(|v| v.peer_id()).collect::>() + ); + swarm +} + +#[tokio::test] +async fn test_no_failures() { + let num_validators = 3; + + let mut swarm = create_dag_swarm(num_validators, 1 * num_validators as u64).await; + + test_consensus_fault_tolerance( + &mut swarm, + 3, + 5.0, + 1, + no_failure_injection(), + Box::new(move |_, _, executed_rounds, executed_transactions, _, _| { + assert!( + executed_transactions >= 4, + "no progress with active consensus, only {} transactions", + executed_transactions + ); + assert!( + executed_rounds >= 2, + "no progress with active consensus, only {} rounds", + executed_rounds + ); + Ok(()) + }), + true, + false, + ) + .await + .unwrap(); +} + +async fn run_dag_fail_point_test( + num_validators: usize, + cycles: usize, + cycle_duration_s: f32, + parts_in_cycle: usize, + traffic_tps: f32, + max_block_size: u64, + // (cycle, part) -> (Vec(validator_index, name, action), reset_old_enpoints) + get_fail_points_to_set: Box< + dyn FnMut(usize, usize) -> (Vec<(usize, String, String)>, bool) + Send, + >, + // (cycle, executed_epochs, executed_rounds, executed_transactions, current_state, previous_state) + check_cycle: Box< + dyn FnMut(usize, u64, u64, u64, Vec, Vec) -> anyhow::Result<()>, + >, +) { + let mut swarm = create_dag_swarm(num_validators, max_block_size).await; + let _active_traffic = if traffic_tps > 0.0 { + start_traffic(5, traffic_tps, &mut swarm).await + } else { + ActiveTrafficGuard { + finish_traffic: Arc::new(AtomicBool::new(false)), + } + }; + test_consensus_fault_tolerance( + &mut swarm, + cycles, + cycle_duration_s, + parts_in_cycle, + Box::new(FailPointFailureInjection::new(get_fail_points_to_set)), + check_cycle, + false, + false, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_fault_tolerance_of_network_send() { + // Randomly increase network failure rate, until network halts, and check that it comes back afterwards. + let mut small_rng = SmallRng::from_entropy(); + let num_validators = 3; + let num_cycles = 4; + run_dag_fail_point_test( + num_validators, + num_cycles, + 2.5, + 5, + 1.0, + 1, + Box::new(move |cycle, _part| { + let max = 10 * (10 - num_cycles + cycle + 1); + let rand: usize = small_rng.gen_range(0, 1000); + let rand_reliability = ((rand as f32 / 1000.0).powf(0.5) * max as f32) as i32; + let wanted_client = small_rng.gen_range(0usize, num_validators); + + ( + vec![( + wanted_client, + "consensus::send::any".to_string(), + format!("{}%return", rand_reliability), + )], + false, + ) + }), + Box::new(|_, _, _, _, _, _| Ok(())), + ) + .await; +} + +#[tokio::test] +async fn test_fault_tolerance_of_network_receive() { + // Randomly increase network failure rate, until network halts, and check that it comes back afterwards. + let mut small_rng = SmallRng::from_entropy(); + let num_validators = 3; + let num_cycles = 4; + run_dag_fail_point_test( + num_validators, + num_cycles, + 2.5, + 5, + 1.0, + 1, + Box::new(move |cycle, _part| { + let max = 10 * (10 - num_cycles + cycle + 1); + let rand: usize = small_rng.gen_range(0, 1000); + let rand_reliability = ((rand as f32 / 1000.0).powf(0.5) * max as f32) as i32; + let wanted_client = small_rng.gen_range(0usize, num_validators); + + ( + vec![( + wanted_client, + "consensus::process::any".to_string(), + format!("{}%return", rand_reliability), + )], + false, + ) + }), + Box::new(|_, _, _, _, _, _| Ok(())), + ) + .await; +} + +#[tokio::test] +async fn test_changing_working_consensus() { + // with 7 nodes, consensus needs 5 to operate. + // we rotate in each cycle, which 2 nodes are down. + // we should consisnently be seeing progress. + let num_validators = 7; + run_dag_fail_point_test( + num_validators, + 6, + 10.0, + 2, + 1.0, + num_validators as u64, + Box::new(move |cycle, part| { + if part == 0 { + let client_1 = (cycle * 2) % num_validators; + let client_2 = (cycle * 2 + 1) % num_validators; + ( + vec![ + ( + client_1, + "consensus::send::any".to_string(), + "return".to_string(), + ), + ( + client_1, + "consensus::process::any".to_string(), + "return".to_string(), + ), + ( + client_2, + "consensus::send::any".to_string(), + "return".to_string(), + ), + ( + client_2, + "consensus::process::any".to_string(), + "return".to_string(), + ), + ], + true, + ) + } else { + (vec![], false) + } + }), + Box::new(|_, _, executed_rounds, executed_transactions, _, _| { + assert!( + executed_transactions >= 1, + "no progress with active consensus, only {} transactions", + executed_transactions + ); + assert!( + executed_rounds >= 2, + "no progress with active consensus, only {} rounds", + executed_rounds + ); + Ok(()) + }), + ) + .await; +} diff --git a/testsuite/smoke-test/src/consensus/dag/mod.rs b/testsuite/smoke-test/src/consensus/dag/mod.rs new file mode 100644 index 00000000000000..2e15d3cf1ed1fc --- /dev/null +++ b/testsuite/smoke-test/src/consensus/dag/mod.rs @@ -0,0 +1,3 @@ +// Copyright © Aptos Foundation + +mod dag_fault_tolerance; diff --git a/testsuite/smoke-test/src/consensus/mod.rs b/testsuite/smoke-test/src/consensus/mod.rs index c9779dd0fa7e96..ad1368b8f84d27 100644 --- a/testsuite/smoke-test/src/consensus/mod.rs +++ b/testsuite/smoke-test/src/consensus/mod.rs @@ -4,4 +4,5 @@ mod consensus_fault_tolerance; mod consensus_only; mod consensusdb_recovery; +mod dag; mod quorum_store_fault_tolerance;