diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index c8df7f4e6a4635..5cc8829f7fc528 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{ }; use crate::authority::epoch_start_configuration::EpochStartConfigTrait; use crate::authority::epoch_start_configuration::EpochStartConfiguration; -use crate::checkpoints::checkpoint_executor::CheckpointExecutor; use crate::checkpoints::CheckpointStore; use crate::consensus_adapter::ConsensusAdapter; use crate::epoch::committee_store::CommitteeStore; @@ -2834,7 +2833,6 @@ impl AuthorityState { supported_protocol_versions: SupportedProtocolVersions, new_committee: Committee, epoch_start_configuration: EpochStartConfiguration, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, ) -> SuiResult> { @@ -2853,12 +2851,7 @@ impl AuthorityState { .await?; self.get_reconfig_api() .clear_state_end_of_epoch(&execution_lock); - self.check_system_consistency( - cur_epoch_store, - checkpoint_executor, - accumulator, - expensive_safety_check_config, - ); + self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config); self.maybe_reaccumulate_state_hash( cur_epoch_store, epoch_start_configuration @@ -2942,7 +2935,6 @@ impl AuthorityState { fn check_system_consistency( &self, cur_epoch_store: &AuthorityPerEpochStore, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, ) { @@ -2973,7 +2965,6 @@ impl AuthorityState { cur_epoch_store.epoch() ); self.expensive_check_is_consistent_state( - checkpoint_executor, accumulator, cur_epoch_store, cfg!(debug_assertions), // panic in debug mode only @@ -2990,7 +2981,6 @@ impl AuthorityState { fn expensive_check_is_consistent_state( &self, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, cur_epoch_store: &AuthorityPerEpochStore, panic: bool, @@ -3028,7 +3018,7 @@ impl AuthorityState { } if !panic { - checkpoint_executor.set_inconsistent_state(is_inconsistent); + accumulator.set_inconsistent_state(is_inconsistent); } } diff --git a/crates/sui-core/src/authority/authority_test_utils.rs b/crates/sui-core/src/authority/authority_test_utils.rs index 74bd74b49f9b93..335872122d324f 100644 --- a/crates/sui-core/src/authority/authority_test_utils.rs +++ b/crates/sui-core/src/authority/authority_test_utils.rs @@ -88,7 +88,8 @@ pub async fn execute_certificate_with_execution_error( // for testing and regression detection. // We must do this before sending to consensus, otherwise consensus may already // lead to transaction execution and state change. - let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store); + let state_acc = + StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store); let include_wrapped_tombstone = !authority .epoch_store_for_testing() .protocol_config() diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs index 33b82682416acb..c51c4c6e61a8f5 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs @@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics { pub checkpoint_transaction_count: Histogram, pub checkpoint_contents_age_ms: Histogram, pub last_executed_checkpoint_age_ms: Histogram, - pub accumulator_inconsistent_state: IntGauge, } impl CheckpointExecutorMetrics { @@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics { "Age of the last executed checkpoint", registry ), - accumulator_inconsistent_state: register_int_gauge_with_registry!( - "accumulator_inconsistent_state", - "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch", - registry, - ) - .unwrap(), }; Arc::new(this) } diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index 5c6d59430adf43..cdca3bb40aa7b2 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -29,7 +29,6 @@ use either::Either; use futures::stream::FuturesOrdered; use itertools::izip; use mysten_metrics::spawn_monitored_task; -use prometheus::Registry; use sui_config::node::{CheckpointExecutorConfig, RunWithRange}; use sui_macros::{fail_point, fail_point_async}; use sui_types::accumulator::Accumulator; @@ -68,7 +67,8 @@ use crate::{ }; mod data_ingestion_handler; -mod metrics; +pub mod metrics; + #[cfg(test)] pub(crate) mod tests; @@ -157,7 +157,7 @@ impl CheckpointExecutor { state: Arc, accumulator: Arc, config: CheckpointExecutorConfig, - prometheus_registry: &Registry, + metrics: Arc, ) -> Self { Self { mailbox, @@ -168,7 +168,7 @@ impl CheckpointExecutor { tx_manager: state.transaction_manager().clone(), accumulator, config, - metrics: CheckpointExecutorMetrics::new(prometheus_registry), + metrics, } } @@ -178,17 +178,14 @@ impl CheckpointExecutor { state: Arc, accumulator: Arc, ) -> Self { - Self { + Self::new( mailbox, - state: state.clone(), checkpoint_store, - object_cache_reader: state.get_object_cache_reader().clone(), - transaction_cache_reader: state.get_transaction_cache_reader().clone(), - tx_manager: state.transaction_manager().clone(), + state, accumulator, - config: Default::default(), - metrics: CheckpointExecutorMetrics::new_for_tests(), - } + Default::default(), + CheckpointExecutorMetrics::new_for_tests(), + ) } /// Ensure that all checkpoints in the current epoch will be executed. @@ -358,12 +355,6 @@ impl CheckpointExecutor { } } - pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) { - self.metrics - .accumulator_inconsistent_state - .set(is_inconsistent_state as i64); - } - fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) { // Ensure that we are not skipping checkpoints at any point let seq = *checkpoint.sequence_number(); diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs index c156a1676879f9..4efa9a57714b2f 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs @@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() { EpochFlag::default_flags_for_new_epoch(&authority_state.config), ) .unwrap(), - &executor, accumulator, &ExpensiveSafetyCheckConfig::default(), ) @@ -394,7 +393,8 @@ async fn init_executor_test( broadcast::channel(buffer_size); let epoch_store = state.epoch_store_for_testing(); - let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = + StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store); let accumulator = Arc::new(accumulator); let executor = CheckpointExecutor::new_for_tests( diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index ca4d94bf9c2c3b..d356b7f1266595 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -2504,7 +2504,7 @@ mod tests { let epoch_store = state.epoch_store_for_testing(); let accumulator = - StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store); let (checkpoint_service, _exit) = CheckpointService::spawn( state.clone(), diff --git a/crates/sui-core/src/state_accumulator.rs b/crates/sui-core/src/state_accumulator.rs index 50953405ff957a..775d682cceb1cb 100644 --- a/crates/sui-core/src/state_accumulator.rs +++ b/crates/sui-core/src/state_accumulator.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use mysten_metrics::monitored_scope; +use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; use serde::Serialize; use sui_protocol_config::ProtocolConfig; use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber}; @@ -25,6 +26,24 @@ use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSet use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; use crate::authority::authority_store_tables::LiveObject; +pub struct StateAccumulatorMetrics { + inconsistent_state: IntGauge, +} + +impl StateAccumulatorMetrics { + pub fn new(registry: &Registry) -> Arc { + let this = Self { + inconsistent_state: register_int_gauge_with_registry!( + "accumulator_inconsistent_state", + "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch", + registry + ) + .unwrap(), + }; + Arc::new(this) + } +} + pub enum StateAccumulator { V1(StateAccumulatorV1), V2(StateAccumulatorV2), @@ -32,10 +51,12 @@ pub enum StateAccumulator { pub struct StateAccumulatorV1 { store: Arc, + metrics: Arc, } pub struct StateAccumulatorV2 { store: Arc, + metrics: Arc, } pub trait AccumulatorStore: ObjectStore + Send + Sync { @@ -366,16 +387,37 @@ impl StateAccumulator { pub fn new( store: Arc, epoch_store: &Arc, + metrics: Arc, ) -> Self { if cfg!(msim) { if epoch_store.state_accumulator_v2_enabled() { - return StateAccumulator::V2(StateAccumulatorV2::new(store)); + return StateAccumulator::V2(StateAccumulatorV2::new(store, metrics)); } else { - return StateAccumulator::V1(StateAccumulatorV1::new(store)); + return StateAccumulator::V1(StateAccumulatorV1::new(store, metrics)); } } - StateAccumulator::V1(StateAccumulatorV1::new(store)) + StateAccumulator::V1(StateAccumulatorV1::new(store, metrics)) + } + + pub fn new_for_tests( + store: Arc, + epoch_store: &Arc, + ) -> Self { + Self::new( + store, + epoch_store, + StateAccumulatorMetrics::new(&Registry::new()), + ) + } + + pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) { + match self { + StateAccumulator::V1(impl_v1) => &impl_v1.metrics, + StateAccumulator::V2(impl_v2) => &impl_v2.metrics, + } + .inconsistent_state + .set(is_inconsistent_state as i64); } /// Accumulates the effects of a single checkpoint and persists the accumulator. @@ -528,8 +570,8 @@ impl StateAccumulator { } impl StateAccumulatorV1 { - pub fn new(store: Arc) -> Self { - Self { store } + pub fn new(store: Arc, metrics: Arc) -> Self { + Self { store, metrics } } /// Unions all checkpoint accumulators at the end of the epoch to generate the @@ -618,8 +660,8 @@ impl StateAccumulatorV1 { } impl StateAccumulatorV2 { - pub fn new(store: Arc) -> Self { - Self { store } + pub fn new(store: Arc, metrics: Arc) -> Self { + Self { store, metrics } } pub async fn accumulate_running_root( diff --git a/crates/sui-core/src/test_utils.rs b/crates/sui-core/src/test_utils.rs index 3c2b8a6ae5ef3c..d9ccdd467af78d 100644 --- a/crates/sui-core/src/test_utils.rs +++ b/crates/sui-core/src/test_utils.rs @@ -80,7 +80,8 @@ pub async fn send_and_confirm_transaction( // // We also check the incremental effects of the transaction on the live object set against StateAccumulator // for testing and regression detection - let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store); + let state_acc = + StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store); let include_wrapped_tombstone = !authority .epoch_store_for_testing() .protocol_config() diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs index 2fb2873ee63d99..a1257f9200745e 100644 --- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs @@ -70,7 +70,8 @@ async fn send_transactions( pub fn checkpoint_service_for_testing(state: Arc) -> Arc { let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10); let epoch_store = state.epoch_store_for_testing(); - let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = + StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store); let (certified_output, _certified_result) = mpsc::channel::(10); let (checkpoint_service, _) = CheckpointService::spawn( diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 9c2f226db3440d..4a48a415b56dcc 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -29,6 +29,7 @@ use sui_core::consensus_adapter::SubmitToConsensus; use sui_core::consensus_manager::ConsensusClient; use sui_core::epoch::randomness::RandomnessManager; use sui_core::execution_cache::build_execution_cache; +use sui_core::state_accumulator::StateAccumulatorMetrics; use sui_core::storage::RestReadStore; use sui_core::traffic_controller::metrics::TrafficControllerMetrics; use sui_json_rpc::bridge_api::BridgeReadApi; @@ -67,6 +68,7 @@ use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait; use sui_core::authority::epoch_start_configuration::EpochStartConfiguration; use sui_core::authority_aggregator::AuthorityAggregator; use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics}; +use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics; use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason}; use sui_core::checkpoints::{ CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync, @@ -225,7 +227,7 @@ pub struct SuiNode { state_sync_handle: state_sync::Handle, randomness_handle: randomness::Handle, checkpoint_store: Arc, - accumulator: Arc, + accumulator_components: Mutex, Arc)>>, connection_monitor_status: Arc, /// Broadcast channel to send the starting system state for the next epoch. @@ -714,9 +716,11 @@ impl SuiNode { ) .await?; + let accumulator_metrics = StateAccumulatorMetrics::new(&prometheus_registry); let accumulator = Arc::new(StateAccumulator::new( cache_traits.accumulator_store.clone(), &epoch_store, + accumulator_metrics.clone(), )); let authority_names_to_peer_ids = epoch_store @@ -783,7 +787,7 @@ impl SuiNode { state_sync_handle, randomness_handle, checkpoint_store, - accumulator, + accumulator_components: Mutex::new(Some((accumulator, accumulator_metrics))), end_of_epoch_channel, connection_monitor_status, trusted_peer_change_tx, @@ -1459,17 +1463,23 @@ impl SuiNode { /// This function awaits the completion of checkpoint execution of the current epoch, /// after which it iniitiates reconfiguration of the entire system. pub async fn monitor_reconfiguration(self: Arc) -> Result<()> { - let mut checkpoint_executor = CheckpointExecutor::new( - self.state_sync_handle.subscribe_to_synced_checkpoints(), - self.checkpoint_store.clone(), - self.state.clone(), - self.accumulator.clone(), - self.config.checkpoint_executor_config.clone(), - &self.registry_service.default_registry(), - ); + let checkpoint_executor_metrics = + CheckpointExecutorMetrics::new(&self.registry_service.default_registry()); - let run_with_range = self.config.run_with_range; loop { + let mut accum_components_guard = self.accumulator_components.lock().await; + let (accumulator, accumulator_metrics) = accum_components_guard.take().unwrap(); + let mut checkpoint_executor = CheckpointExecutor::new( + self.state_sync_handle.subscribe_to_synced_checkpoints(), + self.checkpoint_store.clone(), + self.state.clone(), + accumulator.clone(), + self.config.checkpoint_executor_config.clone(), + checkpoint_executor_metrics.clone(), + ); + + let run_with_range = self.config.run_with_range; + let cur_epoch_store = self.state.load_epoch_store_one_call_per_task(); // Advertise capabilities to committee, if we are a validator. @@ -1498,6 +1508,7 @@ impl SuiNode { let stop_condition = checkpoint_executor .run_epoch(cur_epoch_store.clone(), run_with_range) .await; + drop(checkpoint_executor); if stop_condition == StopReason::RunWithRangeCondition { SuiNode::shutdown(&self).await; @@ -1568,6 +1579,7 @@ impl SuiNode { // The following code handles 4 different cases, depending on whether the node // was a validator in the previous epoch, and whether the node is a validator // in the new epoch. + let new_validator_components = if let Some(ValidatorComponents { validator_server_handle, validator_overload_monitor_handle, @@ -1591,10 +1603,22 @@ impl SuiNode { &cur_epoch_store, next_epoch_committee.clone(), new_epoch_start_state, - &checkpoint_executor, + accumulator.clone(), ) .await; + // No other components should be holding a reference to srtate accumulator + // at this point. Confirm here before we swap in the new accumulator. + let _ = Arc::into_inner(accumulator) + .expect("Accumulator should have no references at this point"); + let new_accumulator = Arc::new(StateAccumulator::new( + self.state.get_accumulator_store().clone(), + &new_epoch_store, + accumulator_metrics.clone(), + )); + *accum_components_guard = + Some((new_accumulator.clone(), accumulator_metrics.clone())); + consensus_epoch_data_remover .remove_old_data(next_epoch - 1) .await; @@ -1612,7 +1636,7 @@ impl SuiNode { self.randomness_handle.clone(), consensus_manager, consensus_epoch_data_remover, - self.accumulator.clone(), + new_accumulator, validator_server_handle, validator_overload_monitor_handle, checkpoint_metrics, @@ -1632,10 +1656,22 @@ impl SuiNode { &cur_epoch_store, next_epoch_committee.clone(), new_epoch_start_state, - &checkpoint_executor, + accumulator.clone(), ) .await; + // No other components should be holding a reference to srtate accumulator + // at this point. Confirm here before we swap in the new accumulator. + let _ = Arc::into_inner(accumulator) + .expect("Accumulator should have no references at this point"); + let new_accumulator = Arc::new(StateAccumulator::new( + self.state.get_accumulator_store().clone(), + &new_epoch_store, + accumulator_metrics.clone(), + )); + *accum_components_guard = + Some((new_accumulator.clone(), accumulator_metrics.clone())); + if self.state.is_validator(&new_epoch_store) { info!("Promoting the node from fullnode to validator, starting grpc server"); @@ -1648,7 +1684,7 @@ impl SuiNode { self.checkpoint_store.clone(), self.state_sync_handle.clone(), self.randomness_handle.clone(), - self.accumulator.clone(), + new_accumulator, self.connection_monitor_status.clone(), &self.registry_service, self.metrics.clone(), @@ -1697,7 +1733,7 @@ impl SuiNode { cur_epoch_store: &AuthorityPerEpochStore, next_epoch_committee: Committee, next_epoch_start_system_state: EpochStartSystemState, - checkpoint_executor: &CheckpointExecutor, + accumulator: Arc, ) -> Arc { let next_epoch = next_epoch_committee.epoch(); @@ -1722,8 +1758,7 @@ impl SuiNode { self.config.supported_protocol_versions.unwrap(), next_epoch_committee, epoch_start_configuration, - checkpoint_executor, - self.accumulator.clone(), + accumulator, &self.config.expensive_safety_check_config, ) .await diff --git a/crates/sui-single-node-benchmark/src/single_node.rs b/crates/sui-single-node-benchmark/src/single_node.rs index d020ce6d4ffa43..5b87afcfff846a 100644 --- a/crates/sui-single-node-benchmark/src/single_node.rs +++ b/crates/sui-single-node-benchmark/src/single_node.rs @@ -278,7 +278,7 @@ impl SingleValidator { ckpt_receiver, validator.get_checkpoint_store().clone(), validator.clone(), - Arc::new(StateAccumulator::new( + Arc::new(StateAccumulator::new_for_tests( validator.get_accumulator_store().clone(), self.get_epoch_store(), )),