Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Oct 6, 2023
1 parent 28d1242 commit 3800a58
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 108 deletions.
9 changes: 6 additions & 3 deletions narwhal/primary/src/aggregators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ impl VotesAggregator {
let (_, pks) = cert.signed_by(committee);

let certificate_digest: Digest<{ crypto::DIGEST_LENGTH }> = Digest::from(cert.digest());
match AggregateSignature::try_from(cert.aggregated_signature())
.map_err(|_| DagError::InvalidSignature)?
.verify_secure(&to_intent_message(certificate_digest), &pks[..])
match AggregateSignature::try_from(
cert.aggregated_signature()
.ok_or(DagError::InvalidSignature)?,
)
.map_err(|_| DagError::InvalidSignature)?
.verify_secure(&to_intent_message(certificate_digest), &pks[..])
{
Err(err) => {
warn!(
Expand Down
9 changes: 5 additions & 4 deletions narwhal/primary/src/certificate_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,20 @@ async fn process_certificates_helper(
let verify_tasks = all_certificates
.chunks(VERIFY_CERTIFICATES_BATCH_SIZE)
.map(|certs| {
let mut certs = certs.to_vec();
let certs = certs.to_vec();
let sync = synchronizer.clone();
let metrics = metrics.clone();
// Use threads dedicated to computation heavy work.
spawn_blocking(move || {
let now = Instant::now();
for c in &mut certs {
sync.sanitize_certificate(c)?;
let mut sanitized_certs = Vec::new();
for c in certs {
sanitized_certs.push(sync.sanitize_certificate(c)?);
}
metrics
.certificate_fetcher_total_verification_us
.inc_by(now.elapsed().as_micros() as u64);
Ok::<Vec<Certificate>, DagError>(certs)
Ok::<Vec<Certificate>, DagError>(sanitized_certs)
})
})
.collect_vec();
Expand Down
29 changes: 16 additions & 13 deletions narwhal/primary/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ impl Synchronizer {
let _scope = monitored_scope("Synchronizer::try_accept_certificate");
self.process_certificate_internal(certificate, true, true)
.await
.map(|_| ())
}

/// Tries to accept a certificate from certificate fetcher.
Expand All @@ -569,17 +570,18 @@ impl Synchronizer {
let _scope = monitored_scope("Synchronizer::try_accept_fetched_certificate");
self.process_certificate_internal(certificate, false, false)
.await
.map(|_| ())
}

/// Accepts a certificate produced by this primary. This is not expected to fail unless
/// the primary is shutting down.
pub async fn accept_own_certificate(&self, certificate: Certificate) -> DagResult<()> {
// Process the new certificate.
match self
let certificate = match self
.process_certificate_internal(certificate.clone(), false, false)
.await
{
Ok(()) => Ok(()),
Ok(processed_certificate) => Ok(processed_certificate),
result @ Err(DagError::ShuttingDown) => result,
Err(e) => panic!("Failed to process locally-created certificate: {e}"),
}?;
Expand Down Expand Up @@ -634,8 +636,7 @@ impl Synchronizer {
}

/// Checks if the certificate is valid and can potentially be accepted into the DAG.
// TODO: produce a different type after sanitize, e.g. VerifiedCertificate.
pub fn sanitize_certificate(&self, certificate: &mut Certificate) -> DagResult<()> {
pub fn sanitize_certificate(&self, certificate: Certificate) -> DagResult<Certificate> {
ensure!(
self.inner.committee.epoch() == certificate.epoch(),
DagError::InvalidEpoch {
Expand All @@ -650,24 +651,24 @@ impl Synchronizer {
DagError::TooOld(certificate.digest().into(), certificate.round(), gc_round)
);
// Verify the certificate (and the embedded header).
certificate
.verify(&self.inner.committee, &self.inner.worker_cache)
.map_err(DagError::from)
certificate.verify(&self.inner.committee, &self.inner.worker_cache)
}

// CertificateV2 maintains signature verification state. Therefore when this
// method is called with sanitize = true, the signature verification state may
// change which is why the updated certificate is returned.
async fn process_certificate_internal(
&self,
mut certificate: Certificate,
sanitize: bool,
early_suspend: bool,
) -> DagResult<()> {
) -> DagResult<Certificate> {
let _scope = monitored_scope("Synchronizer::process_certificate_internal");

let digest = certificate.digest();
if self.inner.certificate_store.contains(&digest)? {
trace!("Certificate {digest:?} has already been processed. Skip processing.");
self.inner.metrics.duplicate_certificates_processed.inc();
return Ok(());
return Ok(certificate);
}
// Ensure parents are checked if !early_suspend.
// See comments above `try_accept_fetched_certificate()` for details.
Expand All @@ -682,8 +683,9 @@ impl Synchronizer {
return Err(DagError::Suspended(notify));
}
}

if sanitize {
self.sanitize_certificate(&mut certificate)?;
certificate = self.sanitize_certificate(certificate)?;
}

debug!(
Expand Down Expand Up @@ -750,12 +752,13 @@ impl Synchronizer {
let (sender, receiver) = oneshot::channel();
self.inner
.tx_certificate_acceptor
.send((certificate, sender, early_suspend))
.send((certificate.clone(), sender, early_suspend))
.await
.expect("Synchronizer should shut down before certificate acceptor task.");
receiver
.await
.expect("Synchronizer should shut down before certificate acceptor task.")
.expect("Synchronizer should shut down before certificate acceptor task.")?;
Ok(certificate)
}

/// This function checks if a certificate has all parents and can be accepted into storage.
Expand Down
10 changes: 5 additions & 5 deletions narwhal/primary/src/tests/certificate_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn test_empty_certificate_verification() {
)
.is_err());

let mut certificate =
let certificate =
Certificate::new_unsigned(&latest_protocol_version(), &committee, header, Vec::new())
.unwrap();
assert!(certificate
Expand All @@ -52,7 +52,7 @@ fn test_valid_certificate_verification() {
signatures.push((vote.author(), vote.signature().clone()));
}

let mut certificate =
let certificate =
Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures)
.unwrap();

Expand Down Expand Up @@ -83,7 +83,7 @@ fn test_certificate_insufficient_signatures() {
)
.is_err());

let mut certificate =
let certificate =
Certificate::new_unsigned(&latest_protocol_version(), &committee, header, signatures)
.unwrap();

Expand Down Expand Up @@ -111,7 +111,7 @@ fn test_certificate_validly_repeated_public_keys() {
let certificate_res =
Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures);
assert!(certificate_res.is_ok());
let mut certificate = certificate_res.unwrap();
let certificate = certificate_res.unwrap();

assert!(certificate
.verify(&committee, &fixture.worker_cache())
Expand Down Expand Up @@ -167,7 +167,7 @@ proptest::proptest! {
signatures.push((vote.author(), vote.signature().clone()));
}

let mut certificate = Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures).unwrap();
let certificate = Certificate::new_unverified(&latest_protocol_version(), &committee, header, signatures).unwrap();

assert!(certificate
.verify(&committee, &fixture.worker_cache())
Expand Down
6 changes: 3 additions & 3 deletions narwhal/primary/src/tests/synchronizer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async fn accept_suspended_certificates() {
);
let certificates = certificates.into_iter().collect_vec();

// Try to aceept certificates from round 2 to 5. All of them should be suspended.
// Try to accept certificates from round 2 to 5. All of them should be suspended.
let accept = FuturesUnordered::new();
for cert in &certificates[NUM_AUTHORITIES..] {
match synchronizer.try_accept_certificate(cert.clone()).await {
Expand All @@ -195,7 +195,7 @@ async fn accept_suspended_certificates() {
}
}

// Try to aceept certificates from round 1. All of them should be accepted.
// Try to accept certificates from round 1. All of them should be accepted.
for cert in &certificates[..NUM_AUTHORITIES] {
match synchronizer.try_accept_certificate(cert.clone()).await {
Ok(()) => continue,
Expand All @@ -206,7 +206,7 @@ async fn accept_suspended_certificates() {
// Wait for all notifications to arrive.
accept.collect::<Vec<()>>().await;

// Try to aceept certificates from round 2 and above again. All of them should be accepted.
// Try to accept certificates from round 2 and above again. All of them should be accepted.
for cert in &certificates[NUM_AUTHORITIES..] {
match synchronizer.try_accept_certificate(cert.clone()).await {
Ok(()) => continue,
Expand Down
21 changes: 13 additions & 8 deletions narwhal/types/benches/verify_certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
use criterion::{
criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput,
};
use fastcrypto::hash::Hash;
use fastcrypto::{hash::Hash, traits::KeyPair};
use narwhal_types::Certificate;
use std::collections::BTreeSet;
use test_utils::{latest_protocol_version, make_optimal_certificates, CommitteeFixture};
use test_utils::{latest_protocol_version, make_optimal_signed_certificates, CommitteeFixture};

pub fn verify_certificates(c: &mut Criterion) {
let mut bench_group = c.benchmark_group("verify_certificate");
Expand All @@ -19,19 +19,22 @@ pub fn verify_certificates(c: &mut Criterion) {
.committee_size(committee_size.try_into().unwrap())
.build();
let committee = fixture.committee();
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let keys: Vec<_> = fixture
.authorities()
.map(|a| (a.id(), a.keypair().copy()))
.collect();

// process certificates for rounds, check we don't grow the dag too much
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
let (certificates, _next_parents) = make_optimal_certificates(
&committee,
&latest_protocol_version(),
let (certificates, _next_parents) = make_optimal_signed_certificates(
1..=1,
&genesis,
&ids,
&committee,
&latest_protocol_version(),
keys.as_slice(),
);
let certificate = certificates.front().unwrap().clone();

Expand All @@ -44,7 +47,9 @@ pub fn verify_certificates(c: &mut Criterion) {
|b, cert| {
let worker_cache = fixture.worker_cache();
b.iter(|| {
let _ = cert.clone().verify(&committee, &worker_cache);
cert.clone()
.verify(&committee, &worker_cache)
.expect("Verification failed");
})
},
);
Expand Down
Loading

0 comments on commit 3800a58

Please sign in to comment.