diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index d356b7f1266595..b3b5b4dd0071f3 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -41,6 +41,7 @@ use std::fs::File; use std::io::Write; use std::path::Path; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use sui_protocol_config::ProtocolVersion; use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest}; @@ -859,7 +860,7 @@ pub struct CheckpointBuilder { notify: Arc, notify_aggregator: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, output: Box, exit: watch::Receiver<()>, metrics: Arc, @@ -897,7 +898,7 @@ impl CheckpointBuilder { epoch_store: Arc, notify: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, output: Box, exit: watch::Receiver<()>, notify_aggregator: Arc, @@ -1432,19 +1433,24 @@ impl CheckpointBuilder { let committee = system_state_obj.get_current_epoch_committee().committee; // This must happen after the call to augment_epoch_last_checkpoint, - // otherwise we will not capture the change_epoch tx - let acc = self.accumulator.accumulate_checkpoint( - effects.clone(), - sequence_number, - &self.epoch_store, - )?; - self.accumulator - .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc)) - .await?; - let root_state_digest = self - .accumulator - .digest_epoch(self.epoch_store.clone(), sequence_number) - .await?; + // otherwise we will not capture the change_epoch tx. + let root_state_digest = { + let state_acc = self + .accumulator + .upgrade() + .expect("No checkpoints should be getting built after local configuration"); + let acc = state_acc.accumulate_checkpoint( + effects.clone(), + sequence_number, + &self.epoch_store, + )?; + state_acc + .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc)) + .await?; + state_acc + .digest_epoch(self.epoch_store.clone(), sequence_number) + .await? + }; self.metrics.highest_accumulated_epoch.set(epoch as i64); info!("Epoch {epoch} root state hash digest: {root_state_digest:?}"); @@ -2213,7 +2219,7 @@ impl CheckpointService { checkpoint_store: Arc, epoch_store: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, checkpoint_output: Box, certified_checkpoint_output: Box, metrics: Arc, @@ -2503,15 +2509,17 @@ mod tests { let checkpoint_store = CheckpointStore::new(ckpt_dir.path()); let epoch_store = state.epoch_store_for_testing(); - let accumulator = - StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = Arc::new(StateAccumulator::new_for_tests( + state.get_accumulator_store().clone(), + &epoch_store, + )); let (checkpoint_service, _exit) = CheckpointService::spawn( state.clone(), checkpoint_store, epoch_store.clone(), store, - Arc::new(accumulator), + accumulator.downgrade(), Box::new(output), Box::new(certified_output), CheckpointMetrics::new_for_tests(), diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 08637cb3613778..2b490675663866 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use std::str::FromStr; #[cfg(msim)] use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use sui_core::authority::epoch_start_configuration::EpochFlag; use sui_core::authority::RandomnessRoundReceiver; @@ -757,7 +757,7 @@ impl SuiNode { checkpoint_store.clone(), state_sync_handle.clone(), randomness_handle.clone(), - accumulator.clone(), + Arc::downgrade(&accumulator), connection_monitor_status.clone(), ®istry_service, sui_node_metrics.clone(), @@ -1119,7 +1119,7 @@ impl SuiNode { checkpoint_store: Arc, state_sync_handle: state_sync::Handle, randomness_handle: randomness::Handle, - accumulator: Arc, + accumulator: Weak, connection_monitor_status: Arc, registry_service: &RegistryService, sui_node_metrics: Arc, @@ -1209,7 +1209,7 @@ impl SuiNode { randomness_handle: randomness::Handle, consensus_manager: ConsensusManager, consensus_epoch_data_remover: EpochDataRemover, - accumulator: Arc, + accumulator: Weak, validator_server_handle: JoinHandle>, validator_overload_monitor_handle: Option>, checkpoint_metrics: Arc, @@ -1313,7 +1313,7 @@ impl SuiNode { epoch_store: Arc, state: Arc, state_sync_handle: state_sync::Handle, - accumulator: Arc, + accumulator: Weak, checkpoint_metrics: Arc, ) -> (Arc, watch::Sender<()>) { let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms(); @@ -1607,17 +1607,17 @@ impl SuiNode { ) .await; - // No other components should be referencing state accumulator at this point. - // However, since checkpoint_service shutdown is asynchronous, we can't trust - // strong reference count. However it should still be safe to swap out state - // accumulator since we know we have already reached EndOfPublish + // No other components should be holding a strong reference to state accumulator + // at this point. Confirm here before we swap in the new accumulator. + Arc::into_inner(accumulator) + .expect("Accumulator should have no other references at this point"); let new_accumulator = Arc::new(StateAccumulator::new( self.state.get_accumulator_store().clone(), &new_epoch_store, accumulator_metrics.clone(), )); - *accum_components_guard = - Some((new_accumulator.clone(), accumulator_metrics.clone())); + let weak_accumulator = Arc::downgrade(&new_accumulator); + *accum_components_guard = Some((new_accumulator, accumulator_metrics.clone())); consensus_epoch_data_remover .remove_old_data(next_epoch - 1) @@ -1636,7 +1636,7 @@ impl SuiNode { self.randomness_handle.clone(), consensus_manager, consensus_epoch_data_remover, - new_accumulator, + weak_accumulator, validator_server_handle, validator_overload_monitor_handle, checkpoint_metrics, @@ -1660,17 +1660,17 @@ impl SuiNode { ) .await; - // No other components should be holding a reference to state accumulator + // No other components should be holding a strong reference to state accumulator // at this point. Confirm here before we swap in the new accumulator. - let _ = Arc::into_inner(accumulator) + Arc::into_inner(accumulator) .expect("Accumulator should have no other references at this point"); let new_accumulator = Arc::new(StateAccumulator::new( self.state.get_accumulator_store().clone(), &new_epoch_store, accumulator_metrics.clone(), )); - *accum_components_guard = - Some((new_accumulator.clone(), accumulator_metrics.clone())); + let weak_accumulator = Arc::downgrade(&new_accumulator); + *accum_components_guard = Some((new_accumulator, accumulator_metrics.clone())); if self.state.is_validator(&new_epoch_store) { info!("Promoting the node from fullnode to validator, starting grpc server"); @@ -1684,7 +1684,7 @@ impl SuiNode { self.checkpoint_store.clone(), self.state_sync_handle.clone(), self.randomness_handle.clone(), - new_accumulator, + weak_accumulator, self.connection_monitor_status.clone(), &self.registry_service, self.metrics.clone(),