diff --git a/Cargo.lock b/Cargo.lock index 1f60a8f961319..1baaa77e01f76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -655,7 +655,7 @@ dependencies = [ "futures-util", "handlebars", "http", - "indexmap 2.0.0", + "indexmap 2.0.2", "lru 0.7.8", "mime", "multer", @@ -725,7 +725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d74240f9daa8c1e8f73e9cfcc338d20a88d00bbeb83ded49ce8e5b4dcec0f5" dependencies = [ "bytes", - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_json", ] @@ -4163,9 +4163,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "hdrhistogram" @@ -4561,12 +4561,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "serde", ] @@ -13970,7 +13970,7 @@ dependencies = [ "handlebars", "hashbrown 0.12.3", "hashbrown 0.13.2", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "hdrhistogram", "headers", "headers-core", @@ -14008,7 +14008,7 @@ dependencies = [ "include_dir_macros", "indenter", "indexmap 1.9.3", - "indexmap 2.0.0", + "indexmap 2.0.2", "indicatif", "inout", "inquire", diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index a701f6272d377..49ac1f54779b3 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -76,7 +76,8 @@ const MAX_PROTOCOL_VERSION: u64 = 28; // Version 25: Add sui::table_vec::swap and sui::table_vec::swap_remove to 
system packages. // Version 26: New gas model version. // Add support for receiving objects off of other objects in devnet only. -// Version 27: Add sui::zklogin::verify_zklogin_id and related functions to sui framework. +// Version 28: Add sui::zklogin::verify_zklogin_id and related functions to sui framework. +// Use CertificateV2 in narwhal #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct ProtocolVersion(u64); @@ -1529,9 +1530,10 @@ impl ProtocolConfig { cfg.check_zklogin_id_cost_base = Some(200); // zklogin::check_zklogin_issuer cfg.check_zklogin_issuer_cost_base = Some(200); - // Only enable effects v2 on devnet. + // Only enable effects v2 & nw certificate v2 on devnet. if chain != Chain::Mainnet && chain != Chain::Testnet { cfg.feature_flags.enable_effects_v2 = true; + cfg.feature_flags.narwhal_certificate_v2 = true; } } // Use this template when making changes: diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_28.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_28.snap index 361d906b89f10..93f8571c3d5fb 100644 --- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_28.snap +++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_28.snap @@ -36,6 +36,7 @@ feature_flags: loaded_child_object_format_type: true receive_objects: true enable_effects_v2: true + narwhal_certificate_v2: true max_tx_size_bytes: 131072 max_input_objects: 2048 max_size_written_objects: 5000000 diff --git a/narwhal/node/tests/staged/narwhal.yaml b/narwhal/node/tests/staged/narwhal.yaml index a538603f3cf02..19095a25a8fec 100644 --- a/narwhal/node/tests/staged/narwhal.yaml +++ b/narwhal/node/tests/staged/narwhal.yaml @@ -32,23 +32,21 @@ BatchV2: TYPENAME: VersionedMetadata Certificate: ENUM: - 0: - V1: + 1: + V2: NEWTYPE: - TYPENAME: CertificateV1 + TYPENAME: CertificateV2 CertificateDigest: NEWTYPESTRUCT: 
TUPLEARRAY: CONTENT: U8 SIZE: 32 -CertificateV1: +CertificateV2: STRUCT: - header: TYPENAME: Header - - aggregated_signature: - TUPLEARRAY: - CONTENT: U8 - SIZE: 48 + - signature_verification_state: + TYPENAME: SignatureVerificationState - signed_authorities: BYTES - metadata: TYPENAME: Metadata @@ -88,6 +86,14 @@ MetadataV1: - created_at: U64 - received_at: OPTION: U64 +SignatureVerificationState: + ENUM: + 0: + Unsigned: + NEWTYPE: + TUPLEARRAY: + CONTENT: U8 + SIZE: 48 VersionedMetadata: ENUM: 0: diff --git a/narwhal/primary/src/aggregators.rs b/narwhal/primary/src/aggregators.rs index 7ceefdb52071f..2553748649e85 100644 --- a/narwhal/primary/src/aggregators.rs +++ b/narwhal/primary/src/aggregators.rs @@ -16,7 +16,7 @@ use tracing::warn; use types::{ ensure, error::{DagError, DagResult}, - Certificate, CertificateAPI, Header, Vote, VoteAPI, + Certificate, CertificateAPI, Header, SignatureVerificationState, Vote, VoteAPI, }; /// Aggregates votes for a particular header into a certificate. 
@@ -62,7 +62,7 @@ impl VotesAggregator { .votes_received_last_round .set(self.votes.len() as i64); if self.weight >= committee.quorum_threshold() { - let cert = Certificate::new_unverified( + let mut cert = Certificate::new_unverified( &self.protocol_config, committee, header.clone(), @@ -83,9 +83,7 @@ impl VotesAggregator { "Failed to verify aggregated sig on certificate: {} error: {}", certificate_digest, err ); - let mut i = 0; - while i < self.votes.len() { - let (id, sig) = &self.votes[i]; + self.votes.retain(|(id, sig)| { let pk = committee.authority_safe(id).protocol_key(); if sig .verify_secure(&to_intent_message(certificate_digest), pk) @@ -93,14 +91,26 @@ impl VotesAggregator { { warn!("Invalid signature on header from authority: {}", id); self.weight -= committee.stake(pk); - self.votes.remove(i); + false } else { - i += 1; + true } - } + }); return Ok(None); } - Ok(_) => return Ok(Some(cert)), + Ok(_) => { + // TODO: Move this block and the AggregateSignature verification into Certificate + if self.protocol_config.narwhal_certificate_v2() { + cert.set_signature_verification_state( + SignatureVerificationState::VerifiedDirectly( + cert.aggregated_signature() + .ok_or(DagError::InvalidSignature)? 
+ .clone(), + ), + ); + } + return Ok(Some(cert)); + } } } Ok(None) diff --git a/narwhal/primary/src/certificate_fetcher.rs b/narwhal/primary/src/certificate_fetcher.rs index c0d71141dd689..7f30167d415be 100644 --- a/narwhal/primary/src/certificate_fetcher.rs +++ b/narwhal/primary/src/certificate_fetcher.rs @@ -6,18 +6,21 @@ use anemo::Request; use config::{AuthorityIdentifier, Committee}; use consensus::consensus::ConsensusRound; use crypto::NetworkPublicKey; +use fastcrypto::hash::Hash; use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use mysten_metrics::metered_channel::Receiver; use mysten_metrics::{monitored_future, monitored_scope, spawn_logged_monitored_task}; use network::PrimaryToPrimaryRpc; use rand::{rngs::ThreadRng, seq::SliceRandom}; +use std::collections::HashSet; use std::{ collections::{BTreeMap, BTreeSet}, sync::Arc, time::Duration, }; use storage::CertificateStore; +use sui_protocol_config::ProtocolConfig; use tokio::task::{spawn_blocking, JoinSet}; use tokio::{ sync::watch, @@ -27,8 +30,9 @@ use tokio::{ use tracing::{debug, error, instrument, trace, warn}; use types::{ error::{DagError, DagResult}, - Certificate, CertificateAPI, ConditionalBroadcastReceiver, FetchCertificatesRequest, - FetchCertificatesResponse, HeaderAPI, Round, + validate_received_certificate_version, Certificate, CertificateAPI, + ConditionalBroadcastReceiver, FetchCertificatesRequest, FetchCertificatesResponse, HeaderAPI, + Round, SignatureVerificationState, }; #[cfg(test)] @@ -46,6 +50,10 @@ const PARALLEL_FETCH_REQUEST_ADDITIONAL_TIMEOUT: Duration = Duration::from_secs( // Batch size is chosen so that verifying a batch takes non-trival // time (verifying a batch of 200 certificates should take > 100ms). const VERIFY_CERTIFICATES_BATCH_SIZE: usize = 200; +// Number of leaf certificates to verify in a batch. Verifications in each +// batch run serially. 
Smaller batch size is chosen as there are significantly +// less certificates we are verifying when verifiying leaves only +const VERIFY_LEAF_CERTIFICATES_BATCH_SIZE: usize = 10; #[derive(Clone, Debug)] pub enum CertificateFetcherCommand { @@ -68,6 +76,7 @@ pub(crate) struct CertificateFetcher { state: Arc, /// The committee information. committee: Committee, + protocol_config: ProtocolConfig, /// Persistent storage for certificates. Read-only usage. certificate_store: CertificateStore, /// Receiver for signal of round changes. @@ -104,6 +113,7 @@ impl CertificateFetcher { pub fn spawn( authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: ProtocolConfig, network: anemo::Network, certificate_store: CertificateStore, rx_consensus_round_updates: watch::Receiver, @@ -124,6 +134,7 @@ impl CertificateFetcher { Self { state, committee, + protocol_config, certificate_store, rx_consensus_round_updates, rx_shutdown, @@ -277,6 +288,7 @@ impl CertificateFetcher { let state = self.state.clone(); let committee = self.committee.clone(); + let protocol_config = self.protocol_config.clone(); debug!( "Starting task to fetch missing certificates: max target {}, gc round {:?}", @@ -289,7 +301,15 @@ impl CertificateFetcher { state.metrics.certificate_fetcher_inflight_fetch.inc(); let now = Instant::now(); - match run_fetch_task(state.clone(), committee, gc_round, written_rounds).await { + match run_fetch_task( + &protocol_config, + state.clone(), + committee, + gc_round, + written_rounds, + ) + .await + { Ok(_) => { debug!( "Finished task to fetch certificates successfully, elapsed = {}s", @@ -313,6 +333,7 @@ impl CertificateFetcher { #[allow(clippy::mutable_key_type)] #[instrument(level = "debug", skip_all)] async fn run_fetch_task( + protocol_config: &ProtocolConfig, state: Arc, committee: Committee, gc_round: Round, @@ -330,7 +351,13 @@ async fn run_fetch_task( // Process and store fetched certificates. 
let num_certs_fetched = response.certificates.len(); - process_certificates_helper(response, &state.synchronizer, state.metrics.clone()).await?; + process_certificates_helper( + protocol_config, + response, + &state.synchronizer, + state.metrics.clone(), + ) + .await?; state .metrics .certificate_fetcher_num_certificates_processed @@ -422,6 +449,7 @@ async fn fetch_certificates_helper( #[instrument(level = "debug", skip_all)] async fn process_certificates_helper( + protocol_config: &ProtocolConfig, response: FetchCertificatesResponse, synchronizer: &Synchronizer, metrics: Arc, @@ -433,12 +461,43 @@ async fn process_certificates_helper( MAX_CERTIFICATES_TO_FETCH, )); } - // Verify certificates in parallel. // In PrimaryReceiverHandler, certificates already in storage are ignored. // The check is unnecessary here, because there is no concurrent processing of older // certificates. For byzantine failures, the check will not be effective anyway. let _verify_scope = monitored_scope("VerifyingFetchedCertificates"); - let all_certificates = response.certificates; + + // Verify certificates in parallel. If we are using CertificateV2 only verify + // the tip of a certificate chain and verify the parent certificates of that tip + // indirectly. 
+ if protocol_config.narwhal_certificate_v2() { + process_certificates_v2_helper(protocol_config, response, synchronizer, metrics).await?; + } else { + process_certificates_v1_helper(protocol_config, response, synchronizer, metrics).await?; + } + + trace!("Fetched certificates have been processed"); + + Ok(()) +} + +async fn process_certificates_v1_helper( + protocol_config: &ProtocolConfig, + response: FetchCertificatesResponse, + synchronizer: &Synchronizer, + metrics: Arc, +) -> DagResult<()> { + let mut all_certificates = vec![]; + for cert in response.certificates { + // We should not be getting mixed versions of certificates from a + // validator, so any individual certificate with mismatched versions + // should cancel processing for the entire batch of fetched certificates. + all_certificates.push( + validate_received_certificate_version(cert, protocol_config).map_err(|err| { + error!("{err}"); + DagError::Canceled + })?, + ); + } let verify_tasks = all_certificates .chunks(VERIFY_CERTIFICATES_BATCH_SIZE) .map(|certs| { @@ -475,7 +534,112 @@ async fn process_certificates_helper( .inc_by(now.elapsed().as_micros() as u64); } - trace!("Fetched certificates have been processed"); + Ok(()) +} + +async fn process_certificates_v2_helper( + protocol_config: &ProtocolConfig, + response: FetchCertificatesResponse, + synchronizer: &Synchronizer, + metrics: Arc, +) -> DagResult<()> { + let mut all_certificates = vec![]; + let mut all_parents = HashSet::new(); + for cert in response.certificates { + // We should not be getting mixed versions of certificates from a + // validator, so any individual certificate with mismatched versions + // should cancel processing for the entire batch of fetched certificates. 
+ all_certificates.push( + validate_received_certificate_version(cert.clone(), protocol_config).map_err( + |err| { + error!("{err}"); + DagError::Canceled + }, + )?, + ); + + for parent_digest in cert.header().parents() { + all_parents.insert(*parent_digest); + } + } + + let all_certificates_count = all_certificates.len() as u64; + + // Identify leaf certs and preemptively set the parent certificates + // as verified indirectly. This is safe because any leaf certs that + // fail verification will cancel processing for all fetched certs. + let mut leaf_certs = Vec::new(); + for (idx, c) in all_certificates.iter_mut().enumerate() { + if !all_parents.contains(&c.digest()) { + leaf_certs.push((idx, c.clone())); + } else { + c.set_signature_verification_state(SignatureVerificationState::VerifiedIndirectly( + c.aggregated_signature() + .ok_or(DagError::InvalidSignature)? + .clone(), + )) + } + } + let leaves_count = leaf_certs.len() as u64; + + // Create verify tasks only for leaf certs as parent certs can skip this completely. + let leaf_verify_tasks = leaf_certs + .chunks(VERIFY_LEAF_CERTIFICATES_BATCH_SIZE) + .map(|chunk| { + let certs = chunk.to_vec(); + let sync = synchronizer.clone(); + let metrics = metrics.clone(); + spawn_blocking(move || { + let now = Instant::now(); + let mut sanitized_certs = Vec::new(); + for (idx, c) in certs { + sanitized_certs.push((idx, sync.sanitize_certificate(c)?)); + } + metrics + .certificate_fetcher_total_verification_us + .inc_by(now.elapsed().as_micros() as u64); + Ok::, DagError>(sanitized_certs) + }) + }) + .collect_vec(); + + // We ensure sanitization of certificates completes for all leaves + // fetched certificates before accepting any certficates. + for task in leaf_verify_tasks.into_iter() { + // Any certificates that fail to be verified should cancel the entire + // batch of fetched certficates. 
+ let idx_and_certs = task.await.map_err(|err| { + error!("Cancelling due to {err:?}"); + DagError::Canceled + })??; + for (idx, cert) in idx_and_certs { + all_certificates[idx] = cert; + } + } + + metrics + .fetched_certificates_verified_directly + .inc_by(leaves_count); + metrics + .fetched_certificates_verified_indirectly + .inc_by(all_certificates_count.saturating_sub(leaves_count)); + + // Accept verified certificates in the same order as received. + for cert in all_certificates { + let cert_digest = cert.digest(); + let now = Instant::now(); + if let Err(e) = synchronizer.try_accept_fetched_certificate(cert).await { + // It is possible that subsequent certificates are useful, + // so not stopping early. + warn!( + "Failed to accept fetched certificate {:?}: {e}", + cert_digest + ); + } + metrics + .certificate_fetcher_total_accept_us + .inc_by(now.elapsed().as_micros() as u64); + } Ok(()) } diff --git a/narwhal/primary/src/metrics.rs b/narwhal/primary/src/metrics.rs index 35e62647e78ea..d6340253e8f92 100644 --- a/narwhal/primary/src/metrics.rs +++ b/narwhal/primary/src/metrics.rs @@ -328,6 +328,10 @@ pub struct PrimaryMetrics { pub header_max_parent_wait_ms: IntCounter, /// Counts when the GC loop in synchronizer times out waiting for consensus commit. pub synchronizer_gc_timeout: IntCounter, + // Total number of fetched certificates verified directly. + pub fetched_certificates_verified_directly: IntCounter, + // Total number of fetched certificates verified indirectly. 
+ pub fetched_certificates_verified_indirectly: IntCounter, } impl PrimaryMetrics { @@ -513,6 +517,16 @@ impl PrimaryMetrics { "Counts when the GC loop in synchronizer times out waiting for consensus commit.", registry ).unwrap(), + fetched_certificates_verified_directly: register_int_counter_with_registry!( + "fetched_certificates_verified_directly", + "Total number of fetched certificates verified directly.", + registry + ).unwrap(), + fetched_certificates_verified_indirectly: register_int_counter_with_registry!( + "fetched_certificates_verified_indirectly", + "Total number of fetched certificates verified indirectly.", + registry + ).unwrap(), } } } diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 79a8722422fe7..45a212e9c4972 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -10,7 +10,10 @@ use crate::{ synchronizer::Synchronizer, }; -use anemo::{codegen::InboundRequestLayer, types::Address}; +use anemo::{ + codegen::InboundRequestLayer, + types::{response::StatusCode, Address}, +}; use anemo::{types::PeerInfo, Network, PeerId}; use anemo_tower::auth::RequireAuthorizationLayer; use anemo_tower::set_header::SetResponseHeaderLayer; @@ -59,11 +62,11 @@ use tracing::{debug, error, info, instrument, warn}; use types::{ ensure, error::{DagError, DagResult}, - now, Certificate, CertificateAPI, CertificateDigest, FetchCertificatesRequest, - FetchCertificatesResponse, Header, HeaderAPI, MetadataAPI, PreSubscribedBroadcastSender, - PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round, - SendCertificateRequest, SendCertificateResponse, Vote, VoteInfoAPI, WorkerOthersBatchMessage, - WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer, + now, validate_received_certificate_version, Certificate, CertificateAPI, CertificateDigest, + FetchCertificatesRequest, FetchCertificatesResponse, Header, HeaderAPI, MetadataAPI, + PreSubscribedBroadcastSender, PrimaryToPrimary, 
PrimaryToPrimaryServer, RequestVoteRequest, + RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, Vote, VoteInfoAPI, + WorkerOthersBatchMessage, WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer, }; #[cfg(test)] @@ -191,6 +194,7 @@ impl Primary { let mut primary_service = PrimaryToPrimaryServer::new(PrimaryReceiverHandler { authority_id: authority.id(), committee: committee.clone(), + protocol_config: protocol_config.clone(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service: signature_service.clone(), @@ -441,6 +445,7 @@ impl Primary { let certificate_fetcher_handle = CertificateFetcher::spawn( authority.id(), committee.clone(), + protocol_config.clone(), network.clone(), certificate_store, rx_consensus_round_updates, @@ -524,6 +529,7 @@ struct PrimaryReceiverHandler { /// The id of this primary. authority_id: AuthorityIdentifier, committee: Committee, + protocol_config: ProtocolConfig, worker_cache: WorkerCache, synchronizer: Arc, /// Service to sign headers. 
@@ -857,7 +863,17 @@ impl PrimaryToPrimary for PrimaryReceiverHandler { request: anemo::Request, ) -> Result, anemo::rpc::Status> { let _scope = monitored_scope("PrimaryReceiverHandler::send_certificate"); - let certificate = request.into_body().certificate; + let certificate = validate_received_certificate_version( + request.into_body().certificate, + &self.protocol_config, + ) + .map_err(|err| { + anemo::rpc::Status::new_with_message( + StatusCode::BadRequest, + format!("Invalid certifcate: {err}"), + ) + })?; + match self.synchronizer.try_accept_certificate(certificate).await { Ok(()) => Ok(anemo::Response::new(SendCertificateResponse { accepted: true, diff --git a/narwhal/primary/src/synchronizer.rs b/narwhal/primary/src/synchronizer.rs index 64e67241d1525..967ee5d221f4e 100644 --- a/narwhal/primary/src/synchronizer.rs +++ b/narwhal/primary/src/synchronizer.rs @@ -33,6 +33,7 @@ use tokio::{ time::{sleep, timeout}, }; use tracing::{debug, error, instrument, trace, warn}; +use types::SignatureVerificationState; use types::{ ensure, error::{AcceptNotification, DagError, DagResult}, @@ -59,6 +60,7 @@ struct Inner { authority_id: AuthorityIdentifier, /// Committee of the current epoch. committee: Committee, + protocol_config: ProtocolConfig, /// The worker information cache. worker_cache: WorkerCache, /// The depth of the garbage collector. @@ -167,6 +169,21 @@ impl Inner { } } + if self.protocol_config.narwhal_certificate_v2() + && !matches!( + certificate.signature_verification_state(), + SignatureVerificationState::VerifiedDirectly(_) + | SignatureVerificationState::VerifiedIndirectly(_) + | SignatureVerificationState::Genesis + ) + { + panic!( + "Attempting to write cert {:?} with invalid signature state {:?} to store", + certificate.digest(), + certificate.signature_verification_state() + ); + } + // Store the certificate and make it available as parent to other certificates. 
self.certificate_store .write(certificate.clone()) @@ -343,6 +360,7 @@ impl Synchronizer { let inner = Arc::new(Inner { authority_id, committee: committee.clone(), + protocol_config: protocol_config.clone(), worker_cache, gc_depth, gc_round: AtomicU64::new(gc_round), @@ -474,6 +492,17 @@ impl Synchronizer { debug!("Synchronizer is shutting down."); return; }; + + if protocol_config.narwhal_certificate_v2() { + assert!( + matches!( + certificate.signature_verification_state(), + SignatureVerificationState::VerifiedDirectly(_) + | SignatureVerificationState::VerifiedIndirectly(_) + ), + "Never accept certificates that have not been verified either directly or indirectly."); + } + let Some(inner) = weak_inner.upgrade() else { debug!("Synchronizer is shutting down."); return; diff --git a/narwhal/primary/src/tests/certificate_fetcher_tests.rs b/narwhal/primary/src/tests/certificate_fetcher_tests.rs index 8c25a8683fce4..201142b8b66ad 100644 --- a/narwhal/primary/src/tests/certificate_fetcher_tests.rs +++ b/narwhal/primary/src/tests/certificate_fetcher_tests.rs @@ -8,6 +8,7 @@ use crate::{ use anemo::async_trait; use anyhow::Result; use config::{AuthorityIdentifier, Epoch, WorkerId}; +use crypto::AggregateSignatureBytes; use fastcrypto::{hash::Hash, traits::KeyPair}; use indexmap::IndexMap; use itertools::Itertools; @@ -19,7 +20,7 @@ use storage::CertificateStore; use storage::NodeStorage; use consensus::consensus::ConsensusRound; -use test_utils::{latest_protocol_version, temp_dir, CommitteeFixture}; +use test_utils::{get_protocol_config, latest_protocol_version, temp_dir, CommitteeFixture}; use tokio::{ sync::{ mpsc::{self, error::TryRecvError, Receiver, Sender}, @@ -32,6 +33,7 @@ use types::{ FetchCertificatesResponse, Header, HeaderAPI, HeaderDigest, Metadata, PreSubscribedBroadcastSender, PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, + SignatureVerificationState, }; pub 
struct NetworkProxy { @@ -72,7 +74,59 @@ impl PrimaryToPrimary for NetworkProxy { } } -async fn verify_certificates_in_store( +async fn verify_certificates_v2_in_store( + certificate_store: &CertificateStore, + certificates: &[Certificate], + expected_verified_directly_count: u64, + expected_verified_indirectly_count: u64, +) { + let mut missing = None; + let mut verified_indirectly = 0; + let mut verified_directly = 0; + for _ in 0..20 { + missing = None; + verified_directly = 0; + verified_indirectly = 0; + for (i, _) in certificates.iter().enumerate() { + if let Ok(Some(cert)) = certificate_store.read(certificates[i].digest()) { + match cert.signature_verification_state() { + SignatureVerificationState::VerifiedDirectly(_) => verified_directly += 1, + SignatureVerificationState::VerifiedIndirectly(_) => verified_indirectly += 1, + _ => panic!( + "Found unexpected stored signature state {:?}", + cert.signature_verification_state() + ), + }; + continue; + } + missing = Some(i); + break; + } + if missing.is_none() { + break; + } + sleep(Duration::from_secs(1)).await; + } + if let Some(i) = missing { + panic!( + "Missing certificate in store: input index {}, certificate: {:?}", + i, certificates[i] + ); + } + + assert_eq!( + verified_directly, expected_verified_directly_count, + "Verified {} certificates directly in the store, expected {}", + verified_directly, expected_verified_directly_count + ); + assert_eq!( + verified_indirectly, expected_verified_indirectly_count, + "Verified {} certificates indirectly in the store, expected {}", + verified_indirectly, expected_verified_indirectly_count + ); +} + +async fn verify_certificates_v1_in_store( certificate_store: &CertificateStore, certificates: &[Certificate], ) { @@ -103,13 +157,17 @@ fn verify_certificates_not_in_store( certificate_store: &CertificateStore, certificates: &[Certificate], ) { - assert!(certificate_store + let found_certificates = certificate_store .read_all(certificates.iter().map(|c| c.digest())) 
- .unwrap() - .into_iter() - .map_while(|c| c) - .next() - .is_none()); + .unwrap(); + + let found_count = found_certificates.iter().filter(|&c| c.is_some()).count(); + + assert_eq!( + found_count, 0, + "Found {} certificates in the store", + found_count + ); } // Used below to construct malformed Headers @@ -125,8 +183,10 @@ struct BadHeader { pub metadata: Metadata, } +// TODO: Remove after network has moved to CertificateV2 #[tokio::test(flavor = "current_thread", start_paused = true)] -async fn fetch_certificates_basic() { +async fn fetch_certificates_v1_basic() { + let cert_v1_protocol_config = get_protocol_config(27); let fixture = CommitteeFixture::builder().randomize_ports(true).build(); let worker_cache = fixture.worker_cache(); let primary = fixture.authorities().next().unwrap(); @@ -161,7 +221,7 @@ async fn fetch_certificates_basic() { let synchronizer = Arc::new(Synchronizer::new( id, fixture.committee(), - latest_protocol_version(), + cert_v1_protocol_config.clone(), worker_cache.clone(), gc_depth, client, @@ -196,6 +256,7 @@ async fn fetch_certificates_basic() { let _certificate_fetcher_handle = CertificateFetcher::spawn( id, fixture.committee(), + cert_v1_protocol_config.clone(), client_network.clone(), certificate_store.clone(), rx_consensus_round_updates.clone(), @@ -207,7 +268,7 @@ async fn fetch_certificates_basic() { // Generate headers and certificates in successive rounds let genesis_certs: Vec<_> = - Certificate::genesis(&latest_protocol_version(), &fixture.committee()); + Certificate::genesis(&cert_v1_protocol_config, &fixture.committee()); for cert in genesis_certs.iter() { certificate_store .write(cert.clone()) @@ -225,11 +286,11 @@ async fn fetch_certificates_basic() { .into_iter() .map(|header| { fixture - .certificate(&latest_protocol_version(), &header) + .certificate(&cert_v1_protocol_config, &header) .digest() }) .collect(); - (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); + (_, current_round) = 
fixture.headers_round(i, &parents, &cert_v1_protocol_config); headers.extend(current_round.clone()); } @@ -242,7 +303,7 @@ async fn fetch_certificates_basic() { // Create certificates test data. let mut certificates = vec![]; for header in headers.into_iter() { - certificates.push(fixture.certificate(&latest_protocol_version(), &header)); + certificates.push(fixture.certificate(&cert_v1_protocol_config, &header)); } assert_eq!(certificates.len(), total_certificates); // note genesis is not included assert_eq!(240, total_certificates); @@ -284,7 +345,7 @@ async fn fetch_certificates_basic() { tx_fetch_resp.try_send(first_batch_resp.clone()).unwrap(); // The certificates up to index 4 + 62 = 66 should be written to store eventually by core. - verify_certificates_in_store( + verify_certificates_v1_in_store( &certificate_store, &certificates[0..(num_written + first_batch_len)], ) @@ -327,7 +388,7 @@ async fn fetch_certificates_basic() { tx_fetch_resp.try_send(second_batch_resp.clone()).unwrap(); // The certificates 4 ~ 64 should become available in store eventually. - verify_certificates_in_store( + verify_certificates_v1_in_store( &certificate_store, &certificates[0..(num_written + second_batch_len)], ) @@ -353,6 +414,22 @@ async fn fetch_certificates_basic() { } } + let target_index = num_written + 8; + assert!(!synchronizer + .get_missing_parents(&certificates[target_index].clone()) + .await + .unwrap() + .is_empty()); + + // Verify the fetch request. + let req = rx_fetch_req.recv().await.unwrap(); + let (lower_bound, skip_rounds) = req.get_bounds(); + assert_eq!(lower_bound, 0); + assert_eq!(skip_rounds.len(), fixture.authorities().count()); + for rounds in skip_rounds.values() { + assert_eq!(rounds, &(1..32).collect()); + } + // Send out a batch of malformed certificates. let mut certs = Vec::new(); // Add cert missing parent info. @@ -376,6 +453,365 @@ async fn fetch_certificates_basic() { .unwrap(); // Verify no certificate is written to store. 
- sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(1)).await; verify_certificates_not_in_store(&certificate_store, &certificates[num_written..]); + + // Send out a batch of certificate V2s. + let mut certs = Vec::new(); + for cert in certificates.iter().skip(num_written).take(8) { + certs.push(fixture.certificate(&latest_protocol_version(), cert.header())); + } + tx_fetch_resp + .try_send(FetchCertificatesResponse { + certificates: certs, + }) + .unwrap(); + + sleep(Duration::from_secs(1)).await; + verify_certificates_not_in_store(&certificate_store, &certificates[num_written..target_index]); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn fetch_certificates_v2_basic() { + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let worker_cache = fixture.worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let client = NetworkClient::new_from_keypair(&primary.network_keypair()); + let id = primary.id(); + let fake_primary = fixture.authorities().nth(1).unwrap(); + let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); + let primary_channel_metrics = PrimaryChannelMetrics::new(&Registry::new()); + let gc_depth: Round = 50; + + // kept empty + let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); + // synchronizer to certificate fetcher + let (tx_certificate_fetcher, rx_certificate_fetcher) = test_utils::test_channel!(1000); + let (tx_new_certificates, _rx_new_certificates) = test_utils::test_channel!(1000); + let (tx_parents, _rx_parents) = test_utils::test_channel!(1000); + // FetchCertificateProxy -> test + let (tx_fetch_req, mut rx_fetch_req) = mpsc::channel(1000); + // test -> FetchCertificateProxy + let (tx_fetch_resp, rx_fetch_resp) = mpsc::channel(1000); + + // Create test stores. 
+ let store = NodeStorage::reopen(temp_dir(), None); + let certificate_store = store.certificate_store.clone(); + let payload_store = store.payload_store.clone(); + + // Signal rounds + let (_tx_consensus_round_updates, rx_consensus_round_updates) = + watch::channel(ConsensusRound::new(0, 0)); + + // Make a synchronizer for certificates. + let synchronizer = Arc::new(Synchronizer::new( + id, + fixture.committee(), + latest_protocol_version(), + worker_cache.clone(), + gc_depth, + client, + certificate_store.clone(), + payload_store.clone(), + tx_certificate_fetcher, + tx_new_certificates.clone(), + tx_parents.clone(), + rx_consensus_round_updates.clone(), + metrics.clone(), + &primary_channel_metrics, + )); + + let fake_primary_addr = fake_primary.address().to_anemo_address().unwrap(); + let fake_route = + anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(NetworkProxy { + request: tx_fetch_req, + response: Arc::new(Mutex::new(rx_fetch_resp)), + })); + let fake_server_network = anemo::Network::bind(fake_primary_addr.clone()) + .server_name("narwhal") + .private_key(fake_primary.network_keypair().copy().private().0.to_bytes()) + .start(fake_route) + .unwrap(); + let client_network = test_utils::test_network(primary.network_keypair(), primary.address()); + client_network + .connect_with_peer_id(fake_primary_addr, fake_server_network.peer_id()) + .await + .unwrap(); + + // Make a certificate fetcher + let _certificate_fetcher_handle = CertificateFetcher::spawn( + id, + fixture.committee(), + latest_protocol_version(), + client_network.clone(), + certificate_store.clone(), + rx_consensus_round_updates.clone(), + tx_shutdown.subscribe(), + rx_certificate_fetcher, + synchronizer.clone(), + metrics.clone(), + ); + + // Generate headers and certificates in successive rounds + let genesis_certs: Vec<_> = + Certificate::genesis(&latest_protocol_version(), &fixture.committee()); + for cert in genesis_certs.iter() { + certificate_store + .write(cert.clone()) + 
.expect("Writing certificate to store failed"); + } + + let mut current_round: Vec<_> = genesis_certs + .into_iter() + .map(|cert| cert.header().clone()) + .collect(); + let mut headers = vec![]; + let rounds = 100; + for i in 0..rounds { + let parents: BTreeSet<_> = current_round + .into_iter() + .map(|header| { + fixture + .certificate(&latest_protocol_version(), &header) + .digest() + }) + .collect(); + (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); + headers.extend(current_round.clone()); + } + + // Avoid any sort of missing payload by pre-populating the batch + for (digest, (worker_id, _)) in headers.iter().flat_map(|h| h.payload().iter()) { + payload_store.write(digest, worker_id).unwrap(); + } + + let total_certificates = fixture.authorities().count() * rounds as usize; + // Create certificates test data. + let mut certificates = vec![]; + for header in headers.into_iter() { + certificates.push(fixture.certificate(&latest_protocol_version(), &header)); + } + assert_eq!(certificates.len(), total_certificates); // note genesis is not included + assert_eq!(400, total_certificates); + + for cert in certificates.iter_mut().take(4) { + // Manually writing the certificates to store so we can consider them verified + // directly + cert.set_signature_verification_state(SignatureVerificationState::VerifiedDirectly( + cert.aggregated_signature() + .expect("Invalid Signature") + .clone(), + )); + certificate_store + .write(cert.clone()) + .expect("Writing certificate to store failed"); + } + let mut num_written = 4; + + // Send a primary message for a certificate with parents that do not exist locally, to trigger fetching. + let target_index = 123; + assert!(!synchronizer + .get_missing_parents(&certificates[target_index].clone()) + .await + .unwrap() + .is_empty()); + + // Verify the fetch request. 
+ let mut req = rx_fetch_req.recv().await.unwrap(); + let (lower_bound, skip_rounds) = req.get_bounds(); + assert_eq!(lower_bound, 0); + assert_eq!(skip_rounds.len(), fixture.authorities().count()); + for rounds in skip_rounds.values() { + assert_eq!(rounds, &(1..2).collect()); + } + + // Send back another 62 certificates. + let first_batch_len = 62; + let first_batch_resp = FetchCertificatesResponse { + certificates: certificates + .iter() + .skip(num_written) + .take(first_batch_len) + .cloned() + .collect_vec(), + }; + tx_fetch_resp.try_send(first_batch_resp.clone()).unwrap(); + + // The certificates up to index 66 (4 + 62) should be written to store eventually by core. + verify_certificates_v2_in_store( + &certificate_store, + &certificates[0..(num_written + first_batch_len)], + 6, // 2 fetched certs verified directly + the initial 4 inserted + 60, // verified indirectly + ) + .await; + num_written += first_batch_len; + + // The certificate fetcher should send out another fetch request, because it has not received certificate 123. + loop { + match rx_fetch_req.recv().await { + Some(r) => { + let (_, skip_rounds) = r.get_bounds(); + if skip_rounds.values().next().unwrap().len() == 1 { + // Drain the fetch requests sent out before the last reply, when only 1 round in skip_rounds. + tx_fetch_resp.try_send(first_batch_resp.clone()).unwrap(); + continue; + } + req = r; + break; + } + None => panic!("Unexpected channel closing!"), + } + } + let (_, skip_rounds) = req.get_bounds(); + assert_eq!(skip_rounds.len(), fixture.authorities().count()); + for (_, rounds) in skip_rounds { + let rounds = rounds.into_iter().collect_vec(); + assert!(rounds == (1..=16).collect_vec() || rounds == (1..=17).collect_vec()); + } + + // Send back another 123 + 1 - 66 = 58 certificates. 
+ let second_batch_len = target_index + 1 - num_written; + let second_batch_resp = FetchCertificatesResponse { + certificates: certificates + .iter() + .skip(num_written) + .take(second_batch_len) + .cloned() + .collect_vec(), + }; + tx_fetch_resp.try_send(second_batch_resp.clone()).unwrap(); + + // The certificates up to index 124 (4 + 62 + 58) should become available in store eventually. + verify_certificates_v2_in_store( + &certificate_store, + &certificates[0..(num_written + second_batch_len)], + 10, // 6 fetched certs verified directly + the initial 4 inserted + 114, // verified indirectly + ) + .await; + num_written += second_batch_len; + + // No new fetch request is expected. + sleep(Duration::from_secs(5)).await; + loop { + match rx_fetch_req.try_recv() { + Ok(r) => { + let (_, skip_rounds) = r.get_bounds(); + let first_num_skip_rounds = skip_rounds.values().next().unwrap().len(); + if first_num_skip_rounds == 16 || first_num_skip_rounds == 17 { + // Drain the fetch requests sent out before the last reply. + tx_fetch_resp.try_send(second_batch_resp.clone()).unwrap(); + continue; + } + panic!("No more fetch request is expected! {:#?}", r); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => panic!("Unexpected disconnect!"), + } + } + + let target_index = num_written + 204; + assert!(!synchronizer + .get_missing_parents(&certificates[target_index].clone()) + .await + .unwrap() + .is_empty()); + + // Verify the fetch request. + let req = rx_fetch_req.recv().await.unwrap(); + let (lower_bound, skip_rounds) = req.get_bounds(); + assert_eq!(lower_bound, 0); + assert_eq!(skip_rounds.len(), fixture.authorities().count()); + for rounds in skip_rounds.values() { + assert_eq!(rounds, &(1..32).collect()); + } + + // Send out a batch of malformed certificates. + let mut certs = Vec::new(); + // Add cert missing parent info. 
+ let mut cert = certificates[num_written].clone(); + cert.header_mut().clear_parents(); + certs.push(cert); + // Add cert with incorrect digest. + let mut cert = certificates[num_written].clone(); + // This is a bit tedious to craft + let cert_header = unsafe { std::mem::transmute::(cert.header().clone()) }; + let wrong_header = BadHeader { ..cert_header }; + let wolf_header = unsafe { std::mem::transmute::(wrong_header) }; + cert.update_header(wolf_header); + certs.push(cert); + // Add cert without all parents in storage. + certs.push(certificates[num_written + 1].clone()); + tx_fetch_resp + .try_send(FetchCertificatesResponse { + certificates: certs, + }) + .unwrap(); + + // Verify no certificate is written to store. + sleep(Duration::from_secs(1)).await; + verify_certificates_not_in_store(&certificate_store, &certificates[num_written..target_index]); + + // Send out a batch of certificates with bad signatures for parent certificates. + // and bad signatures for non-parent certificates. + let mut certs = Vec::new(); + for cert in certificates.iter().skip(num_written).take(204) { + let mut cert = cert.clone(); + cert.set_signature_verification_state(SignatureVerificationState::Unverified( + AggregateSignatureBytes::default(), + )); + certs.push(cert); + } + tx_fetch_resp + .try_send(FetchCertificatesResponse { + certificates: certs, + }) + .unwrap(); + + sleep(Duration::from_secs(1)).await; + verify_certificates_not_in_store(&certificate_store, &certificates[num_written..target_index]); + + // Send out a batch of certificate V1s. 
+ let mut certs = Vec::new(); + for cert in certificates.iter().skip(num_written).take(204) { + certs.push(fixture.certificate(&get_protocol_config(27), cert.header())); + } + tx_fetch_resp + .try_send(FetchCertificatesResponse { + certificates: certs, + }) + .unwrap(); + + sleep(Duration::from_secs(1)).await; + verify_certificates_not_in_store(&certificate_store, &certificates[num_written..target_index]); + + // Send out a batch of certificates with good signatures for leaves and + // bad signatures for parent certificates. + let mut certs = Vec::new(); + for cert in certificates.iter().skip(num_written).take(200) { + let mut cert = cert.clone(); + cert.set_signature_verification_state(SignatureVerificationState::Unverified( + AggregateSignatureBytes::default(), + )); + certs.push(cert); + } + + for cert in certificates.iter().skip(num_written + 200).take(4) { + certs.push(cert.clone()); + } + tx_fetch_resp + .try_send(FetchCertificatesResponse { + certificates: certs, + }) + .unwrap(); + + // The certificates 4 + 62 + 58 + 204 = 328 should become available in store eventually. 
+ verify_certificates_v2_in_store( + &certificate_store, + &certificates[0..(target_index)], + 14, // 10 fetched certs verified directly + the initial 4 inserted + 314, // verified indirectly + ) + .await; } diff --git a/narwhal/primary/src/tests/certificate_tests.rs b/narwhal/primary/src/tests/certificate_tests.rs index c55bf75750ede..8f76c6e5bedea 100644 --- a/narwhal/primary/src/tests/certificate_tests.rs +++ b/narwhal/primary/src/tests/certificate_tests.rs @@ -11,9 +11,8 @@ use rand::{ SeedableRng, }; use std::num::NonZeroUsize; -use test_utils::latest_protocol_version; -use test_utils::CommitteeFixture; -use types::{Certificate, Vote, VoteAPI}; +use test_utils::{get_protocol_config, latest_protocol_version, CommitteeFixture}; +use types::{Certificate, CertificateAPI, SignatureVerificationState, Vote, VoteAPI}; #[test] fn test_empty_certificate_verification() { @@ -39,10 +38,11 @@ fn test_empty_certificate_verification() { } #[test] -fn test_valid_certificate_verification() { +fn test_valid_certificate_v1_verification() { let fixture = CommitteeFixture::builder().build(); + let cert_v1_protocol_config = get_protocol_config(27); let committee = fixture.committee(); - let header = fixture.header(&latest_protocol_version()); + let header = fixture.header(&cert_v1_protocol_config); let mut signatures = Vec::new(); @@ -53,7 +53,7 @@ fn test_valid_certificate_verification() { } let certificate = - Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures) + Certificate::new_unverified(&cert_v1_protocol_config, &committee, header, signatures) .unwrap(); assert!(certificate @@ -61,6 +61,33 @@ fn test_valid_certificate_verification() { .is_ok()); } +#[test] +fn test_valid_certificate_v2_verification() { + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let header = fixture.header(&latest_protocol_version()); + + let mut signatures = Vec::new(); + + // 3 Signers satisfies the 2F + 1 signed stake 
requirement + for authority in fixture.authorities().take(3) { + let vote = authority.vote(&header); + signatures.push((vote.author(), vote.signature().clone())); + } + + let certificate = + Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures) + .unwrap(); + + let verified_certificate = certificate.verify(&committee, &fixture.worker_cache()); + + assert!(verified_certificate.is_ok()); + assert!(matches!( + verified_certificate.unwrap().signature_verification_state(), + SignatureVerificationState::VerifiedDirectly(_) + )); +} + #[test] fn test_certificate_insufficient_signatures() { let fixture = CommitteeFixture::builder().build(); diff --git a/narwhal/primary/src/tests/certifier_tests.rs b/narwhal/primary/src/tests/certifier_tests.rs index 5c7a81394757b..024083adb3e7d 100644 --- a/narwhal/primary/src/tests/certifier_tests.rs +++ b/narwhal/primary/src/tests/certifier_tests.rs @@ -13,17 +13,136 @@ use primary::NUM_SHUTDOWN_RECEIVERS; use prometheus::Registry; use rand::{rngs::StdRng, SeedableRng}; use std::num::NonZeroUsize; -use test_utils::latest_protocol_version; use test_utils::CommitteeFixture; +use test_utils::{get_protocol_config, latest_protocol_version}; use tokio::sync::watch; use tokio::time::Duration; use types::{ CertificateAPI, MockPrimaryToPrimary, PreSubscribedBroadcastSender, PrimaryToPrimaryServer, - RequestVoteResponse, + RequestVoteResponse, SignatureVerificationState, }; +// TODO: Remove after network has moved to CertificateV2 #[tokio::test(flavor = "current_thread", start_paused = true)] -async fn propose_header() { +async fn propose_header_and_form_certificate_v1() { + let cert_v1_protocol_config = get_protocol_config(27); + telemetry_subscribers::init_for_testing(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.worker_cache(); + let primary = fixture.authorities().last().unwrap(); + let client = 
NetworkClient::new_from_keypair(&primary.network_keypair()); + let network_key = primary.network_keypair().copy().private().0.to_bytes(); + let id = primary.id(); + let signature_service = SignatureService::new(primary.keypair().copy()); + let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); + let primary_channel_metrics = PrimaryChannelMetrics::new(&Registry::new()); + let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); + let (tx_certificate_fetcher, _rx_certificate_fetcher) = test_utils::test_channel!(1); + let (tx_headers, rx_headers) = test_utils::test_channel!(1); + let (tx_new_certificates, mut rx_new_certificates) = test_utils::test_channel!(3); + let (tx_parents, _rx_parents) = test_utils::test_channel!(1); + let (_tx_consensus_round_updates, rx_consensus_round_updates) = + watch::channel(ConsensusRound::new(0, 0)); + let (certificate_store, payload_store) = create_db_stores(); + + // Create a fake header. + let proposed_header = primary.header(&cert_v1_protocol_config, &committee); + + // Set up network. + let own_address = committee + .primary_by_id(&id) + .unwrap() + .to_anemo_address() + .unwrap(); + let network = anemo::Network::bind(own_address) + .server_name("narwhal") + .private_key(network_key) + .start(anemo::Router::new()) + .unwrap(); + + // Set up remote primaries responding with votes. + let mut primary_networks = Vec::new(); + for primary in fixture.authorities().filter(|a| a.id() != id) { + let address = committee.primary(&primary.public_key()).unwrap(); + let name = primary.id(); + let signature_service = SignatureService::new(primary.keypair().copy()); + let vote = Vote::new(&proposed_header, &name, &signature_service).await; + let mut mock_server = MockPrimaryToPrimary::new(); + let mut mock_seq = mockall::Sequence::new(); + // Verify errors are retried. 
+ mock_server + .expect_request_vote() + .times(3) + .in_sequence(&mut mock_seq) + .returning(move |_request| { + Err(anemo::rpc::Status::new( + anemo::types::response::StatusCode::Unknown, + )) + }); + mock_server + .expect_request_vote() + .times(1) + .in_sequence(&mut mock_seq) + .return_once(move |_request| { + Ok(anemo::Response::new(RequestVoteResponse { + vote: Some(vote), + missing: Vec::new(), + })) + }); + let routes = anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(mock_server)); + primary_networks.push(primary.new_network(routes)); + println!("New primary added: {:?}", address); + + let address = address.to_anemo_address().unwrap(); + let peer_id = anemo::PeerId(primary.network_keypair().public().0.to_bytes()); + network + .connect_with_peer_id(address, peer_id) + .await + .unwrap(); + } + + // Spawn the core. + let synchronizer = Arc::new(Synchronizer::new( + id, + fixture.committee(), + cert_v1_protocol_config.clone(), + worker_cache.clone(), + /* gc_depth */ 50, + client, + certificate_store.clone(), + payload_store.clone(), + tx_certificate_fetcher, + tx_new_certificates.clone(), + tx_parents.clone(), + rx_consensus_round_updates.clone(), + metrics.clone(), + &primary_channel_metrics, + )); + + let _handle = Certifier::spawn( + id, + committee.clone(), + cert_v1_protocol_config.clone(), + certificate_store.clone(), + synchronizer, + signature_service, + tx_shutdown.subscribe(), + rx_headers, + metrics.clone(), + network, + ); + + // Propose header and ensure that a certificate is formed by pulling it out of the + // consensus channel. 
+ let proposed_digest = proposed_header.digest(); + tx_headers.send(proposed_header).await.unwrap(); + let certificate = rx_new_certificates.recv().await.unwrap(); + assert_eq!(certificate.header().digest(), proposed_digest); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn propose_header_and_form_certificate_v2() { telemetry_subscribers::init_for_testing(); let fixture = CommitteeFixture::builder().randomize_ports(true).build(); let committee = fixture.committee(); @@ -137,6 +256,10 @@ async fn propose_header() { tx_headers.send(proposed_header).await.unwrap(); let certificate = rx_new_certificates.recv().await.unwrap(); assert_eq!(certificate.header().digest(), proposed_digest); + assert!(matches!( + certificate.signature_verification_state(), + SignatureVerificationState::VerifiedDirectly(_) + )); } #[tokio::test(flavor = "current_thread", start_paused = true)] diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index ccd7363f62d60..2e6b36d4b4737 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -26,12 +26,14 @@ use std::{ }; use storage::{NodeStorage, VoteDigestStore}; use test_utils::{ - latest_protocol_version, make_optimal_signed_certificates, temp_dir, CommitteeFixture, + get_protocol_config, latest_protocol_version, make_optimal_signed_certificates, temp_dir, + CommitteeFixture, }; use tokio::{sync::watch, time::timeout}; use types::{ now, Certificate, CertificateAPI, FetchCertificatesRequest, Header, HeaderAPI, MockPrimaryToWorker, PreSubscribedBroadcastSender, PrimaryToPrimary, RequestVoteRequest, + SignatureVerificationState, }; use worker::{metrics::initialise_metrics, TrivialTransactionValidator, Worker}; @@ -306,6 +308,7 @@ async fn test_request_vote_has_missing_parents() { let handler = PrimaryReceiverHandler { authority_id: target_id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), 
worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, @@ -474,6 +477,7 @@ async fn test_request_vote_accept_missing_parents() { let handler = PrimaryReceiverHandler { authority_id: target_id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, @@ -630,6 +634,7 @@ async fn test_request_vote_missing_batches() { let handler = PrimaryReceiverHandler { authority_id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, @@ -777,6 +782,7 @@ async fn test_request_vote_already_voted() { let handler = PrimaryReceiverHandler { authority_id: id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, @@ -924,8 +930,184 @@ async fn test_request_vote_already_voted() { ); } +// TODO: Remove after network has moved to CertificateV2 #[tokio::test] -async fn test_fetch_certificates_handler() { +async fn test_fetch_certificates_v1_handler() { + let cert_v1_protocol_config = get_protocol_config(27); + let fixture = CommitteeFixture::builder() + .randomize_ports(true) + .committee_size(NonZeroUsize::new(4).unwrap()) + .build(); + let id = fixture.authorities().next().unwrap().id(); + let worker_cache = fixture.worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let signature_service = SignatureService::new(primary.keypair().copy()); + let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); + let primary_channel_metrics = PrimaryChannelMetrics::new(&Registry::new()); + let client = NetworkClient::new_from_keypair(&primary.network_keypair()); + + let (certificate_store, payload_store) = create_db_stores(); + let (tx_certificate_fetcher, _rx_certificate_fetcher) = 
test_utils::test_channel!(1); + let (tx_new_certificates, _rx_new_certificates) = test_utils::test_channel!(100); + let (tx_parents, _rx_parents) = test_utils::test_channel!(100); + let (_tx_consensus_round_updates, rx_consensus_round_updates) = + watch::channel(ConsensusRound::default()); + let (_tx_narwhal_round_updates, rx_narwhal_round_updates) = watch::channel(1u64); + + let synchronizer = Arc::new(Synchronizer::new( + id, + fixture.committee(), + cert_v1_protocol_config.clone(), + worker_cache.clone(), + /* gc_depth */ 50, + client, + certificate_store.clone(), + payload_store.clone(), + tx_certificate_fetcher, + tx_new_certificates, + tx_parents, + rx_consensus_round_updates.clone(), + metrics.clone(), + &primary_channel_metrics, + )); + let handler = PrimaryReceiverHandler { + authority_id: id, + committee: fixture.committee(), + protocol_config: cert_v1_protocol_config.clone(), + worker_cache: worker_cache.clone(), + synchronizer: synchronizer.clone(), + signature_service, + certificate_store: certificate_store.clone(), + vote_digest_store: VoteDigestStore::new_for_tests(), + rx_narwhal_round_updates, + parent_digests: Default::default(), + metrics: metrics.clone(), + }; + + let mut current_round: Vec<_> = + Certificate::genesis(&cert_v1_protocol_config, &fixture.committee()) + .into_iter() + .map(|cert| cert.header().clone()) + .collect(); + let mut headers = vec![]; + let total_rounds = 4; + for i in 0..total_rounds { + let parents: BTreeSet<_> = current_round + .into_iter() + .map(|header| { + fixture + .certificate(&cert_v1_protocol_config, &header) + .digest() + }) + .collect(); + (_, current_round) = fixture.headers_round(i, &parents, &cert_v1_protocol_config); + headers.extend(current_round.clone()); + } + + let total_authorities = fixture.authorities().count(); + let total_certificates = total_authorities * total_rounds as usize; + // Create certificates test data. 
+ let mut certificates = vec![]; + for header in headers.into_iter() { + certificates.push(fixture.certificate(&cert_v1_protocol_config, &header)); + } + assert_eq!(certificates.len(), total_certificates); + assert_eq!(16, total_certificates); + + // Populate certificate store such that each authority has the following rounds: + // Authority 0: 1 + // Authority 1: 1 2 + // Authority 2: 1 2 3 + // Authority 3: 1 2 3 4 + // This is unrealistic because in practice a certificate can only be stored with 2f+1 parents + // already in store. But this does not matter for testing here. + let mut authorities = Vec::::new(); + for i in 0..total_authorities { + authorities.push(certificates[i].header().author()); + for j in 0..=i { + let cert = certificates[i + j * total_authorities].clone(); + assert_eq!(&cert.header().author(), authorities.last().unwrap()); + certificate_store + .write(cert) + .expect("Writing certificate to store failed"); + } + } + + // Each test case contains (lower bound round, skip rounds, max items, expected output). + let test_cases = vec![ + ( + 0, + vec![vec![], vec![], vec![], vec![]], + 20, + vec![1, 1, 1, 1, 2, 2, 2, 3, 3, 4], + ), + ( + 0, + vec![vec![1u64], vec![1], vec![], vec![]], + 20, + vec![1, 1, 2, 2, 2, 3, 3, 4], + ), + ( + 0, + vec![vec![], vec![], vec![1], vec![1]], + 20, + vec![1, 1, 2, 2, 2, 3, 3, 4], + ), + ( + 1, + vec![vec![], vec![], vec![2], vec![2]], + 4, + vec![2, 3, 3, 4], + ), + (1, vec![vec![], vec![], vec![2], vec![2]], 2, vec![2, 3]), + ( + 0, + vec![vec![1], vec![1], vec![1, 2, 3], vec![1, 2, 3]], + 2, + vec![2, 4], + ), + (2, vec![vec![], vec![], vec![], vec![]], 3, vec![3, 3, 4]), + (2, vec![vec![], vec![], vec![], vec![]], 2, vec![3, 3]), + // Check that round 2 and 4 are fetched for the last authority, skipping round 3. 
+ ( + 1, + vec![vec![], vec![], vec![3], vec![3]], + 5, + vec![2, 2, 2, 4], + ), + ]; + for (lower_bound_round, skip_rounds_vec, max_items, expected_rounds) in test_cases { + let req = FetchCertificatesRequest::default() + .set_bounds( + lower_bound_round, + authorities + .clone() + .into_iter() + .zip( + skip_rounds_vec + .into_iter() + .map(|rounds| rounds.into_iter().collect()), + ) + .collect(), + ) + .set_max_items(max_items); + let resp = handler + .fetch_certificates(anemo::Request::new(req.clone())) + .await + .unwrap() + .into_body(); + assert_eq!( + resp.certificates + .iter() + .map(|cert| cert.round()) + .collect_vec(), + expected_rounds + ); + } +} + +#[tokio::test] +async fn test_fetch_certificates_v2_handler() { let fixture = CommitteeFixture::builder() .randomize_ports(true) .committee_size(NonZeroUsize::new(4).unwrap()) @@ -965,6 +1147,7 @@ async fn test_fetch_certificates_handler() { let handler = PrimaryReceiverHandler { authority_id: id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, @@ -1017,8 +1200,27 @@ async fn test_fetch_certificates_handler() { for i in 0..total_authorities { authorities.push(certificates[i].header().author()); for j in 0..=i { - let cert = certificates[i + j * total_authorities].clone(); + let mut cert = certificates[i + j * total_authorities].clone(); assert_eq!(&cert.header().author(), authorities.last().unwrap()); + if i == 3 && j == 3 { + // Simulating only 1 directly verified certificate (Auth 3 Round 4) being stored. + cert.set_signature_verification_state( + SignatureVerificationState::VerifiedDirectly( + cert.aggregated_signature() + .expect("Invalid Signature") + .clone(), + ), + ); + } else { + // Simulating some indirectly verified certificates being stored. 
+ cert.set_signature_verification_state( + SignatureVerificationState::VerifiedIndirectly( + cert.aggregated_signature() + .expect("Invalid Signature") + .clone(), + ), + ); + } certificate_store .write(cert) .expect("Writing certificate to store failed"); @@ -1142,6 +1344,7 @@ async fn test_request_vote_created_at_in_future() { let handler = PrimaryReceiverHandler { authority_id: id, committee: fixture.committee(), + protocol_config: latest_protocol_version(), worker_cache: worker_cache.clone(), synchronizer: synchronizer.clone(), signature_service, diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs b/narwhal/primary/src/tests/synchronizer_tests.rs index 628cf0fe6b53d..b97e17ec2462a 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -24,7 +24,10 @@ use test_utils::{ CommitteeFixture, }; use tokio::sync::watch; -use types::{error::DagError, Certificate, CertificateAPI, Header, HeaderAPI, Round}; +use types::{ + error::DagError, Certificate, CertificateAPI, Header, HeaderAPI, Round, + SignatureVerificationState, +}; #[tokio::test] async fn accept_certificates() { @@ -900,9 +903,35 @@ async fn gc_suspended_certificates() { ); // Re-insertion of missing certificate as fetched certificates should be suspended too. 
- for cert in &certificates[NUM_AUTHORITIES * 2..NUM_AUTHORITIES * 4] { + for (idx, cert) in certificates[NUM_AUTHORITIES * 2..NUM_AUTHORITIES * 4] + .iter() + .enumerate() + { + let mut verified_cert = cert.clone(); + // Simulate CertificateV2 fetched certificate leaf only verification + if (idx + NUM_AUTHORITIES * 2) < NUM_AUTHORITIES * 3 { + // Round 3 certs are parents of round 4 certs, so we mark them as verified indirectly + verified_cert.set_signature_verification_state( + SignatureVerificationState::VerifiedIndirectly( + verified_cert + .aggregated_signature() + .expect("Invalid Signature") + .clone(), + ), + ) + } else { + // Round 4 certs are leaf certs in this case and are verified directly + verified_cert.set_signature_verification_state( + SignatureVerificationState::VerifiedDirectly( + verified_cert + .aggregated_signature() + .expect("Invalid Signature") + .clone(), + ), + ) + } match synchronizer - .try_accept_fetched_certificate(cert.clone()) + .try_accept_fetched_certificate(verified_cert) .await { Ok(()) => panic!("Unexpected acceptance of {cert:?}"), diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index 03dfa45ceae33..f39a87f00f2ae 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -1251,7 +1251,7 @@ pub enum SignatureVerificationState { VerifiedDirectly(AggregateSignatureBytes), // This state occurs when the cert was a parent of another fetched certificate // that was verified directly, then this certificate is verified indirectly. - VerifiedIndirectly, + VerifiedIndirectly(AggregateSignatureBytes), // This state occurs only for genesis certificates which always has valid // signatures bytes but the bytes are garbage so we don't mark them as verified. 
Genesis, @@ -1282,9 +1282,9 @@ impl CertificateAPI for CertificateV2 { match &self.signature_verification_state { SignatureVerificationState::VerifiedDirectly(bytes) | SignatureVerificationState::Unverified(bytes) + | SignatureVerificationState::VerifiedIndirectly(bytes) | SignatureVerificationState::Unsigned(bytes) => Some(bytes), - SignatureVerificationState::VerifiedIndirectly - | SignatureVerificationState::Genesis => None, + SignatureVerificationState::Genesis => None, } } @@ -1413,7 +1413,7 @@ impl CertificateV2 { let aggregate_signature_bytes = AggregateSignatureBytes::from(&aggregated_signature); - let aggregate_signature_verification_state = if !check_stake { + let signature_verification_state = if !check_stake { SignatureVerificationState::Unsigned(aggregate_signature_bytes) } else { SignatureVerificationState::Unverified(aggregate_signature_bytes) @@ -1421,7 +1421,7 @@ impl CertificateV2 { Ok(Certificate::V2(CertificateV2 { header, - signature_verification_state: aggregate_signature_verification_state, + signature_verification_state, signed_authorities, metadata: Metadata::default(), })) @@ -1493,7 +1493,7 @@ impl CertificateV2 { fn verify_signature(mut self, pks: Vec) -> DagResult { let aggregrate_signature_bytes = match self.signature_verification_state { - SignatureVerificationState::VerifiedIndirectly + SignatureVerificationState::VerifiedIndirectly(_) | SignatureVerificationState::VerifiedDirectly(_) | SignatureVerificationState::Genesis => return Ok(Certificate::V2(self)), SignatureVerificationState::Unverified(ref bytes) => bytes, @@ -1528,6 +1528,51 @@ impl CertificateV2 { } } +// Certificate version is validated against network protocol version. If CertificateV2 +// is being used then the cert will also be marked as Unverified as this certificate +// is assumed to be received from the network. This SignatureVerificationState is +// why the modified certificate is being returned.
+pub fn validate_received_certificate_version( + mut certificate: Certificate, + protocol_config: &ProtocolConfig, +) -> anyhow::Result { + // If network has advanced to using version 28, which sets narwhal_certificate_v2 + // to true, we will start using CertificateV2 locally and so we will only accept + // CertificateV2 from the network. Otherwise CertificateV1 is used. + match certificate { + Certificate::V1(_) => { + // CertificateV1 does not have a concept of aggregated signature state + // so there is nothing to reset. + if protocol_config.narwhal_certificate_v2() { + return Err(anyhow::anyhow!(format!( + "Received CertificateV1 {certificate:?} but network is at {:?} and this certificate version is no longer supported", + protocol_config.version + ))); + } + } + Certificate::V2(_) => { + if !protocol_config.narwhal_certificate_v2() { + return Err(anyhow::anyhow!(format!( + "Received CertificateV2 {certificate:?} but network is at {:?} and this certificate version is not supported yet", + protocol_config.version + ))); + } else { + // CertificateV2 was received from the network so we need to mark + // certificate aggregated signature state as unverified. + certificate.set_signature_verification_state( + SignatureVerificationState::Unverified( + certificate + .aggregated_signature() + .ok_or(anyhow::anyhow!("Invalid signature"))? + .clone(), + ), + ); + } + } + }; + Ok(certificate) +} + #[derive( Clone, Copy,