Skip to content

Commit

Permalink
use Weak<StateAccumulator> for CheckpointService
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jun 17, 2024
1 parent 34d51e7 commit 46c8eb1
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 39 deletions.
46 changes: 27 additions & 19 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
Expand Down Expand Up @@ -859,7 +860,7 @@ pub struct CheckpointBuilder {
notify: Arc<Notify>,
notify_aggregator: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -897,7 +898,7 @@ impl CheckpointBuilder {
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
notify_aggregator: Arc<Notify>,
Expand Down Expand Up @@ -1432,19 +1433,24 @@ impl CheckpointBuilder {
let committee = system_state_obj.get_current_epoch_committee().committee;

// This must happen after the call to augment_epoch_last_checkpoint,
// otherwise we will not capture the change_epoch tx
let acc = self.accumulator.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
self.accumulator
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
let root_state_digest = self
.accumulator
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?;
// otherwise we will not capture the change_epoch tx.
let root_state_digest = {
let state_acc = self
.accumulator
.upgrade()
.expect("No checkpoints should be getting built after local configuration");
let acc = state_acc.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
state_acc
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
state_acc
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?
};
self.metrics.highest_accumulated_epoch.set(epoch as i64);
info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

Expand Down Expand Up @@ -2213,7 +2219,7 @@ impl CheckpointService {
checkpoint_store: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
checkpoint_output: Box<dyn CheckpointOutput>,
certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -2503,15 +2509,17 @@ mod tests {
let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
let epoch_store = state.epoch_store_for_testing();

let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(StateAccumulator::new_for_tests(
state.get_accumulator_store().clone(),
&epoch_store,
));

let (checkpoint_service, _exit) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
store,
Arc::new(accumulator),
Arc::downgrade(&accumulator),
Box::new(output),
Box::new(certified_output),
CheckpointMetrics::new_for_tests(),
Expand Down
8 changes: 5 additions & 3 deletions crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,18 @@ async fn send_transactions(
pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<CheckpointService> {
let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
let epoch_store = state.epoch_store_for_testing();
let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(StateAccumulator::new_for_tests(
state.get_accumulator_store().clone(),
&epoch_store,
));
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _) = CheckpointService::spawn(
state.clone(),
state.get_checkpoint_store().clone(),
epoch_store.clone(),
state.get_transaction_cache_reader().clone(),
Arc::new(accumulator),
Arc::downgrade(&accumulator),
Box::new(output),
Box::new(certified_output),
CheckpointMetrics::new_for_tests(),
Expand Down
34 changes: 17 additions & 17 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::RandomnessRoundReceiver;
Expand Down Expand Up @@ -757,7 +757,7 @@ impl SuiNode {
checkpoint_store.clone(),
state_sync_handle.clone(),
randomness_handle.clone(),
accumulator.clone(),
Arc::downgrade(&accumulator),
connection_monitor_status.clone(),
&registry_service,
sui_node_metrics.clone(),
Expand Down Expand Up @@ -1119,7 +1119,7 @@ impl SuiNode {
checkpoint_store: Arc<CheckpointStore>,
state_sync_handle: state_sync::Handle,
randomness_handle: randomness::Handle,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
connection_monitor_status: Arc<ConnectionMonitorStatus>,
registry_service: &RegistryService,
sui_node_metrics: Arc<SuiNodeMetrics>,
Expand Down Expand Up @@ -1209,7 +1209,7 @@ impl SuiNode {
randomness_handle: randomness::Handle,
consensus_manager: ConsensusManager,
consensus_epoch_data_remover: EpochDataRemover,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
validator_server_handle: JoinHandle<Result<()>>,
validator_overload_monitor_handle: Option<JoinHandle<()>>,
checkpoint_metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -1313,7 +1313,7 @@ impl SuiNode {
epoch_store: Arc<AuthorityPerEpochStore>,
state: Arc<AuthorityState>,
state_sync_handle: state_sync::Handle,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, watch::Sender<()>) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
Expand Down Expand Up @@ -1607,17 +1607,17 @@ impl SuiNode {
)
.await;

// No other components should be referencing state accumulator at this point.
// However, since checkpoint_service shutdown is asynchronous, we can't trust
// strong reference count. However it should still be safe to swap out state
// accumulator since we know we have already reached EndOfPublish
// No other components should be holding a strong reference to state accumulator
// at this point. Confirm here before we swap in the new accumulator.
Arc::into_inner(accumulator)
.expect("Accumulator should have no other references at this point");
let new_accumulator = Arc::new(StateAccumulator::new(
self.state.get_accumulator_store().clone(),
&new_epoch_store,
accumulator_metrics.clone(),
));
*accum_components_guard =
Some((new_accumulator.clone(), accumulator_metrics.clone()));
let weak_accumulator = Arc::downgrade(&new_accumulator);
*accum_components_guard = Some((new_accumulator, accumulator_metrics.clone()));

consensus_epoch_data_remover
.remove_old_data(next_epoch - 1)
Expand All @@ -1636,7 +1636,7 @@ impl SuiNode {
self.randomness_handle.clone(),
consensus_manager,
consensus_epoch_data_remover,
new_accumulator,
weak_accumulator,
validator_server_handle,
validator_overload_monitor_handle,
checkpoint_metrics,
Expand All @@ -1660,17 +1660,17 @@ impl SuiNode {
)
.await;

// No other components should be holding a reference to state accumulator
// No other components should be holding a strong reference to state accumulator
// at this point. Confirm here before we swap in the new accumulator.
let _ = Arc::into_inner(accumulator)
Arc::into_inner(accumulator)
.expect("Accumulator should have no other references at this point");
let new_accumulator = Arc::new(StateAccumulator::new(
self.state.get_accumulator_store().clone(),
&new_epoch_store,
accumulator_metrics.clone(),
));
*accum_components_guard =
Some((new_accumulator.clone(), accumulator_metrics.clone()));
let weak_accumulator = Arc::downgrade(&new_accumulator);
*accum_components_guard = Some((new_accumulator, accumulator_metrics.clone()));

if self.state.is_validator(&new_epoch_store) {
info!("Promoting the node from fullnode to validator, starting grpc server");
Expand All @@ -1684,7 +1684,7 @@ impl SuiNode {
self.checkpoint_store.clone(),
self.state_sync_handle.clone(),
self.randomness_handle.clone(),
new_accumulator,
weak_accumulator,
self.connection_monitor_status.clone(),
&self.registry_service,
self.metrics.clone(),
Expand Down

0 comments on commit 46c8eb1

Please sign in to comment.