Skip to content

Commit

Permalink
Clean up ConsensusHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Oct 10, 2023
1 parent 440bb9b commit 0a60588
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 235 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl AuthorityMetrics {
"Total number of consensus transactions skipped",
registry,
)
.unwrap(),
.unwrap(),
skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
"skipped_consensus_txns_cache_hit",
"Total number of consensus transactions skipped because of local cache hit",
Expand Down
180 changes: 102 additions & 78 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use parking_lot::RwLock;
use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard};
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::future::Future;
use std::iter;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -41,7 +42,7 @@ use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfigura
use crate::authority::{AuthorityStore, ResolverWrapper};
use crate::checkpoints::{
BuilderCheckpointSummary, CheckpointCommitHeight, CheckpointServiceNotify, EpochStats,
PendingCheckpoint,
PendingCheckpoint, PendingCheckpointInfo,
};
use crate::consensus_handler::{
SequencedConsensusTransaction, SequencedConsensusTransactionKey,
Expand All @@ -50,12 +51,14 @@ use crate::consensus_handler::{
use crate::epoch::epoch_metrics::EpochMetrics;
use crate::epoch::reconfiguration::ReconfigState;
use crate::module_cache_metrics::ResolverMetrics;
use crate::post_consensus_tx_reorder::PostConsensusTxReorder;
use crate::signature_verifier::*;
use crate::stake_aggregator::{GenericMultiStakeAggregator, StakeAggregator};
use move_bytecode_utils::module_cache::SyncModuleCache;
use mysten_common::sync::notify_once::NotifyOnce;
use mysten_common::sync::notify_read::NotifyRead;
use mysten_metrics::monitored_scope;
use narwhal_types::{Round, TimestampMs};
use prometheus::IntCounter;
use std::str::FromStr;
use sui_execution::{self, Executor};
Expand Down Expand Up @@ -1702,7 +1705,7 @@ impl AuthorityPerEpochStore {
&self,
transaction: SequencedConsensusTransaction,
skipped_consensus_txns: &IntCounter,
) -> Result<VerifiedSequencedConsensusTransaction, ()> {
) -> Option<VerifiedSequencedConsensusTransaction> {
let _scope = monitored_scope("VerifyConsensusTransaction");
if self
.is_consensus_message_processed(&transaction.transaction.key())
Expand All @@ -1714,7 +1717,7 @@ impl AuthorityPerEpochStore {
"handle_consensus_transaction UserTransaction [skip]",
);
skipped_consensus_txns.inc();
return Err(());
return None;
}
// Signatures are verified as part of narwhal payload verification in SuiTxValidator
match &transaction.transaction {
Expand All @@ -1728,7 +1731,7 @@ impl AuthorityPerEpochStore {
}) => {
if transaction.sender_authority() != data.summary.auth_sig().authority {
warn!("CheckpointSignature authority {} does not match narwhal certificate source {}", data.summary.auth_sig().authority, transaction.certificate.origin() );
return Err(());
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
Expand All @@ -1741,7 +1744,7 @@ impl AuthorityPerEpochStore {
authority,
transaction.certificate.origin()
);
return Err(());
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
Expand All @@ -1754,7 +1757,7 @@ impl AuthorityPerEpochStore {
capabilities.authority,
transaction.certificate.origin()
);
return Err(());
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
Expand All @@ -1767,19 +1770,19 @@ impl AuthorityPerEpochStore {
authority,
transaction.certificate.origin()
);
return Err(());
return None;
}
if !check_total_jwk_size(id, jwk) {
warn!(
"{:?} sent jwk that exceeded max size",
transaction.sender_authority().concise()
);
return Err(());
return None;
}
}
SequencedConsensusTransactionKind::System(_) => {}
}
Ok(VerifiedSequencedConsensusTransaction(transaction))
Some(VerifiedSequencedConsensusTransaction(transaction))
}

fn db_batch(&self) -> DBBatch {
Expand All @@ -1796,37 +1799,96 @@ impl AuthorityPerEpochStore {
C: CheckpointServiceNotify,
>(
self: &'a Arc<Self>,
transactions: &[VerifiedSequencedConsensusTransaction],
end_of_publish_transactions: Vec<VerifiedSequencedConsensusTransaction>,
transactions: Vec<SequencedConsensusTransaction>,
consensus_stats: &ExecutionIndicesWithStats,
checkpoint_service: &Arc<C>,
object_store: impl ObjectStore,
) -> SuiResult<(
Vec<VerifiedExecutableTransaction>,
ConsensusCommitBatch<'a, C>,
)> {
commit_round: Round,
commit_timestamp: TimestampMs,
skipped_consensus_txns: &IntCounter,
) -> SuiResult<Vec<VerifiedExecutableTransaction>> {
let verified_transactions: Vec<_> = transactions
.into_iter()
.filter_map(|transaction| {
self.verify_consensus_transaction(transaction, skipped_consensus_txns)
})
.collect();
let roots: BTreeSet<_> = verified_transactions
.iter()
.filter_map(|transaction| transaction.0.transaction.executable_transaction_digest())
.collect();
let (end_of_publish_transactions, mut sequenced_transactions): (Vec<_>, Vec<_>) =
verified_transactions
.into_iter()
.partition(|transaction| transaction.0.is_end_of_publish());

PostConsensusTxReorder::reorder(
&mut sequenced_transactions,
self.protocol_config.consensus_transaction_ordering(),
);

let mut batch = self.db_batch();
let (transactions_to_schedule, notifications, lock_and_final_round) = self
.process_consensus_transactions(
&mut batch,
transactions,
&sequenced_transactions,
&end_of_publish_transactions,
checkpoint_service,
object_store,
)
.await?;
self.record_consensus_commit_stats(&mut batch, consensus_stats)?;
Ok((
transactions_to_schedule,
ConsensusCommitBatch {
epoch_store: self.clone(),
checkpoint_service: checkpoint_service.clone(),
batch,
notifications,
end_of_publish_transactions,
lock_and_final_round,

// The last block in this function notifies about new checkpoint if needed
// It's important that we use as_ref() here to make sure we are not dropping the lock.
// The lock needs to be held until the end of this function.
let final_checkpoint_round = lock_and_final_round.as_ref().map(|(_, r)| *r);
let final_checkpoint = match final_checkpoint_round.map(|r| r.cmp(&commit_round)) {
Some(Ordering::Less) => {
debug!(
"Not forming checkpoint for round {} above final checkpoint round {:?}",
commit_round, final_checkpoint_round
);
return Ok(vec![]);
}
Some(Ordering::Equal) => true,
Some(Ordering::Greater) => false,
None => false,
};
let pending_checkpoint = PendingCheckpoint {
roots: roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: commit_timestamp,
last_of_epoch: final_checkpoint,
commit_height: commit_round,
},
))
};

self.write_pending_checkpoint(&mut batch, &pending_checkpoint)?;

batch.write()?;

self.process_notifications(&notifications, &end_of_publish_transactions);

checkpoint_service.notify_checkpoint(&pending_checkpoint)?;

if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.record_end_of_message_quorum_time_metric();
}

Ok(transactions_to_schedule)
}

#[cfg(any(test, feature = "test-utils"))]
fn get_highest_pending_checkpoint_height(&self) -> CheckpointCommitHeight {
self.tables
.pending_checkpoints
.unbounded_iter()
.skip_to_last()
.next()
.map(|(key, _)| key)
.unwrap_or_default()
}

// Caller is not required to set ExecutionIndices with the right semantics in
Expand All @@ -1835,30 +1897,22 @@ impl AuthorityPerEpochStore {
// process_consensus_transactions_and_commit_boundary().
#[cfg(any(test, feature = "test-utils"))]
pub(crate) async fn process_consensus_transactions_for_tests<C: CheckpointServiceNotify>(
&self,
transactions: Vec<VerifiedSequencedConsensusTransaction>,
self: &Arc<Self>,
transactions: Vec<SequencedConsensusTransaction>,
checkpoint_service: &Arc<C>,
object_store: impl ObjectStore,
skipped_consensus_txns: &IntCounter,
) -> SuiResult<Vec<VerifiedExecutableTransaction>> {
let mut batch = self.db_batch();

let (transactions, end_of_publish_transactions): (Vec<_>, Vec<_>) = transactions
.into_iter()
.partition(|txn| !txn.0.is_end_of_publish());

let (certs, notifications, _lock) = self
.process_consensus_transactions(
&mut batch,
&transactions,
&end_of_publish_transactions,
checkpoint_service,
object_store,
)
.await?;
batch.write()?;

self.process_notifications(&notifications, &end_of_publish_transactions);
Ok(certs)
self.process_consensus_transactions_and_commit_boundary(
transactions,
&ExecutionIndicesWithStats::default(),
checkpoint_service,
object_store,
self.get_highest_pending_checkpoint_height() + 1,
0,
skipped_consensus_txns,
)
.await
}

fn process_notifications(
Expand Down Expand Up @@ -2468,36 +2522,6 @@ impl AuthorityPerEpochStore {
}
}

pub(crate) struct ConsensusCommitBatch<'a, C> {
epoch_store: Arc<AuthorityPerEpochStore>,
checkpoint_service: Arc<C>,
batch: DBBatch,
notifications: Vec<SequencedConsensusTransactionKey>,
end_of_publish_transactions: Vec<VerifiedSequencedConsensusTransaction>,
lock_and_final_round: Option<(parking_lot::RwLockWriteGuard<'a, ReconfigState>, u64)>,
}

impl<'a, C: CheckpointServiceNotify> ConsensusCommitBatch<'a, C> {
pub fn commit(mut self, pending_checkpoint: PendingCheckpoint) -> SuiResult {
self.epoch_store
.write_pending_checkpoint(&mut self.batch, &pending_checkpoint)?;

self.batch.write()?;

self.epoch_store
.process_notifications(&self.notifications, &self.end_of_publish_transactions);

self.checkpoint_service
.notify_checkpoint(&pending_checkpoint)
}

pub fn final_checkpoint_round(&self) -> Option<u64> {
self.lock_and_final_round
.as_ref()
.map(|(_, final_round)| *final_round)
}
}

impl ExecutionComponents {
fn new(
protocol_config: &ProtocolConfig,
Expand Down
62 changes: 25 additions & 37 deletions crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,52 +308,40 @@ pub async fn send_consensus(authority: &AuthorityState, cert: &VerifiedCertifica
ConsensusTransaction::new_certificate_message(&authority.name, cert.clone().into_inner()),
);

if let Ok(transaction) = authority
let certs = authority
.epoch_store_for_testing()
.verify_consensus_transaction(transaction, &authority.metrics.skipped_consensus_txns)
{
let certs = authority
.epoch_store_for_testing()
.process_consensus_transactions_for_tests(
vec![transaction],
&Arc::new(CheckpointServiceNoop {}),
authority.db(),
)
.await
.unwrap();

authority
.transaction_manager()
.enqueue(certs, &authority.epoch_store_for_testing())
.unwrap();
} else {
warn!("Failed to verify certificate: {:?}", cert);
}
.process_consensus_transactions_for_tests(
vec![transaction],
&Arc::new(CheckpointServiceNoop {}),
authority.db(),
&authority.metrics.skipped_consensus_txns,
)
.await
.unwrap();

authority
.transaction_manager()
.enqueue(certs, &authority.epoch_store_for_testing())
.unwrap();
}

pub async fn send_consensus_no_execution(authority: &AuthorityState, cert: &VerifiedCertificate) {
let transaction = SequencedConsensusTransaction::new_test(
ConsensusTransaction::new_certificate_message(&authority.name, cert.clone().into_inner()),
);

if let Ok(transaction) = authority
// Call process_consensus_transaction() instead of handle_consensus_transaction(), to avoid actually executing cert.
// This allows testing cert execution independently.
authority
.epoch_store_for_testing()
.verify_consensus_transaction(transaction, &authority.metrics.skipped_consensus_txns)
{
// Call process_consensus_transaction() instead of handle_consensus_transaction(), to avoid actually executing cert.
// This allows testing cert execution independently.
authority
.epoch_store_for_testing()
.process_consensus_transactions_for_tests(
vec![transaction],
&Arc::new(CheckpointServiceNoop {}),
&authority.db(),
)
.await
.unwrap();
} else {
warn!("Failed to verify certificate: {:?}", cert);
}
.process_consensus_transactions_for_tests(
vec![transaction],
&Arc::new(CheckpointServiceNoop {}),
&authority.db(),
&authority.metrics.skipped_consensus_txns,
)
.await
.unwrap();
}

pub fn build_test_modules_with_dep_addr(
Expand Down
Loading

0 comments on commit 0a60588

Please sign in to comment.