Skip to content

Commit

Permalink
Skip sanitization completely for parent certificates
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Oct 10, 2023
1 parent 34f18e6 commit e1435e7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 41 deletions.
87 changes: 47 additions & 40 deletions narwhal/primary/src/certificate_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tokio::{
time::{sleep, timeout, Instant},
};
use tracing::{debug, error, instrument, trace, warn};
use types::AggregateSignatureVerificationState;
use types::{
error::{DagError, DagResult},
validate_received_certificate_version, Certificate, CertificateAPI,
Expand All @@ -50,6 +51,10 @@ const PARALLEL_FETCH_REQUEST_ADDITIONAL_TIMEOUT: Duration = Duration::from_secs(
// Batch size is chosen so that verifying a batch takes non-trival
// time (verifying a batch of 200 certificates should take > 100ms).
const VERIFY_CERTIFICATES_BATCH_SIZE: usize = 200;
// Number of non-parent certificates to verify in a batch. Verifications in each
// batch run serially. Smaller batch size is chosen as there are significantly
// less certificates we are verifying when we only do certificate tip verification.
const VERIFY_NON_PARENT_CERTIFICATES_BATCH_SIZE: usize = 10;

#[derive(Clone, Debug)]
pub enum CertificateFetcherCommand {
Expand Down Expand Up @@ -553,60 +558,57 @@ async fn process_certificates_v2_helper(
}

let all_certificates_count = all_certificates.len() as u64;
let mut non_parents_count: u64 = 0;

// Classify certs as parent or non-parent but maintain the order that the
// certificates were received so that they can be validated in the same
// response order to avoid certificate suspensions
let mut classified_certs: Vec<(Certificate, bool)> = vec![];
for c in all_certificates {
let is_parent = all_parents.contains(&c.digest());
if !is_parent {
non_parents_count += 1;

// Identify non-parent certs and preemptively set the parent certificates
// as verified indirectly. This is safe because any non-parent certs that
// fail verification will cancel processing for all fetched certs.
let mut non_parent_certs = Vec::new();
for (idx, c) in all_certificates.iter_mut().enumerate() {
if !all_parents.contains(&c.digest()) {
non_parent_certs.push((idx, c.clone()));
} else {
c.set_aggregate_signature_verification_state(
AggregateSignatureVerificationState::VerifiedIndirectly(
c.aggregated_signature().clone(),
),
)
}
classified_certs.push((c, is_parent));
}
let non_parents_count = non_parent_certs.len() as u64;

let verify_tasks = classified_certs
.chunks(VERIFY_CERTIFICATES_BATCH_SIZE)
// Create verify tasks only for non-parent certs as parent certs can skip
// this completely.
let non_parent_verify_tasks = non_parent_certs
.chunks(VERIFY_NON_PARENT_CERTIFICATES_BATCH_SIZE)
.map(|chunk| {
let mut certs_and_classification = chunk.to_vec();
let mut certs = chunk.to_vec();
let sync = synchronizer.clone();
let metrics = metrics.clone();
// Use threads dedicated to computation heavy work.
spawn_blocking(move || {
let now = Instant::now();
for (c, is_parent) in &mut certs_and_classification {
sync.sanitize_certificate(c, *is_parent)?;
for (_, c) in &mut certs {
sync.sanitize_certificate(c)?;
}
metrics
.certificate_fetcher_total_verification_us
.inc_by(now.elapsed().as_micros() as u64);
let certs = certs_and_classification
.into_iter()
.map(|(c, _)| c)
.collect();
Ok::<Vec<Certificate>, DagError>(certs)
Ok::<Vec<(usize, Certificate)>, DagError>(certs)
})
})
.collect_vec();

let mut sanitized_certificates = Vec::new();
// Process verified certificates in the same order as received. We ensure
// that sanitize certificate completes for all fetched certificates before
// accepting any certficates because we have to be sure that all non-parent
// certificates successfully sanitized before accepting any of the parent
// certificates. This is because parent certificates will be processed before
// non parent certificates.
for task in verify_tasks.into_iter() {
// We ensure sanitization of certificates completes for all non-parents
// fetched certificates before accepting any certficates.
for task in non_parent_verify_tasks.into_iter() {
// Any certificates that fail to be verified should cancel the entire
// batch of fetched certficates that are being processed to protect
// against byzantine validators.
let mut certificates = task.await.map_err(|err| {
// batch of fetched certficates.
let idx_and_certs = task.await.map_err(|err| {
error!("Cancelling due to {err:?}");
DagError::Canceled
})??;
sanitized_certificates.append(&mut certificates);
for (idx, cert) in idx_and_certs {
all_certificates[idx] = cert;
}
}

metrics
Expand All @@ -616,17 +618,22 @@ async fn process_certificates_v2_helper(
.fetched_certificates_verified_indirectly
.inc_by(all_certificates_count.saturating_sub(non_parents_count));

let now = Instant::now();
for cert in sanitized_certificates {
// Accept verified certificates in the same order as received.
for cert in all_certificates {
let cert_digest = cert.digest();
let now = Instant::now();
if let Err(e) = synchronizer.try_accept_fetched_certificate(cert).await {
// It is possible that subsequent certificates are useful,
// so not stopping early.
warn!("Failed to accept fetched certificate: {e}");
warn!(
"Failed to accept fetched certificate {:?}: {e}",
cert_digest
);
}
metrics
.certificate_fetcher_total_accept_us
.inc_by(now.elapsed().as_micros() as u64);
}
metrics
.certificate_fetcher_total_accept_us
.inc_by(now.elapsed().as_micros() as u64);

Ok(())
}
2 changes: 1 addition & 1 deletion narwhal/primary/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio::{
time::{sleep, timeout},
};
use tracing::{debug, error, instrument, trace, warn};
use types::{bail, AggregateSignatureVerificationState};
use types::AggregateSignatureVerificationState;
use types::{
ensure,
error::{AcceptNotification, DagError, DagResult},
Expand Down

0 comments on commit e1435e7

Please sign in to comment.