Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Make proper per epoch execution components #18281

Merged
merged 5 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{
};
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
Expand Down Expand Up @@ -2848,7 +2847,6 @@ impl AuthorityState {
supported_protocol_versions: SupportedProtocolVersions,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
Expand All @@ -2867,12 +2865,7 @@ impl AuthorityState {
.await?;
self.get_reconfig_api()
.clear_state_end_of_epoch(&execution_lock);
self.check_system_consistency(
cur_epoch_store,
checkpoint_executor,
accumulator,
expensive_safety_check_config,
);
self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config);
self.maybe_reaccumulate_state_hash(
cur_epoch_store,
epoch_start_configuration
Expand Down Expand Up @@ -2965,7 +2958,6 @@ impl AuthorityState {
fn check_system_consistency(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) {
Expand Down Expand Up @@ -2996,7 +2988,6 @@ impl AuthorityState {
cur_epoch_store.epoch()
);
self.expensive_check_is_consistent_state(
checkpoint_executor,
accumulator,
cur_epoch_store,
cfg!(debug_assertions), // panic in debug mode only
Expand All @@ -3013,7 +3004,6 @@ impl AuthorityState {

fn expensive_check_is_consistent_state(
&self,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
cur_epoch_store: &AuthorityPerEpochStore,
panic: bool,
Expand Down Expand Up @@ -3051,7 +3041,7 @@ impl AuthorityState {
}

if !panic {
checkpoint_executor.set_inconsistent_state(is_inconsistent);
accumulator.set_inconsistent_state(is_inconsistent);
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ impl AuthorityPerEpochStore {
epoch_id
);
let epoch_start_configuration = Arc::new(epoch_start_configuration);
info!("epoch flags: {:?}", epoch_start_configuration.flags());
metrics.current_epoch.set(epoch_id as i64);
metrics
.current_voting_right
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub async fn execute_certificate_with_execution_error(
// for testing and regression detection.
// We must do this before sending to consensus, otherwise consensus may already
// lead to transaction execution and state change.
let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
let state_acc =
StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
let include_wrapped_tombstone = !authority
.epoch_store_for_testing()
.protocol_config()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics {
pub checkpoint_transaction_count: Histogram,
pub checkpoint_contents_age_ms: Histogram,
pub last_executed_checkpoint_age_ms: Histogram,
pub accumulator_inconsistent_state: IntGauge,
}

impl CheckpointExecutorMetrics {
Expand Down Expand Up @@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics {
"Age of the last executed checkpoint",
registry
),
accumulator_inconsistent_state: register_int_gauge_with_registry!(
"accumulator_inconsistent_state",
"1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
registry,
)
.unwrap(),
};
Arc::new(this)
}
Expand Down
27 changes: 9 additions & 18 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use either::Either;
use futures::stream::FuturesOrdered;
use itertools::izip;
use mysten_metrics::spawn_monitored_task;
use prometheus::Registry;
use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
use sui_macros::{fail_point, fail_point_async};
use sui_types::accumulator::Accumulator;
Expand Down Expand Up @@ -68,7 +67,8 @@ use crate::{
};

mod data_ingestion_handler;
mod metrics;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

Expand Down Expand Up @@ -157,7 +157,7 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
config: CheckpointExecutorConfig,
prometheus_registry: &Registry,
metrics: Arc<CheckpointExecutorMetrics>,
) -> Self {
Self {
mailbox,
Expand All @@ -168,7 +168,7 @@ impl CheckpointExecutor {
tx_manager: state.transaction_manager().clone(),
accumulator,
config,
metrics: CheckpointExecutorMetrics::new(prometheus_registry),
metrics,
}
}

Expand All @@ -178,17 +178,14 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
) -> Self {
Self {
Self::new(
mailbox,
state: state.clone(),
checkpoint_store,
object_cache_reader: state.get_object_cache_reader().clone(),
transaction_cache_reader: state.get_transaction_cache_reader().clone(),
tx_manager: state.transaction_manager().clone(),
state,
accumulator,
config: Default::default(),
metrics: CheckpointExecutorMetrics::new_for_tests(),
}
Default::default(),
CheckpointExecutorMetrics::new_for_tests(),
)
}

/// Ensure that all checkpoints in the current epoch will be executed.
Expand Down Expand Up @@ -358,12 +355,6 @@ impl CheckpointExecutor {
}
}

pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
self.metrics
.accumulator_inconsistent_state
.set(is_inconsistent_state as i64);
}

fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
// Ensure that we are not skipping checkpoints at any point
let seq = *checkpoint.sequence_number();
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
EpochFlag::default_flags_for_new_epoch(&authority_state.config),
)
.unwrap(),
&executor,
accumulator,
&ExpensiveSafetyCheckConfig::default(),
)
Expand Down Expand Up @@ -394,7 +393,8 @@ async fn init_executor_test(
broadcast::channel(buffer_size);
let epoch_store = state.epoch_store_for_testing();

let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(accumulator);

let executor = CheckpointExecutor::new_for_tests(
Expand Down
46 changes: 27 additions & 19 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
Expand Down Expand Up @@ -859,7 +860,7 @@ pub struct CheckpointBuilder {
notify: Arc<Notify>,
notify_aggregator: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this Weak? We recreate checkpoint builder every epoch, right?

Copy link
Contributor Author

@williampsmith williampsmith Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comments between @mystenmark and me on a previous commit, but to answer here: yes, we recreate the checkpoint builder every epoch, but making this a Weak allows us to enforce, in general, the invariant that there are no dangling references to the state accumulator when we tear it down and replace it. This safeguards against a bug creeping in if, for example, some other component later holds a reference to the state accumulator.

We can't make that guarantee if this remains an Arc, because in that case counting the strong references would race with the checkpoint builder's shutdown (the checkpoint service is shut down via an async message channel).

output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -897,7 +898,7 @@ impl CheckpointBuilder {
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
notify_aggregator: Arc<Notify>,
Expand Down Expand Up @@ -1432,19 +1433,24 @@ impl CheckpointBuilder {
let committee = system_state_obj.get_current_epoch_committee().committee;

// This must happen after the call to augment_epoch_last_checkpoint,
// otherwise we will not capture the change_epoch tx
let acc = self.accumulator.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
self.accumulator
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
let root_state_digest = self
.accumulator
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?;
// otherwise we will not capture the change_epoch tx.
let root_state_digest = {
let state_acc = self
.accumulator
.upgrade()
.expect("No checkpoints should be getting built after local configuration");
let acc = state_acc.accumulate_checkpoint(
effects.clone(),
sequence_number,
&self.epoch_store,
)?;
state_acc
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
.await?;
state_acc
.digest_epoch(self.epoch_store.clone(), sequence_number)
.await?
};
self.metrics.highest_accumulated_epoch.set(epoch as i64);
info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

Expand Down Expand Up @@ -2213,7 +2219,7 @@ impl CheckpointService {
checkpoint_store: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
effects_store: Arc<dyn TransactionCacheRead>,
accumulator: Arc<StateAccumulator>,
accumulator: Weak<StateAccumulator>,
checkpoint_output: Box<dyn CheckpointOutput>,
certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
metrics: Arc<CheckpointMetrics>,
Expand Down Expand Up @@ -2503,15 +2509,17 @@ mod tests {
let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
let epoch_store = state.epoch_store_for_testing();

let accumulator =
StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(StateAccumulator::new_for_tests(
state.get_accumulator_store().clone(),
&epoch_store,
));

let (checkpoint_service, _exit) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
store,
Arc::new(accumulator),
Arc::downgrade(&accumulator),
Box::new(output),
Box::new(certified_output),
CheckpointMetrics::new_for_tests(),
Expand Down
Loading
Loading