Skip to content

Commit

Permalink
[forge] dag land blocking-style and stable tests (#12004)
Browse files Browse the repository at this point in the history
* [forge] dag land blocking-style test
* [dag][racefix] dont enter round with strong links
* [dag][forge] changing working quorum test
* [dag][forge] onchain config enable test
* [dag] trace ordered block
  • Loading branch information
ibalajiarun committed Feb 29, 2024
1 parent 6bb5323 commit bba289a
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 65 deletions.
4 changes: 4 additions & 0 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::{
observability::counters::{NUM_NODES_PER_BLOCK, NUM_ROUNDS_PER_BLOCK},
};
use crate::{
block_storage::tracing::{observe_block, BlockStage},
consensusdb::{CertifiedNodeSchema, ConsensusDB, DagVoteSchema, NodeSchema},
counters::update_counters_for_committed_blocks,
dag::{
Expand Down Expand Up @@ -195,6 +196,9 @@ impl OrderedNotifier for OrderedNotifierAdapter {
.write()
.insert(block_info.round(), Instant::now());
let block_created_ts = self.block_ordered_ts.clone();

observe_block(block.block().timestamp_usecs(), BlockStage::ORDERED);

let blocks_to_send = OrderedBlocks {
ordered_blocks: vec![block],
ordered_proof: LedgerInfoWithSignatures::new(
Expand Down
98 changes: 57 additions & 41 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,49 +183,63 @@ impl DagDriver {
debug!(error=?e, "cannot enter round");
return;
}
debug!(LogSchema::new(LogEvent::NewRound).round(new_round));
counters::CURRENT_ROUND.set(new_round as i64);

let strong_links = self
.dag
.read()
.get_strong_links_for_round(new_round - 1, &self.epoch_state.verifier)
.unwrap_or_else(|| {
assert_eq!(new_round, 1, "Only expect empty strong links for round 1");
vec![]
});

let (sys_payload_filter, payload_filter) = if strong_links.is_empty() {
(
vtxn_pool::TransactionFilter::PendingTxnHashSet(HashSet::new()),
PayloadFilter::Empty,
)
} else {

let (strong_links, sys_payload_filter, payload_filter) = {
let dag_reader = self.dag.read();
let highest_commit_round = self
.ledger_info_provider
.get_highest_committed_anchor_round();

let nodes = dag_reader
.reachable(
strong_links.iter().map(|node| node.metadata()),
Some(highest_commit_round.saturating_sub(self.window_size_config)),
|_| true,
)
.map(|node_status| node_status.as_node())
.collect::<Vec<_>>();

let payload_filter =
PayloadFilter::from(&nodes.iter().map(|node| node.payload()).collect());
let validator_txn_hashes = nodes
.iter()
.flat_map(|node| node.validator_txns())
.map(|txn| txn.hash());
let validator_payload_filter = vtxn_pool::TransactionFilter::PendingTxnHashSet(
HashSet::from_iter(validator_txn_hashes),
);

(validator_payload_filter, payload_filter)
let highest_strong_links_round =
dag_reader.highest_strong_links_round(&self.epoch_state.verifier);
if new_round.saturating_sub(highest_strong_links_round) == 0 {
debug!(
new_round = new_round,
highest_strong_link_round = highest_strong_links_round,
"new round too stale to enter"
);
return;
}

debug!(LogSchema::new(LogEvent::NewRound).round(new_round));
counters::CURRENT_ROUND.set(new_round as i64);

let strong_links = dag_reader
.get_strong_links_for_round(new_round - 1, &self.epoch_state.verifier)
.unwrap_or_else(|| {
assert_eq!(new_round, 1, "Only expect empty strong links for round 1");
vec![]
});

if strong_links.is_empty() {
(
strong_links,
vtxn_pool::TransactionFilter::PendingTxnHashSet(HashSet::new()),
PayloadFilter::Empty,
)
} else {
let highest_commit_round = self
.ledger_info_provider
.get_highest_committed_anchor_round();

let nodes = dag_reader
.reachable(
strong_links.iter().map(|node| node.metadata()),
Some(highest_commit_round.saturating_sub(self.window_size_config)),
|_| true,
)
.map(|node_status| node_status.as_node())
.collect::<Vec<_>>();

let payload_filter =
PayloadFilter::from(&nodes.iter().map(|node| node.payload()).collect());
let validator_txn_hashes = nodes
.iter()
.flat_map(|node| node.validator_txns())
.map(|txn| txn.hash());
let validator_payload_filter = vtxn_pool::TransactionFilter::PendingTxnHashSet(
HashSet::from_iter(validator_txn_hashes),
);

(strong_links, validator_payload_filter, payload_filter)
}
};

let (max_txns, max_size_bytes) = self
Expand Down Expand Up @@ -327,6 +341,8 @@ impl DagDriver {
debug!("Finish reliable broadcast for round {}", round);
};
tokio::spawn(Abortable::new(task, abort_registration));
// TODO: a bounded vec queue can hold more than window rounds, but we want to limit
// by number of rounds.
if let Some((_handle, prev_round_timestamp)) = self
.rb_handles
.lock()
Expand Down
26 changes: 6 additions & 20 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use aptos_sdk::{
use aptos_testcases::{
compatibility_test::SimpleValidatorUpgrade,
consensus_reliability_tests::ChangingWorkingQuorumTest,
dag_onchain_enable_test::DagOnChainEnableTest,
forge_setup_test::ForgeSetupTest,
framework_upgrade::FrameworkUpgrade,
fullnode_reboot_stress_test::FullNodeRebootStressTest,
Expand Down Expand Up @@ -74,9 +73,12 @@ use std::{
thread,
time::Duration,
};
use suites::dag::get_dag_test;
use tokio::{runtime::Runtime, select};
use url::Url;

mod suites;

// Useful constants
const KILOBYTE: usize = 1000;
const MEGABYTE: usize = KILOBYTE * 1000;
Expand Down Expand Up @@ -518,6 +520,8 @@ fn get_test_suite(
return Ok(test_suite);
} else if let Some(test_suite) = get_state_sync_test(test_name) {
return Ok(test_suite);
} else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) {
return Ok(test_suite);
}

// Otherwise, check the test name against the ungrouped test suites
Expand Down Expand Up @@ -554,7 +558,6 @@ fn get_test_suite(
"consensus_only_realistic_env_max_tps" => run_consensus_only_realistic_env_max_tps(),
"quorum_store_reconfig_enable_test" => quorum_store_reconfig_enable_test(),
"mainnet_like_simulation_test" => mainnet_like_simulation_test(),
"dag_reconfig_enable_test" => dag_reconfig_enable_test(),
"gather_metrics" => gather_metrics(),
_ => return Err(format_err!("Invalid --suite given: {:?}", test_name)),
};
Expand Down Expand Up @@ -2003,7 +2006,7 @@ fn chaos_test_suite(duration: Duration) -> ForgeConfig {
)
}

fn changing_working_quorum_test_helper(
pub fn changing_working_quorum_test_helper(
num_validators: usize,
epoch_duration: usize,
target_tps: usize,
Expand Down Expand Up @@ -2169,23 +2172,6 @@ fn quorum_store_reconfig_enable_test() -> ForgeConfig {
)
}

fn dag_reconfig_enable_test() -> ForgeConfig {
ForgeConfig::default()
.with_initial_validator_count(NonZeroUsize::new(20).unwrap())
.with_initial_fullnode_count(20)
.add_network_test(DagOnChainEnableTest {})
.with_success_criteria(
SuccessCriteria::new(5000)
.add_no_restarts()
.add_wait_for_catchup_s(240)
.add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
}),
)
}

fn mainnet_like_simulation_test() -> ForgeConfig {
ForgeConfig::default()
.with_initial_validator_count(NonZeroUsize::new(20).unwrap())
Expand Down
Loading

0 comments on commit bba289a

Please sign in to comment.