Skip to content

Commit

Permalink
[Quorum Store] improvements to prevent some batches not getting quorum
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Jan 26, 2024
1 parent c4da1d7 commit 5a71dbb
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 26 deletions.
30 changes: 25 additions & 5 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl SignedBatchInfoMsg {
&self,
sender: PeerId,
max_num_batches: usize,
max_batch_expiry_gap_usecs: u64,

Check warning on line 151 in consensus/consensus-types/src/proof_of_store.rs

View check run for this annotation

Codecov / codecov/patch

consensus/consensus-types/src/proof_of_store.rs#L151

Added line #L151 was not covered by tests
validator: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(!self.signed_infos.is_empty(), "Empty message");
Expand All @@ -158,7 +159,7 @@ impl SignedBatchInfoMsg {
max_num_batches
);
for signed_info in &self.signed_infos {
signed_info.verify(sender, validator)?
signed_info.verify(sender, max_batch_expiry_gap_usecs, validator)?

Check warning on line 162 in consensus/consensus-types/src/proof_of_store.rs

View check run for this annotation

Codecov / codecov/patch

consensus/consensus-types/src/proof_of_store.rs#L162

Added line #L162 was not covered by tests
}
Ok(())
}
Expand Down Expand Up @@ -207,12 +208,29 @@ impl SignedBatchInfo {
self.signer
}

pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
if sender == self.signer {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
} else {
pub fn verify(
&self,
sender: PeerId,
max_batch_expiry_gap_usecs: u64,
validator: &ValidatorVerifier,
) -> anyhow::Result<()> {
if sender != self.signer {

Check warning on line 217 in consensus/consensus-types/src/proof_of_store.rs

View check run for this annotation

Codecov / codecov/patch

consensus/consensus-types/src/proof_of_store.rs#L211-L217

Added lines #L211 - L217 were not covered by tests
bail!("Sender {} mismatch signer {}", sender, self.signer);
}

if self.expiration()
> aptos_infallible::duration_since_epoch().as_micros() as u64
+ max_batch_expiry_gap_usecs

Check warning on line 223 in consensus/consensus-types/src/proof_of_store.rs

View check run for this annotation

Codecov / codecov/patch

consensus/consensus-types/src/proof_of_store.rs#L220-L223

Added lines #L220 - L223 were not covered by tests
{
bail!(
"Batch expiration too far in future: {} > {}",
self.expiration(),
aptos_infallible::duration_since_epoch().as_micros() as u64
+ max_batch_expiry_gap_usecs
);
}

Ok(validator.verify(self.signer, &self.info, &self.signature)?)

Check warning on line 233 in consensus/consensus-types/src/proof_of_store.rs

View check run for this annotation

Codecov / codecov/patch

consensus/consensus-types/src/proof_of_store.rs#L225-L233

Added lines #L225 - L233 were not covered by tests
}

pub fn signature(self) -> bls12381::Signature {
Expand All @@ -238,6 +256,8 @@ pub enum SignedBatchInfoError {
WrongInfo((u64, u64)),
DuplicatedSignature,
InvalidAuthor,
NotFound,
AlreadyCommitted,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,8 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let round_manager_tx = self.round_manager_tx.clone();
let my_peer_id = self.author;
let max_num_batches = self.config.quorum_store.receiver_max_num_batches;
let max_batch_expiry_gap_usecs =
self.config.quorum_store.batch_expiry_gap_when_init_usecs;
let payload_manager = self.payload_manager.clone();
self.bounded_executor
.spawn(async move {
Expand All @@ -1191,6 +1193,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
quorum_store_enabled,
peer_id == my_peer_id,
max_num_batches,
max_batch_expiry_gap_usecs,
)
) {
Ok(verified_event) => {
Expand Down
1 change: 1 addition & 0 deletions consensus/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct LogSchema {
pub enum LogEvent {
CommitViaBlock,
CommitViaSync,
IncrementalProofExpired,
NetworkReceiveProposal,
NewEpoch,
NewRound,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
network::{NetworkSender, QuorumStoreSender},
quorum_store::{
batch_store::BatchStore,
batch_store::{BatchStore, BatchWriter},
counters,
types::{Batch, PersistedValue},
},
Expand Down
13 changes: 13 additions & 0 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
monitor,
network::{NetworkSender, QuorumStoreSender},
quorum_store::{
batch_store::BatchWriter,
counters,
quorum_store_db::QuorumStoreStorage,
types::Batch,
Expand Down Expand Up @@ -44,6 +45,7 @@ pub struct BatchGenerator {
my_peer_id: PeerId,
batch_id: BatchId,
db: Arc<dyn QuorumStoreStorage>,
batch_writer: Arc<dyn BatchWriter>,
config: QuorumStoreConfig,
mempool_proxy: MempoolProxy,
batches_in_progress: HashMap<BatchId, Vec<TransactionSummary>>,
Expand All @@ -61,6 +63,7 @@ impl BatchGenerator {
my_peer_id: PeerId,
config: QuorumStoreConfig,
db: Arc<dyn QuorumStoreStorage>,
batch_writer: Arc<dyn BatchWriter>,
mempool_tx: Sender<QuorumStoreRequest>,
mempool_txn_pull_timeout_ms: u64,
) -> Self {
Expand All @@ -85,6 +88,7 @@ impl BatchGenerator {
my_peer_id,
batch_id,
db,
batch_writer,
config,
mempool_proxy: MempoolProxy::new(mempool_tx, mempool_txn_pull_timeout_ms),
batches_in_progress: HashMap::new(),
Expand Down Expand Up @@ -404,6 +408,15 @@ impl BatchGenerator {
let batches = self.handle_scheduled_pull(dynamic_pull_max_txn).await;
if !batches.is_empty() {
last_non_empty_pull = tick_start;

let persist_start = Instant::now();
let mut persist_requests = vec![];
for batch in batches.clone().into_iter() {
persist_requests.push(batch.into());
}
self.batch_writer.persist(persist_requests);
counters::BATCH_CREATION_PERSIST_LATENCY.observe_duration(persist_start.elapsed());

network_sender.broadcast_batch_msg(batches).await;
} else if tick_start.elapsed() > interval.period().checked_div(2).unwrap_or(Duration::ZERO) {
// If the pull takes too long, it's also accounted as a non-empty pull to avoid pulling too often.
Expand Down
26 changes: 16 additions & 10 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,6 @@ impl BatchStore {
ret
}

pub fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
if let Some(signed_info) = self.persist_inner(persist_request) {
signed_infos.push(signed_info);
}
}
signed_infos
}

fn persist_inner(&self, persist_request: PersistedValue) -> Option<SignedBatchInfo> {
match self.save(persist_request.clone()) {
Ok(needs_db) => {
Expand Down Expand Up @@ -382,6 +372,18 @@ impl BatchStore {
}
}

impl BatchWriter for BatchStore {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
if let Some(signed_info) = self.persist_inner(persist_request) {
signed_infos.push(signed_info);
}
}
signed_infos
}
}

pub trait BatchReader: Send + Sync {
/// Check if the batch corresponding to the digest exists, return the batch author if true
fn exists(&self, digest: &HashValue) -> Option<PeerId>;
Expand Down Expand Up @@ -444,3 +446,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
self.batch_store.update_certified_timestamp(certified_time);
}
}

pub trait BatchWriter: Send + Sync {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo>;
}
11 changes: 11 additions & 0 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,17 @@ pub static BATCH_CREATION_COMPUTE_LATENCY: Lazy<DurationHistogram> = Lazy::new(|
)
});

/// Histogram of the time it takes to persist batches generated locally to the DB.
pub static BATCH_CREATION_PERSIST_LATENCY: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_batch_creation_persist_latency",
"Histogram of the time it takes to persist batches generated locally to the DB.",
)
.unwrap(),
)
});

Check warning on line 630 in consensus/src/quorum_store/counters.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/counters.rs#L622-L630

Added lines #L622 - L630 were not covered by tests

/// Histogram of the time durations from created batch to created PoS.
pub static BATCH_TO_POS_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
Expand Down
42 changes: 36 additions & 6 deletions consensus/src/quorum_store/proof_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub(crate) struct ProofCoordinator {
digest_to_time: HashMap<HashValue, u64>,
// to record the batch creation time
timeouts: Timeouts<BatchInfo>,
committed_batches: HashMap<BatchInfo, IncrementalProofState>,
batch_reader: Arc<dyn BatchReader>,
batch_generator_cmd_tx: tokio::sync::mpsc::Sender<BatchGeneratorCommand>,
broadcast_proofs: bool,
Expand All @@ -162,6 +163,7 @@ impl ProofCoordinator {
digest_to_proof: HashMap::new(),
digest_to_time: HashMap::new(),
timeouts: Timeouts::new(),
committed_batches: HashMap::new(),
batch_reader,
batch_generator_cmd_tx,
broadcast_proofs,
Expand All @@ -179,10 +181,17 @@ impl ProofCoordinator {
let batch_author = self
.batch_reader
.exists(signed_batch_info.digest())
.ok_or(SignedBatchInfoError::WrongAuthor)?;
.ok_or(SignedBatchInfoError::NotFound)?;
if batch_author != signed_batch_info.author() {
return Err(SignedBatchInfoError::WrongAuthor);
}
if self
.committed_batches
.get(signed_batch_info.batch_info())
.is_some()
{
return Err(SignedBatchInfoError::AlreadyCommitted);

Check warning on line 193 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L193

Added line #L193 was not covered by tests
}

self.timeouts.add(
signed_batch_info.batch_info().clone(),
Expand Down Expand Up @@ -228,14 +237,20 @@ impl ProofCoordinator {
counters::BATCH_TO_POS_DURATION.observe_duration(Duration::from_micros(duration));
return Ok(Some(proof));
}
} else if let Some(value) = self
.committed_batches
.get_mut(signed_batch_info.batch_info())

Check warning on line 242 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L240-L242

Added lines #L240 - L242 were not covered by tests
{
value.add_signature(signed_batch_info, validator_verifier)?;

Check warning on line 244 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L244

Added line #L244 was not covered by tests
} else {
return Err(SignedBatchInfoError::NotFound);

Check warning on line 246 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L246

Added line #L246 was not covered by tests
}
Ok(None)
}

fn update_counters(state: &IncrementalProofState) {
fn update_counters_on_expire(state: &IncrementalProofState) {

Check warning on line 251 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L251

Added line #L251 was not covered by tests
counters::BATCH_RECEIVED_REPLIES_COUNT.observe(state.aggregated_signature.len() as f64);
counters::BATCH_RECEIVED_REPLIES_VOTING_POWER.observe(state.aggregated_voting_power as f64);
counters::BATCH_SUCCESSFUL_CREATION.observe(if state.completed { 1.0 } else { 0.0 });
}

async fn expire(&mut self) {
Expand All @@ -251,10 +266,20 @@ impl ProofCoordinator {
if !state.completed && !state.self_voted {
continue;
}

Check warning on line 269 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L269

Added line #L269 was not covered by tests
if !state.completed {
counters::TIMEOUT_BATCHES_COUNT.inc();
}
Self::update_counters(&state);
if !state.completed {

Check warning on line 273 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L273

Added line #L273 was not covered by tests
info!(
LogSchema::new(LogEvent::ProofOfStoreInit),
digest = signed_batch_info_info.digest(),

Check warning on line 276 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L275-L276

Added lines #L275 - L276 were not covered by tests
self_voted = state.self_voted,
);
}
Self::update_counters_on_expire(&state);
} else if let Some(state) = self.committed_batches.remove(&signed_batch_info_info) {
Self::update_counters_on_expire(&state);

Check warning on line 282 in consensus/src/quorum_store/proof_coordinator.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/proof_coordinator.rs#L279-L282

Added lines #L279 - L282 were not covered by tests
}
}
if self
Expand Down Expand Up @@ -289,8 +314,13 @@ impl ProofCoordinator {
let digest = batch.digest();
if let Entry::Occupied(existing_proof) = self.digest_to_proof.entry(*digest) {
if batch == *existing_proof.get().batch_info() {
Self::update_counters(existing_proof.get());
existing_proof.remove();
let incremental_proof = existing_proof.get();
if !incremental_proof.completed {
warn!("QS: received commit notification for batch that did not complete: {}, self_voted: {}", digest, incremental_proof.self_voted);
}
counters::BATCH_SUCCESSFUL_CREATION.observe(if incremental_proof.completed { 1.0 } else { 0.0 });
let committed_proof = existing_proof.remove();
self.committed_batches.insert(batch, committed_proof);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl InnerBuilder {
self.author,
self.config.clone(),
self.quorum_store_storage.clone(),
self.batch_store.clone().unwrap(),

Check warning on line 294 in consensus/src/quorum_store/quorum_store_builder.rs

View check run for this annotation

Codecov / codecov/patch

consensus/src/quorum_store/quorum_store_builder.rs#L294

Added line #L294 was not covered by tests
self.quorum_store_to_mempool_sender,
self.mempool_txn_pull_timeout_ms,
);
Expand Down
Loading

0 comments on commit 5a71dbb

Please sign in to comment.