From 6ab618a05eef69d8b27ad60ca32faa6ded96c209 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Tue, 10 Oct 2023 13:08:51 -0700 Subject: [PATCH] [narwhal] Add CertificateV2 (#13777) Added new version for NW `Certificate` which Includes new `AggregateSignatureState`. This holds `AggregateSignatureBytes` but with the added layer to specify if the signature was verified via a leader, verified directly, unverified or unsigned. This will be used to take advantage of the certificate chain that is formed via the DAG by only verifying the leaders of the certificate chain when they are fetched from validators during catchup. This is gated by a protocol feature flag so this PR should have no effect yet. Still testing the [followup PR](https://github.com/MystenLabs/sui/commit/ef48f56d972e46ada7b58bd2ecd7eebd23d8010b) which will include the usage of `CertificateV2`, but sharing here to provide more context for these changes. --- crates/sui-core/Cargo.toml | 2 +- crates/sui-core/src/consensus_handler.rs | 14 +- crates/sui-open-rpc/spec/openrpc.json | 1 + crates/sui-protocol-config/src/lib.rs | 8 + .../consensus/benches/process_certificates.rs | 2 +- .../consensus/src/tests/bullshark_tests.rs | 32 +- .../consensus/src/tests/consensus_tests.rs | 10 +- .../consensus/src/tests/randomized_tests.rs | 2 +- .../tests/consensus_integration_tests.rs | 2 +- narwhal/node/src/generate_format.rs | 14 +- narwhal/primary/src/aggregators.rs | 21 +- narwhal/primary/src/certificate_fetcher.rs | 7 +- narwhal/primary/src/certifier.rs | 9 +- narwhal/primary/src/primary.rs | 5 +- narwhal/primary/src/proposer.rs | 4 +- narwhal/primary/src/synchronizer.rs | 44 +- .../src/tests/certificate_fetcher_tests.rs | 12 +- .../primary/src/tests/certificate_tests.rs | 56 ++- narwhal/primary/src/tests/certifier_tests.rs | 15 +- narwhal/primary/src/tests/primary_tests.rs | 57 ++- narwhal/primary/src/tests/proposer_tests.rs | 16 +- .../primary/src/tests/synchronizer_tests.rs | 46 +- narwhal/storage/src/certificate_store.rs | 17 +- narwhal/storage/src/consensus_store.rs | 6 +- narwhal/test-utils/src/lib.rs | 60 ++- narwhal/types/benches/verify_certificate.rs | 23 +- narwhal/types/src/consensus.rs | 31 +- narwhal/types/src/primary.rs | 431 ++++++++++++++++-- narwhal/types/tests/primary_tests.rs | 10 +- 29 files changed, 760 insertions(+), 197 deletions(-) diff --git a/crates/sui-core/Cargo.toml b/crates/sui-core/Cargo.toml index 345accd58824f..974f96337f889 100644 --- a/crates/sui-core/Cargo.toml +++ b/crates/sui-core/Cargo.toml @@ -60,6 +60,7 @@ narwhal-crypto.workspace = true narwhal-executor.workspace = true narwhal-network.workspace = true narwhal-node.workspace = true +narwhal-test-utils.workspace = true narwhal-types.workspace = true narwhal-worker.workspace = true shared-crypto.workspace = true @@ -92,7 +93,6 @@ serde_yaml.workspace = true move-symbol-pool.workspace = true -narwhal-test-utils.workspace = true sui-test-transaction-builder.workspace = true sui-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 18b659c64d821..5be05ba95273d 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -19,7 +19,8 @@ use lru::LruCache; use mysten_metrics::{monitored_scope, spawn_monitored_task}; use narwhal_config::Committee; use narwhal_executor::{ExecutionIndices, ExecutionState}; -use narwhal_types::{BatchAPI, CertificateAPI, ConsensusOutput, HeaderAPI}; +use narwhal_test_utils::latest_protocol_version; +use narwhal_types::{BatchAPI, Certificate, CertificateAPI, ConsensusOutput, HeaderAPI}; use serde::{Deserialize, Serialize}; use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeSet, HashMap, HashSet}; @@ -667,7 +668,7 @@ impl SequencedConsensusTransaction { pub fn new_test(transaction: ConsensusTransaction) -> Self { Self { transaction: SequencedConsensusTransactionKind::External(transaction), - certificate: Default::default(), + certificate: Arc::new(Certificate::default(&latest_protocol_version())), certificate_author: AuthorityName::ZERO, consensus_index: Default::default(), } @@ -758,8 +759,13 @@ mod tests { .build() .unwrap(); - let certificate = - Certificate::new_unsigned(&committee, Header::V1(header), vec![]).unwrap(); + let certificate = Certificate::new_unsigned( + latest_protocol_config, + &committee, + Header::V1(header), + vec![], + ) + .unwrap(); certificates.push(certificate); } diff --git a/crates/sui-open-rpc/spec/openrpc.json b/crates/sui-open-rpc/spec/openrpc.json index ba3d025992106..29edeabd94557 100644 --- a/crates/sui-open-rpc/spec/openrpc.json +++ b/crates/sui-open-rpc/spec/openrpc.json @@ -1365,6 +1365,7 @@ "loaded_child_object_format_type": false, "loaded_child_objects_fixed": true, "missing_type_is_compatibility_error": true, + "narwhal_certificate_v2": false, "narwhal_new_leader_election_schedule": false, "narwhal_versioned_metadata": false, "no_extraneous_module_bytes": false, diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index 58d89b216e164..a701f6272d377 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -295,6 +295,10 @@ struct FeatureFlags { #[serde(skip_serializing_if = "is_false")] enable_effects_v2: bool, + + // If true, then use CertificateV2 in narwhal. + #[serde(skip_serializing_if = "is_false")] + narwhal_certificate_v2: bool, } fn is_false(b: &bool) -> bool { @@ -952,6 +956,10 @@ impl ProtocolConfig { pub fn enable_effects_v2(&self) -> bool { self.feature_flags.enable_effects_v2 } + + pub fn narwhal_certificate_v2(&self) -> bool { + self.feature_flags.narwhal_certificate_v2 + } } #[cfg(not(msim))] diff --git a/narwhal/consensus/benches/process_certificates.rs b/narwhal/consensus/benches/process_certificates.rs index b8aeef1bc6345..432f23d9abaa3 100644 --- a/narwhal/consensus/benches/process_certificates.rs +++ b/narwhal/consensus/benches/process_certificates.rs @@ -30,7 +30,7 @@ pub fn process_certificates(c: &mut Criterion) { let rounds: Round = *size; // process certificates for rounds, check we don't grow the dag too much - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); diff --git a/narwhal/consensus/src/tests/bullshark_tests.rs b/narwhal/consensus/src/tests/bullshark_tests.rs index 3c5d378582fad..71c9d430e73fa 100644 --- a/narwhal/consensus/src/tests/bullshark_tests.rs +++ b/narwhal/consensus/src/tests/bullshark_tests.rs @@ -30,7 +30,7 @@ async fn order_leaders() { let committee = fixture.committee(); // Make certificates for rounds 1 to 7. let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -127,7 +127,7 @@ async fn commit_one_with_leader_schedule_change() { let committee = fixture.committee(); // Make certificates for rounds 1 to 9. let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -188,7 +188,7 @@ async fn not_enough_support_with_leader_schedule_change() { let committee = fixture.committee(); let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -336,7 +336,7 @@ async fn test_long_period_of_asynchrony_for_leader_schedule_change() { let committee = fixture.committee(); let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -455,7 +455,7 @@ async fn commit_one() { let committee = fixture.committee(); // Make certificates for rounds 1 and 2. let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -559,7 +559,7 @@ async fn dead_node() { // remove the last authority - 4 let dead_node = ids.pop().unwrap(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -670,7 +670,7 @@ async fn not_enough_support() { let mut ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); ids.sort(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -870,7 +870,7 @@ async fn missing_leader() { let mut ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); ids.sort(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -986,7 +986,7 @@ async fn committed_round_after_restart() { let epoch = committee.epoch(); // Make certificates for rounds 1 to 11. - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -1092,7 +1092,7 @@ async fn delayed_certificates_are_rejected() { let gc_depth = 10; // Make certificates for rounds 1 to 11. - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -1151,7 +1151,7 @@ async fn submitting_equivocating_certificate_should_error() { let gc_depth = 10; // Make certificates for rounds 1 to 11. - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -1227,7 +1227,7 @@ async fn reset_consensus_scores_on_every_schedule_change() { let gc_depth = 10; // Make certificates for rounds 1 to 50. - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -1341,7 +1341,7 @@ async fn restart_with_new_committee() { tokio::spawn(async move { while rx_primary.recv().await.is_some() {} }); // Make certificates for rounds 1 and 2. - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -1426,7 +1426,7 @@ async fn garbage_collection_basic() { .map(|authority| authority.id()) .collect(); let slow_node = ids[3]; - let genesis = Certificate::genesis(&committee); + let genesis = Certificate::genesis(&latest_protocol_version(), &committee); let slow_nodes = vec![(slow_node, 0.0_f64)]; let (certificates, _round_5_certificates) = test_utils::make_certificates_with_slow_nodes( @@ -1522,7 +1522,7 @@ async fn slow_node() { .map(|authority| authority.id()) .collect(); let slow_node = ids[3]; - let genesis = Certificate::genesis(&committee); + let genesis = Certificate::genesis(&latest_protocol_version(), &committee); let slow_nodes = vec![(slow_node, 0.0_f64)]; let (certificates, round_8_certificates) = test_utils::make_certificates_with_slow_nodes( @@ -1662,7 +1662,7 @@ async fn not_enough_support_and_missing_leaders_and_gc() { let keys_with_dead_node = ids[0..=2].to_vec(); let slow_node = ids[3]; let slow_nodes = vec![(slow_node, 0.0_f64)]; - let genesis = Certificate::genesis(&committee); + let genesis = Certificate::genesis(&latest_protocol_version(), &committee); let (mut certificates, round_2_certificates) = test_utils::make_certificates_with_slow_nodes( &committee, diff --git a/narwhal/consensus/src/tests/consensus_tests.rs b/narwhal/consensus/src/tests/consensus_tests.rs index e4a2953c999cf..31df9beb3329f 100644 --- a/narwhal/consensus/src/tests/consensus_tests.rs +++ b/narwhal/consensus/src/tests/consensus_tests.rs @@ -65,7 +65,7 @@ async fn test_consensus_recovery_with_bullshark_with_config(config: ProtocolConf // AND make certificates for rounds 1 to 7 (inclusive) let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -507,7 +507,13 @@ async fn test_leader_schedule_from_store() { scores.add_score(id, score as u64); } - let sub_dag = CommittedSubDag::new(vec![], Certificate::default(), 0, scores, None); + let sub_dag = CommittedSubDag::new( + vec![], + Certificate::default(&latest_protocol_version()), + 0, + scores, + None, + ); store .write_consensus_state(&HashMap::new(), &sub_dag) diff --git a/narwhal/consensus/src/tests/randomized_tests.rs b/narwhal/consensus/src/tests/randomized_tests.rs index ef558f114017a..c1f235afb471d 100644 --- a/narwhal/consensus/src/tests/randomized_tests.rs +++ b/narwhal/consensus/src/tests/randomized_tests.rs @@ -288,7 +288,7 @@ fn generate_randomised_dag( .rng(rand) .build(); let committee: Committee = fixture.committee(); - let genesis = Certificate::genesis(&committee); + let genesis = Certificate::genesis(&latest_protocol_version(), &committee); // Create a known DAG let (original_certificates, _last_round) = make_certificates_with_parameters( diff --git a/narwhal/executor/tests/consensus_integration_tests.rs b/narwhal/executor/tests/consensus_integration_tests.rs index 7d7668e091ef6..e6d13a7aedcbf 100644 --- a/narwhal/executor/tests/consensus_integration_tests.rs +++ b/narwhal/executor/tests/consensus_integration_tests.rs @@ -34,7 +34,7 @@ async fn test_recovery() { // Make certificates for rounds 1 and 2. let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); diff --git a/narwhal/node/src/generate_format.rs b/narwhal/node/src/generate_format.rs index c51f1079ae9d5..6e9269d35a974 100644 --- a/narwhal/node/src/generate_format.rs +++ b/narwhal/node/src/generate_format.rs @@ -11,6 +11,7 @@ use mysten_network::Multiaddr; use rand::{prelude::StdRng, SeedableRng}; use serde_reflection::{Registry, Result, Samples, Tracer, TracerConfig}; use std::{fs::File, io::Write}; +use test_utils::latest_protocol_version; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, HeaderV1Builder, MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOwnBatchMessage, @@ -61,7 +62,8 @@ fn get_registry() -> Result { let committee = committee_builder.build(); - let certificates: Vec = Certificate::genesis(&committee); + let certificates: Vec = + Certificate::genesis(&latest_protocol_version(), &committee); // Find the author id inside the committee let authority = committee.authority_by_key(kp.public()).unwrap(); @@ -83,10 +85,16 @@ fn get_registry() -> Result { .unwrap(); let worker_pk = network_keys[0].public().clone(); - let certificate = - Certificate::new_unsigned(&committee, Header::V1(header.clone()), vec![]).unwrap(); + let certificate = Certificate::new_unsigned( + &latest_protocol_version(), + &committee, + Header::V1(header.clone()), + vec![], + ) + .unwrap(); let signature = keys[0].sign(certificate.digest().as_ref()); let certificate = Certificate::new_unsigned( + &latest_protocol_version(), &committee, Header::V1(header.clone()), vec![(authority.id(), signature)], diff --git a/narwhal/primary/src/aggregators.rs b/narwhal/primary/src/aggregators.rs index 1c435ddbd63e0..7ceefdb52071f 100644 --- a/narwhal/primary/src/aggregators.rs +++ b/narwhal/primary/src/aggregators.rs @@ -11,6 +11,7 @@ use crypto::{ use fastcrypto::hash::{Digest, Hash}; use std::collections::HashSet; use std::sync::Arc; +use sui_protocol_config::ProtocolConfig; use tracing::warn; use types::{ ensure, @@ -20,6 +21,7 @@ use types::{ /// Aggregates votes for a particular header into a certificate. pub struct VotesAggregator { + protocol_config: ProtocolConfig, weight: Stake, votes: Vec<(AuthorityIdentifier, Signature)>, used: HashSet, @@ -27,10 +29,11 @@ pub struct VotesAggregator { } impl VotesAggregator { - pub fn new(metrics: Arc) -> Self { + pub fn new(protocol_config: &ProtocolConfig, metrics: Arc) -> Self { metrics.votes_received_last_round.set(0); Self { + protocol_config: protocol_config.clone(), weight: 0, votes: Vec::new(), used: HashSet::new(), @@ -59,13 +62,21 @@ impl VotesAggregator { .votes_received_last_round .set(self.votes.len() as i64); if self.weight >= committee.quorum_threshold() { - let cert = Certificate::new_unverified(committee, header.clone(), self.votes.clone())?; + let cert = Certificate::new_unverified( + &self.protocol_config, + committee, + header.clone(), + self.votes.clone(), + )?; let (_, pks) = cert.signed_by(committee); let certificate_digest: Digest<{ crypto::DIGEST_LENGTH }> = Digest::from(cert.digest()); - match AggregateSignature::try_from(cert.aggregated_signature()) - .map_err(|_| DagError::InvalidSignature)? - .verify_secure(&to_intent_message(certificate_digest), &pks[..]) + match AggregateSignature::try_from( + cert.aggregated_signature() + .ok_or(DagError::InvalidSignature)?, + ) + .map_err(|_| DagError::InvalidSignature)? + .verify_secure(&to_intent_message(certificate_digest), &pks[..]) { Err(err) => { warn!( diff --git a/narwhal/primary/src/certificate_fetcher.rs b/narwhal/primary/src/certificate_fetcher.rs index 24714a5dee01e..5666d25c408b4 100644 --- a/narwhal/primary/src/certificate_fetcher.rs +++ b/narwhal/primary/src/certificate_fetcher.rs @@ -447,13 +447,14 @@ async fn process_certificates_helper( // Use threads dedicated to computation heavy work. spawn_blocking(move || { let now = Instant::now(); - for c in &certs { - sync.sanitize_certificate(c)?; + let mut sanitized_certs = Vec::new(); + for c in certs { + sanitized_certs.push(sync.sanitize_certificate(c)?); } metrics .certificate_fetcher_total_verification_us .inc_by(now.elapsed().as_micros() as u64); - Ok::, DagError>(certs) + Ok::, DagError>(sanitized_certs) }) }) .collect_vec(); diff --git a/narwhal/primary/src/certifier.rs b/narwhal/primary/src/certifier.rs index ba0675e0292b7..5f87299bf095f 100644 --- a/narwhal/primary/src/certifier.rs +++ b/narwhal/primary/src/certifier.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; use storage::CertificateStore; use sui_macros::fail_point_async; +use sui_protocol_config::ProtocolConfig; use tokio::{ sync::oneshot, task::{JoinHandle, JoinSet}, @@ -41,6 +42,7 @@ pub struct Certifier { authority_id: AuthorityIdentifier, /// The committee information. committee: Committee, + protocol_config: ProtocolConfig, /// The persistent storage keyed to certificates. certificate_store: CertificateStore, /// Handles synchronization with other nodes and our workers. @@ -71,6 +73,7 @@ impl Certifier { pub fn spawn( authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: ProtocolConfig, certificate_store: CertificateStore, synchronizer: Arc, signature_service: SignatureService, @@ -84,6 +87,7 @@ impl Certifier { Self { authority_id, committee, + protocol_config, certificate_store, synchronizer, signature_service, @@ -237,6 +241,7 @@ impl Certifier { async fn propose_header( authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: ProtocolConfig, certificate_store: CertificateStore, signature_service: SignatureService, metrics: Arc, @@ -259,7 +264,7 @@ impl Certifier { metrics.proposed_header_round.set(header.round() as i64); // Reset the votes aggregator and sign our own header. - let mut votes_aggregator = VotesAggregator::new(metrics.clone()); + let mut votes_aggregator = VotesAggregator::new(&protocol_config, metrics.clone()); let vote = Vote::new(&header, &authority_id, &signature_service).await; let mut certificate = votes_aggregator.append(vote, &committee, &header)?; @@ -373,10 +378,12 @@ impl Certifier { let signature_service = self.signature_service.clone(); let metrics = self.metrics.clone(); let network = self.network.clone(); + let protocol_config = self.protocol_config.clone(); fail_point_async!("narwhal-delay"); self.propose_header_tasks.spawn(monitored_future!(Self::propose_header( name, committee, + protocol_config, certificate_store, signature_service, metrics, diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 9568e3a6b43d3..93d0310868604 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -90,7 +90,7 @@ impl Primary { network_signer: NetworkKeyPair, committee: Committee, worker_cache: WorkerCache, - _protocol_config: ProtocolConfig, + protocol_config: ProtocolConfig, parameters: Parameters, client: NetworkClient, certificate_store: CertificateStore, @@ -167,6 +167,7 @@ impl Primary { let synchronizer = Arc::new(Synchronizer::new( authority.id(), committee.clone(), + protocol_config.clone(), worker_cache.clone(), parameters.gc_depth, client.clone(), @@ -425,6 +426,7 @@ impl Primary { let core_handle = Certifier::spawn( authority.id(), committee.clone(), + protocol_config.clone(), certificate_store.clone(), synchronizer.clone(), signature_service, @@ -453,6 +455,7 @@ impl Primary { let proposer_handle = Proposer::spawn( authority.id(), committee, + &protocol_config, proposer_store, parameters.header_num_of_batches_threshold, parameters.max_header_num_of_batches, diff --git a/narwhal/primary/src/proposer.rs b/narwhal/primary/src/proposer.rs index 024a6d8827970..211951de2eaf2 100644 --- a/narwhal/primary/src/proposer.rs +++ b/narwhal/primary/src/proposer.rs @@ -10,6 +10,7 @@ use mysten_metrics::spawn_logged_monitored_task; use std::collections::{BTreeMap, VecDeque}; use std::{cmp::Ordering, sync::Arc}; use storage::ProposerStore; +use sui_protocol_config::ProtocolConfig; use tokio::time::{sleep_until, Instant}; use tokio::{ sync::{oneshot, watch}, @@ -106,6 +107,7 @@ impl Proposer { pub fn spawn( authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: &ProtocolConfig, proposer_store: ProposerStore, header_num_of_batches_threshold: usize, max_header_num_of_batches: usize, @@ -121,7 +123,7 @@ impl Proposer { metrics: Arc, leader_schedule: LeaderSchedule, ) -> JoinHandle<()> { - let genesis = Certificate::genesis(&committee); + let genesis = Certificate::genesis(protocol_config, &committee); spawn_logged_monitored_task!( async move { Self { diff --git a/narwhal/primary/src/synchronizer.rs b/narwhal/primary/src/synchronizer.rs index c27d0ff5a9ace..be3092a97fb68 100644 --- a/narwhal/primary/src/synchronizer.rs +++ b/narwhal/primary/src/synchronizer.rs @@ -26,6 +26,7 @@ use std::{ time::Duration, }; use storage::{CertificateStore, PayloadStore}; +use sui_protocol_config::ProtocolConfig; use tokio::{ sync::{broadcast, oneshot, watch, MutexGuard}, task::JoinSet, @@ -306,6 +307,7 @@ impl Synchronizer { pub fn new( authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: ProtocolConfig, worker_cache: WorkerCache, gc_depth: Round, client: NetworkClient, @@ -319,7 +321,7 @@ impl Synchronizer { primary_channel_metrics: &PrimaryChannelMetrics, ) -> Self { let committee: &Committee = &committee; - let genesis = Self::make_genesis(committee); + let genesis = Self::make_genesis(&protocol_config, committee); let highest_processed_round = certificate_store.highest_round_number(); let highest_created_certificate = certificate_store.last_round(authority_id).unwrap(); let gc_round = rx_consensus_round_updates.borrow().gc_round; @@ -555,6 +557,7 @@ impl Synchronizer { let _scope = monitored_scope("Synchronizer::try_accept_certificate"); self.process_certificate_internal(certificate, true, true) .await + .map(|_| ()) } /// Tries to accept a certificate from certificate fetcher. @@ -567,17 +570,18 @@ impl Synchronizer { let _scope = monitored_scope("Synchronizer::try_accept_fetched_certificate"); self.process_certificate_internal(certificate, false, false) .await + .map(|_| ()) } /// Accepts a certificate produced by this primary. This is not expected to fail unless /// the primary is shutting down. pub async fn accept_own_certificate(&self, certificate: Certificate) -> DagResult<()> { // Process the new certificate. - match self + let certificate = match self .process_certificate_internal(certificate.clone(), false, false) .await { - Ok(()) => Ok(()), + Ok(processed_certificate) => Ok(processed_certificate), result @ Err(DagError::ShuttingDown) => result, Err(e) => panic!("Failed to process locally-created certificate: {e}"), }?; @@ -621,16 +625,18 @@ impl Synchronizer { Ok(()) } - fn make_genesis(committee: &Committee) -> HashMap { - Certificate::genesis(committee) + fn make_genesis( + protocol_config: &ProtocolConfig, + committee: &Committee, + ) -> HashMap { + Certificate::genesis(protocol_config, committee) .into_iter() .map(|x| (x.digest(), x)) .collect() } /// Checks if the certificate is valid and can potentially be accepted into the DAG. - // TODO: produce a different type after sanitize, e.g. VerifiedCertificate. - pub fn sanitize_certificate(&self, certificate: &Certificate) -> DagResult<()> { + pub fn sanitize_certificate(&self, certificate: Certificate) -> DagResult { ensure!( self.inner.committee.epoch() == certificate.epoch(), DagError::InvalidEpoch { @@ -645,24 +651,24 @@ impl Synchronizer { DagError::TooOld(certificate.digest().into(), certificate.round(), gc_round) ); // Verify the certificate (and the embedded header). - certificate - .verify(&self.inner.committee, &self.inner.worker_cache) - .map_err(DagError::from) + certificate.verify(&self.inner.committee, &self.inner.worker_cache) } + // CertificateV2 maintains signature verification state. Therefore when this + // method is called with sanitize = true, the signature verification state may + // change which is why the updated certificate is returned. async fn process_certificate_internal( &self, - certificate: Certificate, + mut certificate: Certificate, sanitize: bool, early_suspend: bool, - ) -> DagResult<()> { + ) -> DagResult { let _scope = monitored_scope("Synchronizer::process_certificate_internal"); - let digest = certificate.digest(); if self.inner.certificate_store.contains(&digest)? { trace!("Certificate {digest:?} has already been processed. Skip processing."); self.inner.metrics.duplicate_certificates_processed.inc(); - return Ok(()); + return Ok(certificate); } // Ensure parents are checked if !early_suspend. // See comments above `try_accept_fetched_certificate()` for details. @@ -677,8 +683,9 @@ impl Synchronizer { return Err(DagError::Suspended(notify)); } } + if sanitize { - self.sanitize_certificate(&certificate)?; + certificate = self.sanitize_certificate(certificate)?; } debug!( @@ -745,12 +752,13 @@ impl Synchronizer { let (sender, receiver) = oneshot::channel(); self.inner .tx_certificate_acceptor - .send((certificate, sender, early_suspend)) + .send((certificate.clone(), sender, early_suspend)) .await .expect("Synchronizer should shut down before certificate acceptor task."); receiver .await - .expect("Synchronizer should shut down before certificate acceptor task.") + .expect("Synchronizer should shut down before certificate acceptor task.")?; + Ok(certificate) } /// This function checks if a certificate has all parents and can be accepted into storage. @@ -1315,7 +1323,7 @@ mod tests { .build(); let committee: Committee = fixture.committee(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); diff --git a/narwhal/primary/src/tests/certificate_fetcher_tests.rs b/narwhal/primary/src/tests/certificate_fetcher_tests.rs index da441b2a7a29a..8c25a8683fce4 100644 --- a/narwhal/primary/src/tests/certificate_fetcher_tests.rs +++ b/narwhal/primary/src/tests/certificate_fetcher_tests.rs @@ -161,6 +161,7 @@ async fn fetch_certificates_basic() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), gc_depth, client, @@ -205,7 +206,8 @@ async fn fetch_certificates_basic() { ); // Generate headers and certificates in successive rounds - let genesis_certs: Vec<_> = Certificate::genesis(&fixture.committee()); + let genesis_certs: Vec<_> = + Certificate::genesis(&latest_protocol_version(), &fixture.committee()); for cert in genesis_certs.iter() { certificate_store .write(cert.clone()) @@ -221,7 +223,11 @@ async fn fetch_certificates_basic() { for i in 0..rounds { let parents: BTreeSet<_> = current_round .into_iter() - .map(|header| fixture.certificate(&header).digest()) + .map(|header| { + fixture + .certificate(&latest_protocol_version(), &header) + .digest() + }) .collect(); (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); headers.extend(current_round.clone()); @@ -236,7 +242,7 @@ async fn fetch_certificates_basic() { // Create certificates test data. let mut certificates = vec![]; for header in headers.into_iter() { - certificates.push(fixture.certificate(&header)); + certificates.push(fixture.certificate(&latest_protocol_version(), &header)); } assert_eq!(certificates.len(), total_certificates); // note genesis is not included assert_eq!(240, total_certificates); diff --git a/narwhal/primary/src/tests/certificate_tests.rs b/narwhal/primary/src/tests/certificate_tests.rs index 3f860747195ea..c55bf75750ede 100644 --- a/narwhal/primary/src/tests/certificate_tests.rs +++ b/narwhal/primary/src/tests/certificate_tests.rs @@ -11,6 +11,7 @@ use rand::{ SeedableRng, }; use std::num::NonZeroUsize; +use test_utils::latest_protocol_version; use test_utils::CommitteeFixture; use types::{Certificate, Vote, VoteAPI}; @@ -19,11 +20,19 @@ fn test_empty_certificate_verification() { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); // You should not be allowed to create a certificate that does not satisfying quorum requirements - assert!(Certificate::new_unverified(&committee, header.clone(), Vec::new()).is_err()); - - let certificate = Certificate::new_unsigned(&committee, header, Vec::new()).unwrap(); + assert!(Certificate::new_unverified( + &latest_protocol_version(), + &committee, + header.clone(), + Vec::new() + ) + .is_err()); + + let certificate = + Certificate::new_unsigned(&latest_protocol_version(), &committee, header, Vec::new()) + .unwrap(); assert!(certificate .verify(&committee, &fixture.worker_cache()) .is_err()); @@ -33,7 +42,7 @@ fn test_empty_certificate_verification() { fn test_valid_certificate_verification() { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); let mut signatures = Vec::new(); @@ -43,7 +52,9 @@ fn test_valid_certificate_verification() { signatures.push((vote.author(), vote.signature().clone())); } - let certificate = Certificate::new_unverified(&committee, header, signatures).unwrap(); + let certificate = + Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures) + .unwrap(); assert!(certificate .verify(&committee, &fixture.worker_cache()) @@ -54,7 +65,7 @@ fn test_valid_certificate_verification() { fn test_certificate_insufficient_signatures() { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); let mut signatures = Vec::new(); @@ -64,9 +75,17 @@ fn test_certificate_insufficient_signatures() { signatures.push((vote.author(), vote.signature().clone())); } - assert!(Certificate::new_unverified(&committee, header.clone(), signatures.clone()).is_err()); + assert!(Certificate::new_unverified( + &latest_protocol_version(), + &committee, + header.clone(), + signatures.clone() + ) + .is_err()); - let certificate = Certificate::new_unsigned(&committee, header, signatures).unwrap(); + let certificate = + Certificate::new_unsigned(&latest_protocol_version(), &committee, header, signatures) + .unwrap(); assert!(certificate .verify(&committee, &fixture.worker_cache()) @@ -77,7 +96,7 @@ fn test_certificate_insufficient_signatures() { fn test_certificate_validly_repeated_public_keys() { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); let mut signatures = Vec::new(); @@ -89,7 +108,8 @@ fn test_certificate_validly_repeated_public_keys() { signatures.push((vote.author(), vote.signature().clone())); } - let certificate_res = Certificate::new_unverified(&committee, header, signatures); + let certificate_res = + Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures); assert!(certificate_res.is_ok()); let certificate = certificate_res.unwrap(); @@ -102,7 +122,7 @@ fn test_certificate_validly_repeated_public_keys() { fn test_unknown_signature_in_certificate() { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); let mut signatures = Vec::new(); @@ -118,7 +138,13 @@ fn test_unknown_signature_in_certificate() { let vote = Vote::new_with_signer(&header, &malicious_id, &malicious_key); signatures.push((vote.author(), vote.signature().clone())); - assert!(Certificate::new_unverified(&committee, header, signatures).is_err()); + assert!(Certificate::new_unverified( + &latest_protocol_version(), + &committee, + header, + signatures + ) + .is_err()); } proptest::proptest! { @@ -130,7 +156,7 @@ proptest::proptest! { .committee_size(NonZeroUsize::new(committee_size).unwrap()) .build(); let committee = fixture.committee(); - let header = fixture.header(); + let header = fixture.header(&latest_protocol_version()); let mut signatures = Vec::new(); @@ -141,7 +167,7 @@ proptest::proptest! { signatures.push((vote.author(), vote.signature().clone())); } - let certificate = Certificate::new_unverified(&committee, header, signatures).unwrap(); + let certificate = Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures).unwrap(); assert!(certificate .verify(&committee, &fixture.worker_cache()) diff --git a/narwhal/primary/src/tests/certifier_tests.rs b/narwhal/primary/src/tests/certifier_tests.rs index 0871870bac809..5c7a81394757b 100644 --- a/narwhal/primary/src/tests/certifier_tests.rs +++ b/narwhal/primary/src/tests/certifier_tests.rs @@ -13,6 +13,7 @@ use primary::NUM_SHUTDOWN_RECEIVERS; use prometheus::Registry; use rand::{rngs::StdRng, SeedableRng}; use std::num::NonZeroUsize; +use test_utils::latest_protocol_version; use test_utils::CommitteeFixture; use tokio::sync::watch; use tokio::time::Duration; @@ -44,7 +45,7 @@ async fn propose_header() { let (certificate_store, payload_store) = create_db_stores(); // Create a fake header. - let proposed_header = primary.header(&committee); + let proposed_header = primary.header(&latest_protocol_version(), &committee); // Set up network. let own_address = committee @@ -103,6 +104,7 @@ async fn propose_header() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -119,6 +121,7 @@ async fn propose_header() { let _handle = Certifier::spawn( id, committee.clone(), + latest_protocol_version(), certificate_store.clone(), synchronizer, signature_service, @@ -159,7 +162,7 @@ async fn propose_header_failure() { let (certificate_store, payload_store) = create_db_stores(); // Create a fake header. - let proposed_header = primary.header(&committee); + let proposed_header = primary.header(&latest_protocol_version(), &committee); // Set up network. let own_address = committee @@ -201,6 +204,7 @@ async fn propose_header_failure() { let synchronizer = Arc::new(Synchronizer::new( authority_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -217,6 +221,7 @@ async fn propose_header_failure() { let _handle = Certifier::spawn( authority_id, committee.clone(), + latest_protocol_version(), certificate_store.clone(), synchronizer, signature_service, @@ -278,7 +283,7 @@ async fn run_vote_aggregator_with_param( let (certificate_store, payload_store) = create_db_stores(); // Create a fake header. - let proposed_header = primary.header(&committee); + let proposed_header = primary.header(&latest_protocol_version(), &committee); // Set up network. let own_address = committee @@ -332,6 +337,7 @@ async fn run_vote_aggregator_with_param( let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -347,6 +353,7 @@ async fn run_vote_aggregator_with_param( let _handle = Certifier::spawn( id, committee.clone(), + latest_protocol_version(), certificate_store.clone(), synchronizer, signature_service, @@ -401,6 +408,7 @@ async fn shutdown_core() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -431,6 +439,7 @@ async fn shutdown_core() { let handle = Certifier::spawn( id, committee.clone(), + latest_protocol_version(), certificate_store.clone(), synchronizer.clone(), signature_service.clone(), diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index dbe5eca88fc05..60106767b4a3c 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -25,7 +25,9 @@ use std::{ time::Duration, }; use storage::{NodeStorage, VoteDigestStore}; -use test_utils::{make_optimal_signed_certificates, temp_dir, CommitteeFixture}; +use test_utils::{ + latest_protocol_version, make_optimal_signed_certificates, temp_dir, CommitteeFixture, +}; use tokio::{sync::watch, time::timeout}; use types::{ now, Certificate, CertificateAPI, FetchCertificatesRequest, Header, HeaderAPI, @@ -288,6 +290,7 @@ async fn test_request_vote_has_missing_parents() { let synchronizer = Arc::new(Synchronizer::new( target_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -315,7 +318,7 @@ async fn test_request_vote_has_missing_parents() { // Make some mock certificates that are parents of our new header. let committee: Committee = fixture.committee(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -338,7 +341,7 @@ async fn test_request_vote_has_missing_parents() { // Create a test header. let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .author(author_id) .round(3) .parents(round_2_certs.iter().map(|c| c.digest()).collect()) @@ -455,6 +458,7 @@ async fn test_request_vote_accept_missing_parents() { let synchronizer = Arc::new(Synchronizer::new( target_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -482,7 +486,7 @@ async fn test_request_vote_accept_missing_parents() { // Make some mock certificates that are parents of our new header. let committee: Committee = fixture.committee(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -506,7 +510,7 @@ async fn test_request_vote_accept_missing_parents() { // Create a test header. let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .author(author_id) .round(3) .parents(round_2_certs.iter().map(|c| c.digest()).collect()) @@ -610,6 +614,7 @@ async fn test_request_vote_missing_batches() { let synchronizer = Arc::new(Synchronizer::new( authority_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -640,7 +645,7 @@ async fn test_request_vote_missing_batches() { for primary in fixture.authorities().filter(|a| a.id() != authority_id) { let header = Header::V1( primary - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .with_payload_batch( test_utils::fixture_batch_with_transactions( 10, @@ -653,7 +658,7 @@ async fn test_request_vote_missing_batches() { .unwrap(), ); - let certificate = fixture.certificate(&header); + let certificate = fixture.certificate(&latest_protocol_version(), &header); let digest = certificate.clone().digest(); certificates.insert(digest, certificate.clone()); @@ -664,7 +669,7 @@ async fn test_request_vote_missing_batches() { } let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( @@ -755,6 +760,7 @@ async fn test_request_vote_already_voted() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -786,7 +792,7 @@ async fn test_request_vote_already_voted() { for primary in fixture.authorities().filter(|a| a.id() != id) { let header = Header::V1( primary - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .with_payload_batch( test_utils::fixture_batch_with_transactions( 10, @@ -799,7 +805,7 @@ async fn test_request_vote_already_voted() { .unwrap(), ); - let certificate = fixture.certificate(&header); + let certificate = fixture.certificate(&latest_protocol_version(), &header); let digest = certificate.clone().digest(); certificates.insert(digest, certificate.clone()); @@ -831,7 +837,7 @@ async fn test_request_vote_already_voted() { // Verify Handler generates a Vote. let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( @@ -883,7 +889,7 @@ async fn test_request_vote_already_voted() { // Verify a different request for the same round receives an error. let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( @@ -943,6 +949,7 @@ async fn test_fetch_certificates_handler() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -968,16 +975,21 @@ async fn test_fetch_certificates_handler() { metrics: metrics.clone(), }; - let mut current_round: Vec<_> = Certificate::genesis(&fixture.committee()) - .into_iter() - .map(|cert| cert.header().clone()) - .collect(); + let mut current_round: Vec<_> = + Certificate::genesis(&latest_protocol_version(), &fixture.committee()) + .into_iter() + .map(|cert| cert.header().clone()) + .collect(); let mut headers = vec![]; let total_rounds = 4; for i in 0..total_rounds { let parents: BTreeSet<_> = current_round .into_iter() - .map(|header| fixture.certificate(&header).digest()) + .map(|header| { + fixture + .certificate(&latest_protocol_version(), &header) + .digest() + }) .collect(); (_, current_round) = fixture.headers_round(i, &parents, &test_utils::latest_protocol_version()); @@ -989,7 +1001,7 @@ async fn test_fetch_certificates_handler() { // Create certificates test data. let mut certificates = vec![]; for header in headers.into_iter() { - certificates.push(fixture.certificate(&header)); + certificates.push(fixture.certificate(&latest_protocol_version(), &header)); } assert_eq!(certificates.len(), total_certificates); assert_eq!(16, total_certificates); @@ -1114,6 +1126,7 @@ async fn test_request_vote_created_at_in_future() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -1144,7 +1157,7 @@ async fn test_request_vote_created_at_in_future() { for primary in fixture.authorities().filter(|a| a.id() != id) { let header = Header::V1( primary - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .with_payload_batch( test_utils::fixture_batch_with_transactions( 10, @@ -1157,7 +1170,7 @@ async fn test_request_vote_created_at_in_future() { .unwrap(), ); - let certificate = fixture.certificate(&header); + let certificate = fixture.certificate(&latest_protocol_version(), &header); let digest = certificate.clone().digest(); certificates.insert(digest, certificate.clone()); @@ -1193,7 +1206,7 @@ async fn test_request_vote_created_at_in_future() { let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( @@ -1231,7 +1244,7 @@ async fn test_request_vote_created_at_in_future() { let created_at = now() + 500; let test_header = author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( diff --git a/narwhal/primary/src/tests/proposer_tests.rs b/narwhal/primary/src/tests/proposer_tests.rs index 0f22430210623..f34c75437245b 100644 --- a/narwhal/primary/src/tests/proposer_tests.rs +++ b/narwhal/primary/src/tests/proposer_tests.rs @@ -30,6 +30,7 @@ async fn propose_empty() { let _proposer_handle = Proposer::spawn( name, committee.clone(), + &latest_protocol_version(), ProposerStore::new_for_tests(), /* header_num_of_batches_threshold */ 32, /* max_header_num_of_batches */ 100, @@ -77,6 +78,7 @@ async fn propose_payload_and_repropose_after_n_seconds() { let _proposer_handle = Proposer::spawn( name, committee.clone(), + &latest_protocol_version(), ProposerStore::new_for_tests(), /* header_num_of_batches_threshold */ 1, /* max_header_num_of_batches */ max_num_of_batches, @@ -147,10 +149,10 @@ async fn propose_payload_and_repropose_after_n_seconds() { // AND send some parents to advance the round let parents: Vec<_> = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(4) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); let result = tx_parents.send((parents, 1, 0)).await; @@ -199,6 +201,7 @@ async fn equivocation_protection() { let proposer_handle = Proposer::spawn( authority_id, committee.clone(), + &latest_protocol_version(), proposer_store.clone(), /* header_num_of_batches_threshold */ 1, /* max_header_num_of_batches */ 10, @@ -238,10 +241,10 @@ async fn equivocation_protection() { // Create and send parents let parents: Vec<_> = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(3) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); let result = tx_parents.send((parents, 1, 0)).await; @@ -271,6 +274,7 @@ async fn equivocation_protection() { let _proposer_handle = Proposer::spawn( authority_id, committee.clone(), + &latest_protocol_version(), proposer_store, /* header_num_of_batches_threshold */ 1, /* max_header_num_of_batches */ 10, @@ -309,10 +313,10 @@ async fn equivocation_protection() { // Create and send a superset parents, same round but different set from before let parents: Vec<_> = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(4) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); let result = tx_parents.send((parents, 1, 0)).await; diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs b/narwhal/primary/src/tests/synchronizer_tests.rs index 2ed94ef8bc1cc..8319c25689cd6 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -51,6 +51,7 @@ async fn accept_certificates() { let synchronizer = Arc::new(Synchronizer::new( authority_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -78,10 +79,10 @@ async fn accept_certificates() { // Send 3 certificates to the Synchronizer. let certificates: Vec<_> = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(3) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); for cert in certificates.clone() { synchronizer.try_accept_certificate(cert).await.unwrap(); @@ -148,6 +149,7 @@ async fn accept_suspended_certificates() { let synchronizer = Arc::new(Synchronizer::new( authority_id, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -163,7 +165,7 @@ async fn accept_suspended_certificates() { // Make fake certificates. let committee = fixture.committee(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); @@ -180,7 +182,7 @@ async fn accept_suspended_certificates() { ); let certificates = certificates.into_iter().collect_vec(); - // Try to aceept certificates from round 2 to 5. All of them should be suspended. + // Try to accept certificates from round 2 to 5. All of them should be suspended. let accept = FuturesUnordered::new(); for cert in &certificates[NUM_AUTHORITIES..] { match synchronizer.try_accept_certificate(cert.clone()).await { @@ -193,7 +195,7 @@ async fn accept_suspended_certificates() { } } - // Try to aceept certificates from round 1. All of them should be accepted. + // Try to accept certificates from round 1. All of them should be accepted. for cert in &certificates[..NUM_AUTHORITIES] { match synchronizer.try_accept_certificate(cert.clone()).await { Ok(()) => continue, @@ -204,7 +206,7 @@ async fn accept_suspended_certificates() { // Wait for all notifications to arrive. accept.collect::>().await; - // Try to aceept certificates from round 2 and above again. All of them should be accepted. + // Try to accept certificates from round 2 and above again. All of them should be accepted. for cert in &certificates[NUM_AUTHORITIES..] { match synchronizer.try_accept_certificate(cert.clone()).await { Ok(()) => continue, @@ -254,6 +256,7 @@ async fn synchronizer_recover_basic() { let synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -281,10 +284,10 @@ async fn synchronizer_recover_basic() { // Send 3 certificates to Synchronizer. let certificates: Vec<_> = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(3) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); for cert in certificates.clone() { synchronizer.try_accept_certificate(cert).await.unwrap(); @@ -302,6 +305,7 @@ async fn synchronizer_recover_basic() { let _synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -370,6 +374,7 @@ async fn synchronizer_recover_partial_certs() { let synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -397,10 +402,10 @@ async fn synchronizer_recover_partial_certs() { // Send 1 certificate. let certificates: Vec = fixture - .headers() + .headers(&latest_protocol_version()) .iter() .take(3) - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect(); let last_cert = certificates.clone().into_iter().last().unwrap(); synchronizer @@ -420,6 +425,7 @@ async fn synchronizer_recover_partial_certs() { let synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -480,6 +486,7 @@ async fn synchronizer_recover_previous_round() { let synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -506,7 +513,7 @@ async fn synchronizer_recover_previous_round() { client.set_primary_network(network.clone()); // Send 3 certificates from round 1, and 2 certificates from round 2 to Synchronizer. - let genesis_certs = Certificate::genesis(&committee); + let genesis_certs = Certificate::genesis(&latest_protocol_version(), &committee); let genesis = genesis_certs .iter() .map(|x| x.digest()) @@ -549,6 +556,7 @@ async fn synchronizer_recover_previous_round() { let _synchronizer = Arc::new(Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client.clone(), @@ -595,6 +603,7 @@ async fn deliver_certificate_using_store() { let synchronizer = Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -609,7 +618,7 @@ async fn deliver_certificate_using_store() { ); // create some certificates in a complete DAG form - let genesis_certs = Certificate::genesis(&committee); + let genesis_certs = Certificate::genesis(&latest_protocol_version(), &committee); let genesis = genesis_certs .iter() .map(|x| x.digest()) @@ -666,6 +675,7 @@ async fn deliver_certificate_not_found_parents() { let synchronizer = Synchronizer::new( name, fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -680,7 +690,7 @@ async fn deliver_certificate_not_found_parents() { ); // create some certificates in a complete DAG form - let genesis_certs = Certificate::genesis(&committee); + let genesis_certs = Certificate::genesis(&latest_protocol_version(), &committee); let genesis = genesis_certs .iter() .map(|x| x.digest()) @@ -748,6 +758,7 @@ async fn sync_batches_drops_old() { let synchronizer = Arc::new(Synchronizer::new( primary.id(), fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ 50, client, @@ -765,7 +776,7 @@ async fn sync_batches_drops_old() { for _ in 0..3 { let header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .with_payload_batch( test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()), 0, @@ -775,7 +786,7 @@ async fn sync_batches_drops_old() { .unwrap(), ); - let certificate = fixture.certificate(&header); + let certificate = fixture.certificate(&latest_protocol_version(), &header); let digest = certificate.clone().digest(); certificates.insert(digest, certificate.clone()); @@ -786,7 +797,7 @@ async fn sync_batches_drops_old() { } let test_header = Header::V1( author - .header_builder(&fixture.committee()) + .header_builder(&latest_protocol_version(), &fixture.committee()) .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( @@ -834,6 +845,7 @@ async fn gc_suspended_certificates() { let synchronizer = Arc::new(Synchronizer::new( primary.id(), fixture.committee(), + latest_protocol_version(), worker_cache.clone(), /* gc_depth */ GC_DEPTH, client, @@ -849,7 +861,7 @@ async fn gc_suspended_certificates() { // Make 5 rounds of fake certificates. let committee: Committee = fixture.committee(); - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); diff --git a/narwhal/storage/src/certificate_store.rs b/narwhal/storage/src/certificate_store.rs index 3eb9bfaf85732..a3f1927ab38b5 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -800,23 +800,28 @@ mod test { fn certificates(rounds: u64) -> Vec { let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - let mut current_round: Vec<_> = Certificate::genesis(&committee) - .into_iter() - .map(|cert| cert.header().clone()) - .collect(); + let mut current_round: Vec<_> = + Certificate::genesis(&latest_protocol_version(), &committee) + .into_iter() + .map(|cert| cert.header().clone()) + .collect(); let mut result: Vec = Vec::new(); for i in 0..rounds { let parents: BTreeSet<_> = current_round .iter() - .map(|header| fixture.certificate(header).digest()) + .map(|header| { + fixture + .certificate(&latest_protocol_version(), header) + .digest() + }) .collect(); (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); result.extend( current_round .iter() - .map(|h| fixture.certificate(h)) + .map(|h| fixture.certificate(&latest_protocol_version(), h)) .collect::>(), ); } diff --git a/narwhal/storage/src/consensus_store.rs b/narwhal/storage/src/consensus_store.rs index e097668c2557c..33580876119b7 100644 --- a/narwhal/storage/src/consensus_store.rs +++ b/narwhal/storage/src/consensus_store.rs @@ -192,7 +192,7 @@ mod test { use crate::ConsensusStore; use std::collections::HashMap; use store::Map; - use test_utils::CommitteeFixture; + use test_utils::{latest_protocol_version, CommitteeFixture}; use types::{ Certificate, CommittedSubDag, CommittedSubDagShell, ConsensusCommit, ConsensusCommitV2, ReputationScores, TimestampMs, @@ -209,7 +209,7 @@ mod test { for sequence_number in 0..10 { let sub_dag = CommittedSubDag::new( vec![], - Certificate::default(), + Certificate::default(&latest_protocol_version()), sequence_number, ReputationScores::new(&committee), None, @@ -237,7 +237,7 @@ mod test { let sub_dag = CommittedSubDag::new( vec![], - Certificate::default(), + Certificate::default(&latest_protocol_version()), sequence_number, scores, None, diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 88005599f8081..b684271f9682e 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -798,7 +798,9 @@ pub fn mock_certificate_with_rand( .payload(fixture_payload_with_rand(1, rand, protocol_config)) .build() .unwrap(); - let certificate = Certificate::new_unsigned(committee, Header::V1(header), Vec::new()).unwrap(); + let certificate = + Certificate::new_unsigned(protocol_config, committee, Header::V1(header), Vec::new()) + .unwrap(); (certificate.digest(), certificate) } @@ -833,7 +835,9 @@ pub fn mock_certificate_with_epoch( .payload(fixture_payload(1, protocol_config)) .build() .unwrap(); - let certificate = Certificate::new_unsigned(committee, Header::V1(header), Vec::new()).unwrap(); + let certificate = + Certificate::new_unsigned(protocol_config, committee, Header::V1(header), Vec::new()) + .unwrap(); (certificate.digest(), certificate) } @@ -855,15 +859,21 @@ pub fn mock_signed_certificate( let header = header_builder.build().unwrap(); - let cert = - Certificate::new_unsigned(committee, Header::V1(header.clone()), Vec::new()).unwrap(); + let cert = Certificate::new_unsigned( + protocol_config, + committee, + Header::V1(header.clone()), + Vec::new(), + ) + .unwrap(); let mut votes = Vec::new(); for (name, signer) in signers { let sig = Signature::new_secure(&to_intent_message(cert.header().digest()), signer); votes.push((*name, sig)) } - let cert = Certificate::new_unverified(committee, Header::V1(header), votes).unwrap(); + let cert = + Certificate::new_unverified(protocol_config, committee, Header::V1(header), votes).unwrap(); (cert.digest(), cert) } @@ -1038,24 +1048,27 @@ impl CommitteeFixture { // pub fn header(&self, author: PublicKey) -> Header { // Currently sign with the last authority - pub fn header(&self) -> Header { - self.authorities.last().unwrap().header(&self.committee()) + pub fn header(&self, protocol_config: &ProtocolConfig) -> Header { + self.authorities + .last() + .unwrap() + .header(protocol_config, &self.committee()) } - pub fn headers(&self) -> Vec
{ + pub fn headers(&self, protocol_config: &ProtocolConfig) -> Vec
{ let committee = self.committee(); self.authorities .iter() - .map(|a| a.header_with_round(&committee, 1)) + .map(|a| a.header_with_round(protocol_config, &committee, 1)) .collect() } - pub fn headers_next_round(&self) -> Vec
{ + pub fn headers_next_round(&self, protocol_config: &ProtocolConfig) -> Vec
{ let committee = self.committee(); self.authorities .iter() - .map(|a| a.header_with_round(&committee, 2)) + .map(|a| a.header_with_round(protocol_config, &committee, 2)) .collect() } @@ -1100,14 +1113,14 @@ impl CommitteeFixture { .collect() } - pub fn certificate(&self, header: &Header) -> Certificate { + pub fn certificate(&self, protocol_config: &ProtocolConfig, header: &Header) -> Certificate { let committee = self.committee(); let votes: Vec<_> = self .votes(header) .into_iter() .map(|x| (x.author(), x.signature().clone())) .collect(); - Certificate::new_unverified(&committee, header.clone(), votes).unwrap() + Certificate::new_unverified(protocol_config, &committee, header.clone(), votes).unwrap() } } @@ -1177,18 +1190,23 @@ impl AuthorityFixture { ) } - pub fn header(&self, committee: &Committee) -> Header { + pub fn header(&self, protocol_config: &ProtocolConfig, committee: &Committee) -> Header { let header = self - .header_builder(committee) + .header_builder(protocol_config, committee) .payload(Default::default()) .build() .unwrap(); Header::V1(header) } - pub fn header_with_round(&self, committee: &Committee, round: Round) -> Header { + pub fn header_with_round( + &self, + protocol_config: &ProtocolConfig, + committee: &Committee, + round: Round, + ) -> Header { let header = self - .header_builder(committee) + .header_builder(protocol_config, committee) .payload(Default::default()) .round(round) .build() @@ -1196,13 +1214,17 @@ impl AuthorityFixture { Header::V1(header) } - pub fn header_builder(&self, committee: &Committee) -> types::HeaderV1Builder { + pub fn header_builder( + &self, + protocol_config: &ProtocolConfig, + committee: &Committee, + ) -> types::HeaderV1Builder { types::HeaderV1Builder::default() .author(self.id()) .round(1) .epoch(committee.epoch()) .parents( - Certificate::genesis(committee) + Certificate::genesis(protocol_config, committee) .iter() .map(|x| x.digest()) .collect(), diff --git a/narwhal/types/benches/verify_certificate.rs b/narwhal/types/benches/verify_certificate.rs index 63115c0dbaf0e..9127dfe648782 100644 --- a/narwhal/types/benches/verify_certificate.rs +++ b/narwhal/types/benches/verify_certificate.rs @@ -4,10 +4,10 @@ use criterion::{ criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput, }; -use fastcrypto::hash::Hash; +use fastcrypto::{hash::Hash, traits::KeyPair}; use narwhal_types::Certificate; use std::collections::BTreeSet; -use test_utils::{latest_protocol_version, make_optimal_certificates, CommitteeFixture}; +use test_utils::{latest_protocol_version, make_optimal_signed_certificates, CommitteeFixture}; pub fn verify_certificates(c: &mut Criterion) { let mut bench_group = c.benchmark_group("verify_certificate"); @@ -19,19 +19,22 @@ pub fn verify_certificates(c: &mut Criterion) { .committee_size(committee_size.try_into().unwrap()) .build(); let committee = fixture.committee(); - let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); + let keys: Vec<_> = fixture + .authorities() + .map(|a| (a.id(), a.keypair().copy())) + .collect(); // process certificates for rounds, check we don't grow the dag too much - let genesis = Certificate::genesis(&committee) + let genesis = Certificate::genesis(&latest_protocol_version(), &committee) .iter() .map(|x| x.digest()) .collect::>(); - let (certificates, _next_parents) = make_optimal_certificates( - &committee, - &latest_protocol_version(), + let (certificates, _next_parents) = make_optimal_signed_certificates( 1..=1, &genesis, - &ids, + &committee, + &latest_protocol_version(), + keys.as_slice(), ); let certificate = certificates.front().unwrap().clone(); @@ -44,7 +47,9 @@ pub fn verify_certificates(c: &mut Criterion) { |b, cert| { let worker_cache = fixture.worker_cache(); b.iter(|| { - let _ = cert.verify(&committee, &worker_cache); + cert.clone() + .verify(&committee, &worker_cache) + .expect("Verification failed"); }) }, ); diff --git a/narwhal/types/src/consensus.rs b/narwhal/types/src/consensus.rs index 5a6b8037e2e4f..e61b2cbefcc77 100644 --- a/narwhal/types/src/consensus.rs +++ b/narwhal/types/src/consensus.rs @@ -39,7 +39,7 @@ impl Hash<{ crypto::DIGEST_LENGTH }> for ConsensusOutput { } } -#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct CommittedSubDag { /// The sequence of committed certificates. pub certificates: Vec, @@ -449,7 +449,7 @@ mod tests { use indexmap::IndexMap; use std::collections::BTreeSet; use std::num::NonZeroUsize; - use test_utils::CommitteeFixture; + use test_utils::{latest_protocol_version, CommitteeFixture}; #[test] fn test_zero_timestamp_in_sub_dag() { @@ -467,8 +467,13 @@ mod tests { .build() .unwrap(); - let certificate = - Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + let certificate = Certificate::new_unsigned( + &latest_protocol_version(), + &committee, + Header::V1(header), + Vec::new(), + ) + .unwrap(); // AND we initialise the sub dag via the "restore" way let sub_dag_round = CommittedSubDag { @@ -503,8 +508,13 @@ mod tests { .build() .unwrap(); - let certificate = - Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + let certificate = Certificate::new_unsigned( + &latest_protocol_version(), + &committee, + Header::V1(header), + Vec::new(), + ) + .unwrap(); // AND let sub_dag_round_2 = CommittedSubDag::new( @@ -530,8 +540,13 @@ mod tests { .build() .unwrap(); - let certificate = - Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + let certificate = Certificate::new_unsigned( + &latest_protocol_version(), + &committee, + Header::V1(header), + Vec::new(), + ) + .unwrap(); // WHEN create the sub dag based on the "previously committed" sub dag. let sub_dag_round_4 = CommittedSubDag::new( diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index e32ba9658083c..03dfa45ceae33 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -865,78 +865,111 @@ impl PartialEq for Vote { #[enum_dispatch(CertificateAPI)] pub enum Certificate { V1(CertificateV1), -} - -// TODO: Revisit if we should not impl Default for Certificate -impl Default for Certificate { - fn default() -> Self { - Self::V1(CertificateV1::default()) - } + V2(CertificateV2), } impl Certificate { - // TODO: Add version number and match on that - pub fn genesis(committee: &Committee) -> Vec { - CertificateV1::genesis(committee) - .into_iter() - .map(Self::V1) - .collect() + pub fn genesis(protocol_config: &ProtocolConfig, committee: &Committee) -> Vec { + if protocol_config.narwhal_certificate_v2() { + CertificateV2::genesis(committee) + .into_iter() + .map(Self::V2) + .collect() + } else { + CertificateV1::genesis(committee) + .into_iter() + .map(Self::V1) + .collect() + } } pub fn new_unverified( + protocol_config: &ProtocolConfig, committee: &Committee, header: Header, votes: Vec<(AuthorityIdentifier, Signature)>, ) -> DagResult { - CertificateV1::new_unverified(committee, header, votes) + if protocol_config.narwhal_certificate_v2() { + CertificateV2::new_unverified(committee, header, votes) + } else { + CertificateV1::new_unverified(committee, header, votes) + } } pub fn new_unsigned( + protocol_config: &ProtocolConfig, committee: &Committee, header: Header, votes: Vec<(AuthorityIdentifier, Signature)>, ) -> DagResult { - CertificateV1::new_unsigned(committee, header, votes) + if protocol_config.narwhal_certificate_v2() { + CertificateV2::new_unsigned(committee, header, votes) + } else { + CertificateV1::new_unsigned(committee, header, votes) + } } - pub fn new_test_empty(author: AuthorityIdentifier) -> Self { - CertificateV1::new_test_empty(author) + pub fn new_test_empty(protocol_config: &ProtocolConfig, author: AuthorityIdentifier) -> Self { + if protocol_config.narwhal_certificate_v2() { + CertificateV2::new_test_empty(author) + } else { + CertificateV1::new_test_empty(author) + } } /// This function requires that certificate was verified against given committee pub fn signed_authorities(&self, committee: &Committee) -> Vec { match self { Certificate::V1(certificate) => certificate.signed_authorities(committee), + Certificate::V2(certificate) => certificate.signed_authorities(committee), } } pub fn signed_by(&self, committee: &Committee) -> (Stake, Vec) { match self { Certificate::V1(certificate) => certificate.signed_by(committee), + Certificate::V2(certificate) => certificate.signed_by(committee), } } - pub fn verify(&self, committee: &Committee, worker_cache: &WorkerCache) -> DagResult<()> { + pub fn verify( + self, + committee: &Committee, + worker_cache: &WorkerCache, + ) -> DagResult { match self { Certificate::V1(certificate) => certificate.verify(committee, worker_cache), + Certificate::V2(certificate) => certificate.verify(committee, worker_cache), } } pub fn round(&self) -> Round { match self { Certificate::V1(certificate) => certificate.round(), + Certificate::V2(certificate) => certificate.round(), } } pub fn epoch(&self) -> Epoch { match self { Certificate::V1(certificate) => certificate.epoch(), + Certificate::V2(certificate) => certificate.epoch(), } } pub fn origin(&self) -> AuthorityIdentifier { match self { Certificate::V1(certificate) => certificate.origin(), + Certificate::V2(certificate) => certificate.origin(), + } + } + + // Used for testing + pub fn default(protocol_config: &ProtocolConfig) -> Certificate { + if protocol_config.narwhal_certificate_v2() { + Certificate::V2(CertificateV2::default()) + } else { + Certificate::V1(CertificateV1::default()) } } } @@ -947,6 +980,7 @@ impl Hash<{ crypto::DIGEST_LENGTH }> for Certificate { fn digest(&self) -> CertificateDigest { match self { Certificate::V1(data) => data.digest(), + Certificate::V2(data) => data.digest(), } } } @@ -954,13 +988,17 @@ impl Hash<{ crypto::DIGEST_LENGTH }> for Certificate { #[enum_dispatch] pub trait CertificateAPI { fn header(&self) -> &Header; - fn aggregated_signature(&self) -> &AggregateSignatureBytes; + fn aggregated_signature(&self) -> Option<&AggregateSignatureBytes>; fn signed_authorities(&self) -> &roaring::RoaringBitmap; fn metadata(&self) -> &Metadata; // Used for testing. fn update_header(&mut self, header: Header); fn header_mut(&mut self) -> &mut Header; + + // CertificateV2 + fn signature_verification_state(&self) -> &SignatureVerificationState; + fn set_signature_verification_state(&mut self, state: SignatureVerificationState); } #[serde_as] @@ -978,8 +1016,16 @@ impl CertificateAPI for CertificateV1 { &self.header } - fn aggregated_signature(&self) -> &AggregateSignatureBytes { - &self.aggregated_signature + fn aggregated_signature(&self) -> Option<&AggregateSignatureBytes> { + Some(&self.aggregated_signature) + } + + fn signature_verification_state(&self) -> &SignatureVerificationState { + unimplemented!("CertificateV2 field! Use aggregated_signature."); + } + + fn set_signature_verification_state(&mut self, _state: SignatureVerificationState) { + unimplemented!("CertificateV2 field! Use aggregated_signature."); } fn signed_authorities(&self) -> &roaring::RoaringBitmap { @@ -1134,8 +1180,11 @@ impl CertificateV1 { } /// Verifies the validity of the certificate. - /// TODO: Output a different type, similar to Sui VerifiedCertificate. - pub fn verify(&self, committee: &Committee, worker_cache: &WorkerCache) -> DagResult<()> { + pub fn verify( + self, + committee: &Committee, + worker_cache: &WorkerCache, + ) -> DagResult { // Ensure the header is from the correct epoch. ensure!( self.epoch() == committee.epoch(), @@ -1146,8 +1195,8 @@ impl CertificateV1 { ); // Genesis certificates are always valid. - if self.round() == 0 && Self::genesis(committee).contains(self) { - return Ok(()); + if self.round() == 0 && Self::genesis(committee).contains(&self) { + return Ok(Certificate::V1(self)); } // Save signature verifications when the header is invalid. @@ -1167,7 +1216,303 @@ impl CertificateV1 { .verify_secure(&to_intent_message(certificate_digest), &pks[..]) .map_err(|_| DagError::InvalidSignature)?; - Ok(()) + Ok(Certificate::V1(self)) + } + + pub fn round(&self) -> Round { + self.header.round() + } + + pub fn epoch(&self) -> Epoch { + self.header.epoch() + } + + pub fn origin(&self) -> AuthorityIdentifier { + self.header.author() + } +} + +// Holds AggregateSignatureBytes but with the added layer to specify the +// signatures verification state. This will be used to take advantage of the +// certificate chain that is formed via the DAG by only verifying the +// leaves of the certificate chain when they are fetched from validators +// during catchup. +#[derive(Clone, Serialize, Deserialize, MallocSizeOf, Debug)] +pub enum SignatureVerificationState { + // This state occurs when the certificate has not yet received a quorum of + // signatures. + Unsigned(AggregateSignatureBytes), + // This state occurs when a certificate has just been received from the network + // and has not been verified yet. + Unverified(AggregateSignatureBytes), + // This state occurs when a certificate was either created locally, received + // via brodacast, or fetched but was not the parent of another certifiate. + // Therefore this certificate had to be verified directly. + VerifiedDirectly(AggregateSignatureBytes), + // This state occurs when the cert was a parent of another fetched certificate + // that was verified directly, then this certificate is verified indirectly. + VerifiedIndirectly, + // This state occurs only for genesis certificates which always has valid + // signatures bytes but the bytes are garbage so we don't mark them as verified. + Genesis, +} + +impl Default for SignatureVerificationState { + fn default() -> Self { + SignatureVerificationState::Unsigned(AggregateSignatureBytes::default()) + } +} + +#[serde_as] +#[derive(Clone, Serialize, Deserialize, Default, MallocSizeOf)] +pub struct CertificateV2 { + pub header: Header, + pub signature_verification_state: SignatureVerificationState, + #[serde_as(as = "NarwhalBitmap")] + signed_authorities: roaring::RoaringBitmap, + pub metadata: Metadata, +} + +impl CertificateAPI for CertificateV2 { + fn header(&self) -> &Header { + &self.header + } + + fn aggregated_signature(&self) -> Option<&AggregateSignatureBytes> { + match &self.signature_verification_state { + SignatureVerificationState::VerifiedDirectly(bytes) + | SignatureVerificationState::Unverified(bytes) + | SignatureVerificationState::Unsigned(bytes) => Some(bytes), + SignatureVerificationState::VerifiedIndirectly + | SignatureVerificationState::Genesis => None, + } + } + + fn signature_verification_state(&self) -> &SignatureVerificationState { + &self.signature_verification_state + } + + fn set_signature_verification_state(&mut self, state: SignatureVerificationState) { + self.signature_verification_state = state; + } + + fn signed_authorities(&self) -> &roaring::RoaringBitmap { + &self.signed_authorities + } + + fn metadata(&self) -> &Metadata { + &self.metadata + } + + // Used for testing. + fn update_header(&mut self, header: Header) { + self.header = header; + } + + fn header_mut(&mut self) -> &mut Header { + &mut self.header + } +} + +impl CertificateV2 { + pub fn genesis(committee: &Committee) -> Vec { + committee + .authorities() + .map(|authority| Self { + header: Header::V1(HeaderV1 { + author: authority.id(), + epoch: committee.epoch(), + ..Default::default() + }), + signature_verification_state: SignatureVerificationState::Genesis, + ..Self::default() + }) + .collect() + } + + pub fn new_unverified( + committee: &Committee, + header: Header, + votes: Vec<(AuthorityIdentifier, Signature)>, + ) -> DagResult { + Self::new_unsafe(committee, header, votes, true) + } + + pub fn new_unsigned( + committee: &Committee, + header: Header, + votes: Vec<(AuthorityIdentifier, Signature)>, + ) -> DagResult { + Self::new_unsafe(committee, header, votes, false) + } + + pub fn new_test_empty(author: AuthorityIdentifier) -> Certificate { + let header = Header::V1(HeaderV1 { + author, + ..Default::default() + }); + Certificate::V1(CertificateV1 { + header, + ..Default::default() + }) + } + + fn new_unsafe( + committee: &Committee, + header: Header, + votes: Vec<(AuthorityIdentifier, Signature)>, + check_stake: bool, + ) -> DagResult { + let mut votes = votes; + votes.sort_by_key(|(pk, _)| *pk); + let mut votes: VecDeque<_> = votes.into_iter().collect(); + + let mut weight = 0; + let mut sigs = Vec::new(); + + let filtered_votes = committee + .authorities() + .enumerate() + .filter(|(_, authority)| { + if !votes.is_empty() && authority.id() == votes.front().unwrap().0 { + sigs.push(votes.pop_front().unwrap()); + weight += authority.stake(); + // If there are repeats, also remove them + while !votes.is_empty() && votes.front().unwrap() == sigs.last().unwrap() { + votes.pop_front().unwrap(); + } + return true; + } + false + }) + .map(|(index, _)| index as u32); + + let signed_authorities= roaring::RoaringBitmap::from_sorted_iter(filtered_votes) + .map_err(|_| DagError::InvalidBitmap("Failed to convert votes into a bitmap of authority keys. Something is likely very wrong...".to_string()))?; + + // Ensure that all authorities in the set of votes are known + ensure!( + votes.is_empty(), + DagError::UnknownAuthority(votes.front().unwrap().0.to_string()) + ); + + // Ensure that the authorities have enough weight + ensure!( + !check_stake || weight >= committee.quorum_threshold(), + DagError::CertificateRequiresQuorum + ); + + let aggregated_signature = if sigs.is_empty() { + AggregateSignature::default() + } else { + AggregateSignature::aggregate::>( + sigs.iter().map(|(_, sig)| sig).collect(), + ) + .map_err(|_| DagError::InvalidSignature)? + }; + + let aggregate_signature_bytes = AggregateSignatureBytes::from(&aggregated_signature); + + let aggregate_signature_verification_state = if !check_stake { + SignatureVerificationState::Unsigned(aggregate_signature_bytes) + } else { + SignatureVerificationState::Unverified(aggregate_signature_bytes) + }; + + Ok(Certificate::V2(CertificateV2 { + header, + signature_verification_state: aggregate_signature_verification_state, + signed_authorities, + metadata: Metadata::default(), + })) + } + + /// This function requires that certificate was verified against given committee + pub fn signed_authorities(&self, committee: &Committee) -> Vec { + assert_eq!(committee.epoch(), self.epoch()); + let (_stake, pks) = self.signed_by(committee); + pks + } + + pub fn signed_by(&self, committee: &Committee) -> (Stake, Vec) { + // Ensure the certificate has a quorum. + let mut weight = 0; + + let auth_indexes = self.signed_authorities.iter().collect::>(); + let mut auth_iter = 0; + let pks = committee + .authorities() + .enumerate() + .filter(|(i, authority)| match auth_indexes.get(auth_iter) { + Some(index) if *index == *i as u32 => { + weight += authority.stake(); + auth_iter += 1; + true + } + _ => false, + }) + .map(|(_, authority)| authority.protocol_key().clone()) + .collect(); + (weight, pks) + } + + /// Verifies the validity of the certificate. + pub fn verify( + self, + committee: &Committee, + worker_cache: &WorkerCache, + ) -> DagResult { + // Ensure the header is from the correct epoch. + ensure!( + self.epoch() == committee.epoch(), + DagError::InvalidEpoch { + expected: committee.epoch(), + received: self.epoch() + } + ); + + // Genesis certificates are always valid. + if self.round() == 0 && Self::genesis(committee).contains(&self) { + return Ok(Certificate::V2(self)); + } + + // Save signature verifications when the header is invalid. + self.header.validate(committee, worker_cache)?; + + let (weight, pks) = self.signed_by(committee); + + ensure!( + weight >= committee.quorum_threshold(), + DagError::CertificateRequiresQuorum + ); + + let verified_cert = self.verify_signature(pks)?; + + Ok(verified_cert) + } + + fn verify_signature(mut self, pks: Vec) -> DagResult { + let aggregrate_signature_bytes = match self.signature_verification_state { + SignatureVerificationState::VerifiedIndirectly + | SignatureVerificationState::VerifiedDirectly(_) + | SignatureVerificationState::Genesis => return Ok(Certificate::V2(self)), + SignatureVerificationState::Unverified(ref bytes) => bytes, + SignatureVerificationState::Unsigned(_) => { + bail!(DagError::CertificateRequiresQuorum); + } + }; + + // Verify the signatures + let certificate_digest: Digest<{ crypto::DIGEST_LENGTH }> = Digest::from(self.digest()); + AggregateSignature::try_from(aggregrate_signature_bytes) + .map_err(|_| DagError::InvalidSignature)? + .verify_secure(&to_intent_message(certificate_digest), &pks[..]) + .map_err(|_| DagError::InvalidSignature)?; + + self.signature_verification_state = + SignatureVerificationState::VerifiedDirectly(aggregrate_signature_bytes.clone()); + + Ok(Certificate::V2(self)) } pub fn round(&self) -> Round { @@ -1248,6 +1593,14 @@ impl Hash<{ crypto::DIGEST_LENGTH }> for CertificateV1 { } } +impl Hash<{ crypto::DIGEST_LENGTH }> for CertificateV2 { + type TypedDigest = CertificateDigest; + + fn digest(&self) -> CertificateDigest { + CertificateDigest(self.header.digest().0) + } +} + impl fmt::Debug for Certificate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match self { @@ -1260,6 +1613,15 @@ impl fmt::Debug for Certificate { data.header.digest(), data.epoch() ), + Certificate::V2(data) => write!( + f, + "{}: C{}({}, {}, E{})", + data.digest(), + data.round(), + data.origin(), + data.header.digest(), + data.epoch() + ), } } } @@ -1268,6 +1630,13 @@ impl PartialEq for Certificate { fn eq(&self, other: &Self) -> bool { match (self, other) { (Certificate::V1(data), Certificate::V1(other_data)) => data.eq(other_data), + (Certificate::V2(data), Certificate::V2(other_data)) => data.eq(other_data), + (Certificate::V1(_), Certificate::V2(_)) => { + unimplemented!("Invalid comparison between CertificateV1 & CertificateV2"); + } + (Certificate::V2(_), Certificate::V1(_)) => { + unimplemented!("Invalid comparison between CertificateV2 & CertificateV1"); + } } } } @@ -1282,6 +1651,16 @@ impl PartialEq for CertificateV1 { } } +impl PartialEq for CertificateV2 { + fn eq(&self, other: &Self) -> bool { + let mut ret = self.header().digest() == other.header().digest(); + ret &= self.round() == other.round(); + ret &= self.epoch() == other.epoch(); + ret &= self.origin() == other.origin(); + ret + } +} + /// Request for broadcasting certificates to peers. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SendCertificateRequest { diff --git a/narwhal/types/tests/primary_tests.rs b/narwhal/types/tests/primary_tests.rs index 9239daf704ef9..972123aa2bbf9 100644 --- a/narwhal/types/tests/primary_tests.rs +++ b/narwhal/types/tests/primary_tests.rs @@ -9,7 +9,7 @@ use rand::rngs::OsRng; use rand::seq::SliceRandom; use std::collections::BTreeSet; use std::num::NonZeroUsize; -use test_utils::{AuthorityFixture, CommitteeFixture}; +use test_utils::{latest_protocol_version, AuthorityFixture, CommitteeFixture}; #[tokio::test] async fn test_certificate_singers_are_ordered() { @@ -47,7 +47,13 @@ async fn test_certificate_singers_are_ordered() { votes.shuffle(&mut OsRng); // Create a certificate - let certificate = Certificate::new_unverified(&committee, Header::V1(header), votes).unwrap(); + let certificate = Certificate::new_unverified( + &latest_protocol_version(), + &committee, + Header::V1(header), + votes, + ) + .unwrap(); let (stake, signers) = certificate.signed_by(&committee);