diff --git a/narwhal/primary/src/certificate_fetcher.rs b/narwhal/primary/src/certificate_fetcher.rs index 82ecb35b9f731f..62fba3642943bb 100644 --- a/narwhal/primary/src/certificate_fetcher.rs +++ b/narwhal/primary/src/certificate_fetcher.rs @@ -28,6 +28,7 @@ use tokio::{ time::{sleep, timeout, Instant}, }; use tracing::{debug, error, instrument, trace, warn}; +use types::AggregateSignatureVerificationState; use types::{ error::{DagError, DagResult}, validate_received_certificate_version, Certificate, CertificateAPI, @@ -50,6 +51,10 @@ const PARALLEL_FETCH_REQUEST_ADDITIONAL_TIMEOUT: Duration = Duration::from_secs( // Batch size is chosen so that verifying a batch takes non-trival // time (verifying a batch of 200 certificates should take > 100ms). const VERIFY_CERTIFICATES_BATCH_SIZE: usize = 200; +// Number of non-parent certificates to verify in a batch. Verifications in each +// batch run serially. Smaller batch size is chosen as there are significantly +// less certificates we are verifying when we only do certificate tip verification. +const VERIFY_NON_PARENT_CERTIFICATES_BATCH_SIZE: usize = 10; #[derive(Clone, Debug)] pub enum CertificateFetcherCommand { @@ -553,60 +558,57 @@ async fn process_certificates_v2_helper( } let all_certificates_count = all_certificates.len() as u64; - let mut non_parents_count: u64 = 0; - - // Classify certs as parent or non-parent but maintain the order that the - // certificates were received so that they can be validated in the same - // response order to avoid certificate suspensions - let mut classified_certs: Vec<(Certificate, bool)> = vec![]; - for c in all_certificates { - let is_parent = all_parents.contains(&c.digest()); - if !is_parent { - non_parents_count += 1; + + // Identify non-parent certs and preemptively set the parent certificates + // as verified indirectly. This is safe because any non-parent certs that + // fail verification will cancel processing for all fetched certs. + let mut non_parent_certs = Vec::new(); + for (idx, c) in all_certificates.iter_mut().enumerate() { + if !all_parents.contains(&c.digest()) { + non_parent_certs.push((idx, c.clone())); + } else { + c.set_aggregate_signature_verification_state( + AggregateSignatureVerificationState::VerifiedIndirectly( + c.aggregated_signature().clone(), + ), + ) } - classified_certs.push((c, is_parent)); } + let non_parents_count = non_parent_certs.len() as u64; - let verify_tasks = classified_certs - .chunks(VERIFY_CERTIFICATES_BATCH_SIZE) + // Create verify tasks only for non-parent certs as parent certs can skip + // this completely. + let non_parent_verify_tasks = non_parent_certs + .chunks(VERIFY_NON_PARENT_CERTIFICATES_BATCH_SIZE) .map(|chunk| { - let mut certs_and_classification = chunk.to_vec(); + let mut certs = chunk.to_vec(); let sync = synchronizer.clone(); let metrics = metrics.clone(); - // Use threads dedicated to computation heavy work. spawn_blocking(move || { let now = Instant::now(); - for (c, is_parent) in &mut certs_and_classification { - sync.sanitize_certificate(c, *is_parent)?; + for (_, c) in &mut certs { + sync.sanitize_certificate(c)?; } metrics .certificate_fetcher_total_verification_us .inc_by(now.elapsed().as_micros() as u64); - let certs = certs_and_classification - .into_iter() - .map(|(c, _)| c) - .collect(); - Ok::, DagError>(certs) + Ok::, DagError>(certs) }) }) .collect_vec(); - let mut sanitized_certificates = Vec::new(); - // Process verified certificates in the same order as received. We ensure - // that sanitize certificate completes for all fetched certificates before - // accepting any certficates because we have to be sure that all non-parent - // certificates successfully sanitized before accepting any of the parent - // certificates. This is because parent certificates will be processed before - // non parent certificates. - for task in verify_tasks.into_iter() { + // We ensure sanitization of certificates completes for all non-parents + // fetched certificates before accepting any certficates. + for task in non_parent_verify_tasks.into_iter() { // Any certificates that fail to be verified should cancel the entire - // batch of fetched certficates that are being processed to protect - // against byzantine validators. - let mut certificates = task.await.map_err(|err| { + // batch of fetched certficates. + let idx_and_certs = task.await.map_err(|err| { error!("Cancelling due to {err:?}"); DagError::Canceled })??; - sanitized_certificates.append(&mut certificates); + for (idx, cert) in idx_and_certs { + all_certificates[idx] = cert; + } } metrics @@ -616,17 +618,22 @@ async fn process_certificates_v2_helper( .fetched_certificates_verified_indirectly .inc_by(all_certificates_count.saturating_sub(non_parents_count)); - let now = Instant::now(); - for cert in sanitized_certificates { + // Accept verified certificates in the same order as received. + for cert in all_certificates { + let cert_digest = cert.digest(); + let now = Instant::now(); if let Err(e) = synchronizer.try_accept_fetched_certificate(cert).await { // It is possible that subsequent certificates are useful, // so not stopping early. - warn!("Failed to accept fetched certificate: {e}"); + warn!( + "Failed to accept fetched certificate {:?}: {e}", + cert_digest + ); } + metrics + .certificate_fetcher_total_accept_us + .inc_by(now.elapsed().as_micros() as u64); } - metrics - .certificate_fetcher_total_accept_us - .inc_by(now.elapsed().as_micros() as u64); Ok(()) } diff --git a/narwhal/primary/src/synchronizer.rs b/narwhal/primary/src/synchronizer.rs index c7413a68b36976..ebbe2f774c43d0 100644 --- a/narwhal/primary/src/synchronizer.rs +++ b/narwhal/primary/src/synchronizer.rs @@ -33,7 +33,7 @@ use tokio::{ time::{sleep, timeout}, }; use tracing::{debug, error, instrument, trace, warn}; -use types::{bail, AggregateSignatureVerificationState}; +use types::AggregateSignatureVerificationState; use types::{ ensure, error::{AcceptNotification, DagError, DagResult},