Skip to content

Commit

Permalink
[core] Make proper per epoch execution components
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jun 17, 2024
1 parent 842cd17 commit 63703b0
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 69 deletions.
14 changes: 2 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{
};
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
Expand Down Expand Up @@ -2834,7 +2833,6 @@ impl AuthorityState {
supported_protocol_versions: SupportedProtocolVersions,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
Expand All @@ -2853,12 +2851,7 @@ impl AuthorityState {
.await?;
self.get_reconfig_api()
.clear_state_end_of_epoch(&execution_lock);
self.check_system_consistency(
cur_epoch_store,
checkpoint_executor,
accumulator,
expensive_safety_check_config,
);
self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config);
self.maybe_reaccumulate_state_hash(
cur_epoch_store,
epoch_start_configuration
Expand Down Expand Up @@ -2942,7 +2935,6 @@ impl AuthorityState {
fn check_system_consistency(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) {
Expand Down Expand Up @@ -2973,7 +2965,6 @@ impl AuthorityState {
cur_epoch_store.epoch()
);
self.expensive_check_is_consistent_state(
checkpoint_executor,
accumulator,
cur_epoch_store,
cfg!(debug_assertions), // panic in debug mode only
Expand All @@ -2990,7 +2981,6 @@ impl AuthorityState {

fn expensive_check_is_consistent_state(
&self,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
cur_epoch_store: &AuthorityPerEpochStore,
panic: bool,
Expand Down Expand Up @@ -3028,7 +3018,7 @@ impl AuthorityState {
}

if !panic {
checkpoint_executor.set_inconsistent_state(is_inconsistent);
accumulator.set_inconsistent_state(is_inconsistent);
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub async fn execute_certificate_with_execution_error(
// for testing and regression detection.
// We must do this before sending to consensus, otherwise consensus may already
// lead to transaction execution and state change.
let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
let state_acc =
StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
let include_wrapped_tombstone = !authority
.epoch_store_for_testing()
.protocol_config()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics {
pub checkpoint_transaction_count: Histogram,
pub checkpoint_contents_age_ms: Histogram,
pub last_executed_checkpoint_age_ms: Histogram,
pub accumulator_inconsistent_state: IntGauge,
}

impl CheckpointExecutorMetrics {
Expand Down Expand Up @@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics {
"Age of the last executed checkpoint",
registry
),
accumulator_inconsistent_state: register_int_gauge_with_registry!(
"accumulator_inconsistent_state",
"1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
registry,
)
.unwrap(),
};
Arc::new(this)
}
Expand Down
27 changes: 9 additions & 18 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use either::Either;
use futures::stream::FuturesOrdered;
use itertools::izip;
use mysten_metrics::spawn_monitored_task;
use prometheus::Registry;
use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
use sui_macros::{fail_point, fail_point_async};
use sui_types::accumulator::Accumulator;
Expand Down Expand Up @@ -68,7 +67,8 @@ use crate::{
};

mod data_ingestion_handler;
mod metrics;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

Expand Down Expand Up @@ -157,7 +157,7 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
config: CheckpointExecutorConfig,
prometheus_registry: &Registry,
metrics: Arc<CheckpointExecutorMetrics>,
) -> Self {
Self {
mailbox,
Expand All @@ -168,7 +168,7 @@ impl CheckpointExecutor {
tx_manager: state.transaction_manager().clone(),
accumulator,
config,
metrics: CheckpointExecutorMetrics::new(prometheus_registry),
metrics,
}
}

Expand All @@ -178,17 +178,14 @@ impl CheckpointExecutor {
state: Arc<AuthorityState>,
accumulator: Arc<StateAccumulator>,
) -> Self {
Self {
Self::new(
mailbox,
state: state.clone(),
checkpoint_store,
object_cache_reader: state.get_object_cache_reader().clone(),
transaction_cache_reader: state.get_transaction_cache_reader().clone(),
tx_manager: state.transaction_manager().clone(),
state,
accumulator,
config: Default::default(),
metrics: CheckpointExecutorMetrics::new_for_tests(),
}
Default::default(),
CheckpointExecutorMetrics::new_for_tests(),
)
}

/// Ensure that all checkpoints in the current epoch will be executed.
Expand Down Expand Up @@ -358,12 +355,6 @@ impl CheckpointExecutor {
}
}

pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
self.metrics
.accumulator_inconsistent_state
.set(is_inconsistent_state as i64);
}

fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
// Ensure that we are not skipping checkpoints at any point
let seq = *checkpoint.sequence_number();
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
EpochFlag::default_flags_for_new_epoch(&authority_state.config),
)
.unwrap(),
&executor,
accumulator,
&ExpensiveSafetyCheckConfig::default(),
)
Expand Down Expand Up @@ -394,7 +393,8 @@ async fn init_executor_test(
broadcast::channel(buffer_size);
let epoch_store = state.epoch_store_for_testing();

let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let accumulator = Arc::new(accumulator);

let executor = CheckpointExecutor::new_for_tests(
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ mod tests {
let epoch_store = state.epoch_store_for_testing();

let accumulator =
StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);

let (checkpoint_service, _exit) = CheckpointService::spawn(
state.clone(),
Expand Down
56 changes: 49 additions & 7 deletions crates/sui-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use itertools::Itertools;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use serde::Serialize;
use sui_protocol_config::ProtocolConfig;
use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
Expand All @@ -25,17 +26,37 @@ use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSet
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_tables::LiveObject;

pub struct StateAccumulatorMetrics {
inconsistent_state: IntGauge,
}

impl StateAccumulatorMetrics {
pub fn new(registry: &Registry) -> Arc<Self> {
let this = Self {
inconsistent_state: register_int_gauge_with_registry!(
"accumulator_inconsistent_state",
"1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
registry
)
.unwrap(),
};
Arc::new(this)
}
}

pub enum StateAccumulator {
V1(StateAccumulatorV1),
V2(StateAccumulatorV2),
}

pub struct StateAccumulatorV1 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub struct StateAccumulatorV2 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub trait AccumulatorStore: ObjectStore + Send + Sync {
Expand Down Expand Up @@ -366,16 +387,37 @@ impl StateAccumulator {
pub fn new(
store: Arc<dyn AccumulatorStore>,
epoch_store: &Arc<AuthorityPerEpochStore>,
metrics: Arc<StateAccumulatorMetrics>,
) -> Self {
if cfg!(msim) {
if epoch_store.state_accumulator_v2_enabled() {
return StateAccumulator::V2(StateAccumulatorV2::new(store));
return StateAccumulator::V2(StateAccumulatorV2::new(store, metrics));
} else {
return StateAccumulator::V1(StateAccumulatorV1::new(store));
return StateAccumulator::V1(StateAccumulatorV1::new(store, metrics));
}
}

StateAccumulator::V1(StateAccumulatorV1::new(store))
StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
}

pub fn new_for_tests(
store: Arc<dyn AccumulatorStore>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> Self {
Self::new(
store,
epoch_store,
StateAccumulatorMetrics::new(&Registry::new()),
)
}

pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
match self {
StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
StateAccumulator::V2(impl_v2) => &impl_v2.metrics,
}
.inconsistent_state
.set(is_inconsistent_state as i64);
}

/// Accumulates the effects of a single checkpoint and persists the accumulator.
Expand Down Expand Up @@ -528,8 +570,8 @@ impl StateAccumulator {
}

impl StateAccumulatorV1 {
pub fn new(store: Arc<dyn AccumulatorStore>) -> Self {
Self { store }
pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
Self { store, metrics }
}

/// Unions all checkpoint accumulators at the end of the epoch to generate the
Expand Down Expand Up @@ -618,8 +660,8 @@ impl StateAccumulatorV1 {
}

impl StateAccumulatorV2 {
pub fn new(store: Arc<dyn AccumulatorStore>) -> Self {
Self { store }
pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
Self { store, metrics }
}

pub async fn accumulate_running_root(
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub async fn send_and_confirm_transaction(
//
// We also check the incremental effects of the transaction on the live object set against StateAccumulator
// for testing and regression detection
let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
let state_acc =
StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
let include_wrapped_tombstone = !authority
.epoch_store_for_testing()
.protocol_config()
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ async fn send_transactions(
pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<CheckpointService> {
let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
let epoch_store = state.epoch_store_for_testing();
let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
let accumulator =
StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _) = CheckpointService::spawn(
Expand Down
Loading

0 comments on commit 63703b0

Please sign in to comment.