Skip to content

Commit

Permalink
[core] Make proper per epoch execution components (#18281)
Browse files Browse the repository at this point in the history
## Description 

Make the state accumulator a per-epoch component. This also requires that we
make the checkpoint executor a proper per-epoch component (previously it was
only instantiated once, while `run_epoch` was called once per epoch), since it
needs to hold a reference to the fresh accumulator after reconfig. Now
we actually drop it after the call to `run_epoch` returns.

## Test plan 

Passed against 120+ seeds:
```
./scripts/simtest/seed-search.py simtest --test test_simulated_load_reconfig_with_crashes_and_delays
```

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:

---------

Co-authored-by: Mark Logan <[email protected]>
  • Loading branch information
williampsmith and mystenmark committed Jun 27, 2024
1 parent 926a1c3 commit 36e3652
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 93 deletions.
14 changes: 2 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{
};
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
Expand Down Expand Up @@ -2838,7 +2837,6 @@ impl AuthorityState {
supported_protocol_versions: SupportedProtocolVersions,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
Expand All @@ -2857,12 +2855,7 @@ impl AuthorityState {
.await?;
self.get_reconfig_api()
.clear_state_end_of_epoch(&execution_lock);
self.check_system_consistency(
cur_epoch_store,
checkpoint_executor,
accumulator,
expensive_safety_check_config,
);
self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config);
self.maybe_reaccumulate_state_hash(
cur_epoch_store,
epoch_start_configuration
Expand Down Expand Up @@ -2955,7 +2948,6 @@ impl AuthorityState {
fn check_system_consistency(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) {
Expand Down Expand Up @@ -2986,7 +2978,6 @@ impl AuthorityState {
cur_epoch_store.epoch()
);
self.expensive_check_is_consistent_state(
checkpoint_executor,
accumulator,
cur_epoch_store,
cfg!(debug_assertions), // panic in debug mode only
Expand All @@ -3003,7 +2994,6 @@ impl AuthorityState {

fn expensive_check_is_consistent_state(
&self,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
cur_epoch_store: &AuthorityPerEpochStore,
panic: bool,
Expand Down Expand Up @@ -3041,7 +3031,7 @@ impl AuthorityState {
}

if !panic {
checkpoint_executor.set_inconsistent_state(is_inconsistent);
accumulator.set_inconsistent_state(is_inconsistent);
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ impl AuthorityPerEpochStore {
epoch_id
);
let epoch_start_configuration = Arc::new(epoch_start_configuration);
info!("epoch flags: {:?}", epoch_start_configuration.flags());
metrics.current_epoch.set(epoch_id as i64);
metrics
.current_voting_right
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub async fn execute_certificate_with_execution_error(
// for testing and regression detection.
// We must do this before sending to consensus, otherwise consensus may already
// lead to transaction execution and state change.
let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
let state_acc =
StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
let include_wrapped_tombstone = !authority
.epoch_store_for_testing()
.protocol_config()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics {
pub checkpoint_transaction_count: Histogram,
pub checkpoint_contents_age_ms: Histogram,
pub last_executed_checkpoint_age_ms: Histogram,
pub accumulator_inconsistent_state: IntGauge,
}

impl CheckpointExecutorMetrics {
Expand Down Expand Up @@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics {
"Age of the last executed checkpoint",
registry
),
accumulator_inconsistent_state: register_int_gauge_with_registry!(
"accumulator_inconsistent_state",
"1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
registry,
)
.unwrap(),
};
Arc::new(this)
}
Expand Down
27 changes: 9 additions & 18 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use either::Either;
use futures::stream::FuturesOrdered;
use itertools::izip;
use mysten_metrics::spawn_monitored_task;
use prometheus::Registry;
use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
use sui_macros::{fail_point, fail_point_async};
use sui_types::accumulator::Accumulator;
Expand Down Expand Up @@ -68,7 +67,8 @@ use crate::{
};

mod data_ingestion_handler;
mod metrics;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

Expand Down Expand Up @@ -157,7 +157,7 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
config: CheckpointExecutorConfig,
prometheus_registry: &Registry,
metrics: Arc<CheckpointExecutorMetrics>,
) -> Self {
Self {
mailbox,
Expand All @@ -168,7 +168,7 @@ impl CheckpointExecutor {
tx_manager: state.transaction_manager().clone(),
accumulator,
config,
metrics: CheckpointExecutorMetrics::new(prometheus_registry),
metrics,
}
}

Expand All @@ -178,17 +178,14 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
) -> Self {
Self {
Self::new(
mailbox,
state: state.clone(),
checkpoint_store,
object_cache_reader: state.get_object_cache_reader().clone(),
transaction_cache_reader: state.get_transaction_cache_reader().clone(),
tx_manager: state.transaction_manager().clone(),
state,
accumulator,
config: Default::default(),
metrics: CheckpointExecutorMetrics::new_for_tests(),
}
Default::default(),
CheckpointExecutorMetrics::new_for_tests(),
)
}

/// Ensure that all checkpoints in the current epoch will be executed.
Expand Down Expand Up @@ -358,12 +355,6 @@ impl CheckpointExecutor {
}
}

/// Publishes the result of the state-consistency check on the
/// `accumulator_inconsistent_state` gauge.
///
/// The boolean is cast to an integer, so the gauge reads `1` when
/// `is_inconsistent_state` is `true` and `0` otherwise.
pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
self.metrics
.accumulator_inconsistent_state
.set(is_inconsistent_state as i64);
}

fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
// Ensure that we are not skipping checkpoints at any point
let seq = *checkpoint.sequence_number();
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
EpochFlag::default_flags_for_new_epoch(&authority_state.config),
)
.unwrap(),
&executor,
accumulator,
&ExpensiveSafetyCheckConfig::default(),
)
Expand Down Expand Up @@ -394,7 +393,8 @@ async fn init_executor_test(
broadcast::channel(buffer_size);
let epoch_store = state.epoch_store_for_testing();

let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(accumulator);

let executor = CheckpointExecutor::new_for_tests(
Expand Down
46 changes: 27 additions & 19 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
Expand Down Expand Up @@ -859,7 +860,7 @@ pub struct CheckpointBuilder {
notify: Arc<Notify>,
notify_aggregator: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -897,7 +898,7 @@ impl CheckpointBuilder {
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
notify_aggregator: Arc<Notify>,
Expand Down Expand Up @@ -1432,19 +1433,24 @@ impl CheckpointBuilder {
let committee = system_state_obj.get_current_epoch_committee().committee;

// This must happen after the call to augment_epoch_last_checkpoint,
// otherwise we will not capture the change_epoch tx
let acc = self.accumulator.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
self.accumulator
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
let root_state_digest = self
.accumulator
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?;
// otherwise we will not capture the change_epoch tx.
let root_state_digest = {
let state_acc = self
.accumulator
.upgrade()
.expect("No checkpoints should be getting built after local configuration");
let acc = state_acc.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
state_acc
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
state_acc
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?
};
self.metrics.highest_accumulated_epoch.set(epoch as i64);
info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

Expand Down Expand Up @@ -2213,7 +2219,7 @@ impl CheckpointService {
checkpoint_store: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
checkpoint_output: Box<dyn CheckpointOutput>,
certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -2503,15 +2509,17 @@ mod tests {
let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
let epoch_store = state.epoch_store_for_testing();

let accumulator =
StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(StateAccumulator::new_for_tests(
state.get_accumulator_store().clone(),
&epoch_store,
));

let (checkpoint_service, _exit) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
store,
Arc::new(accumulator),
Arc::downgrade(&accumulator),
Box::new(output),
Box::new(certified_output),
CheckpointMetrics::new_for_tests(),
Expand Down
Loading

0 comments on commit 36e3652

Please sign in to comment.