Skip to content

Commit

Permalink
better counters
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Jan 13, 2024
1 parent 301330d commit bc1baa9
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions consensus/src/quorum_store/proof_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use aptos_types::{
aggregate_signature::PartialSignatures, validator_verifier::ValidatorVerifier, PeerId,
};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
collections::{hash_map::Entry, BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -142,7 +142,7 @@ pub(crate) struct ProofCoordinator {
digest_to_time: HashMap<HashValue, u64>,
// to record the batch creation time
timeouts: Timeouts<BatchInfo>,
committed_batches: HashSet<BatchInfo>,
committed_batches: HashMap<BatchInfo, IncrementalProofState>,
batch_reader: Arc<dyn BatchReader>,
batch_generator_cmd_tx: tokio::sync::mpsc::Sender<BatchGeneratorCommand>,
broadcast_proofs: bool,
Expand All @@ -163,7 +163,7 @@ impl ProofCoordinator {
digest_to_proof: HashMap::new(),
digest_to_time: HashMap::new(),
timeouts: Timeouts::new(),
committed_batches: HashSet::new(),
committed_batches: HashMap::new(),
batch_reader,
batch_generator_cmd_tx,
broadcast_proofs,
Expand All @@ -187,7 +187,8 @@ impl ProofCoordinator {
}
if self
.committed_batches
.contains(signed_batch_info.batch_info())
.get(signed_batch_info.batch_info())
.is_some()
{
return Err(SignedBatchInfoError::AlreadyCommitted);
}
Expand Down Expand Up @@ -236,20 +237,25 @@ impl ProofCoordinator {
counters::BATCH_TO_POS_DURATION.observe_duration(Duration::from_micros(duration));
return Ok(Some(proof));
}
} else if let Some(value) = self
.committed_batches
.get_mut(signed_batch_info.batch_info())
{
value.add_signature(signed_batch_info, validator_verifier)?;
} else {
return Err(SignedBatchInfoError::NotFound);
}
Ok(None)
}

fn update_counters(state: &IncrementalProofState) {
fn update_counters_on_expire(state: &IncrementalProofState) {
counters::BATCH_RECEIVED_REPLIES_COUNT.observe(state.aggregated_signature.len() as f64);
counters::BATCH_RECEIVED_REPLIES_VOTING_POWER.observe(state.aggregated_voting_power as f64);
counters::BATCH_SUCCESSFUL_CREATION.observe(if state.completed { 1.0 } else { 0.0 });
}

async fn expire(&mut self) {
let mut batch_ids = vec![];
for signed_batch_info_info in self.timeouts.expire() {
self.committed_batches.remove(&signed_batch_info_info);
if let Some(state) = self.digest_to_proof.remove(signed_batch_info_info.digest()) {
if !state.completed {
batch_ids.push(signed_batch_info_info.batch_id());
Expand All @@ -260,6 +266,7 @@ impl ProofCoordinator {
if !state.completed && !state.self_voted {
continue;
}

if !state.completed {
counters::TIMEOUT_BATCHES_COUNT.inc();
}
Expand All @@ -270,7 +277,9 @@ impl ProofCoordinator {
self_voted = state.self_voted,
);
}
Self::update_counters(&state);
Self::update_counters_on_expire(&state);
} else if let Some(state) = self.committed_batches.remove(&signed_batch_info_info) {
Self::update_counters_on_expire(&state);
}
}
if self
Expand Down Expand Up @@ -309,9 +318,9 @@ impl ProofCoordinator {
if !incremental_proof.completed {
warn!("QS: received commit notification for batch that did not complete: {}, self_voted: {}", digest, incremental_proof.self_voted);
}
Self::update_counters(incremental_proof);
existing_proof.remove();
self.committed_batches.insert(batch);
counters::BATCH_SUCCESSFUL_CREATION.observe(if incremental_proof.completed { 1.0 } else { 0.0 });
let committed_proof = existing_proof.remove();
self.committed_batches.insert(batch, committed_proof);
}
}
}
Expand Down

0 comments on commit bc1baa9

Please sign in to comment.