Skip to content

Commit

Permalink
[typed store] iterators: migrate unbounded_iter callsites
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Feb 26, 2025
1 parent 8d3abb7 commit c10efb8
Show file tree
Hide file tree
Showing 22 changed files with 133 additions and 182 deletions.
5 changes: 4 additions & 1 deletion crates/sui-bridge/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ impl BridgeOrchestratorTables {
}

pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
self.pending_actions.unbounded_iter().collect()
self.pending_actions
.safe_iter()
.collect::<Result<HashMap<_, _>, _>>()
.expect("failed to get all pending actions")
}

pub fn get_sui_event_cursors(
Expand Down
6 changes: 4 additions & 2 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3310,7 +3310,9 @@ impl AuthorityState {
);

if cfg!(debug_assertions) {
cur_epoch_store.check_all_executed_transactions_in_checkpoint();
cur_epoch_store
.check_all_executed_transactions_in_checkpoint()
.expect("failed to check all executed transactions in checkpoint");
}

if let Err(err) = self
Expand Down Expand Up @@ -5318,7 +5320,7 @@ impl AuthorityState {
expensive_safety_check_config,
cur_epoch_store.get_chain_identifier(),
epoch_last_checkpoint,
);
)?;
self.epoch_store.store(new_epoch_store.clone());
Ok(new_epoch_store)
}
Expand Down
65 changes: 35 additions & 30 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,11 +669,12 @@ impl AuthorityEpochTables {
Ok(state)
}

pub fn get_all_pending_consensus_transactions(&self) -> Vec<ConsensusTransaction> {
self.pending_consensus_transactions
.unbounded_iter()
.map(|(_k, v)| v)
.collect()
pub fn get_all_pending_consensus_transactions(&self) -> SuiResult<Vec<ConsensusTransaction>> {
Ok(self
.pending_consensus_transactions
.safe_iter()
.map(|item| item.map(|(_k, v)| v))
.collect::<Result<Vec<_>, _>>()?)
}

pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
Expand Down Expand Up @@ -779,19 +780,19 @@ impl AuthorityPerEpochStore {
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
chain_identifier: ChainIdentifier,
highest_executed_checkpoint: CheckpointSequenceNumber,
) -> Arc<Self> {
) -> SuiResult<Arc<Self>> {
let current_time = Instant::now();
let epoch_id = committee.epoch;

let tables = AuthorityEpochTables::open(epoch_id, parent_path, db_options.clone());
let end_of_publish =
StakeAggregator::from_iter(committee.clone(), tables.end_of_publish.unbounded_iter());
StakeAggregator::from_iter(committee.clone(), tables.end_of_publish.safe_iter())?;
let reconfig_state = tables
.load_reconfig_state()
.expect("Load reconfig state at initialization cannot fail");

let epoch_alive_notify = NotifyOnce::new();
let pending_consensus_transactions = tables.get_all_pending_consensus_transactions();
let pending_consensus_transactions = tables.get_all_pending_consensus_transactions()?;
let pending_consensus_certificates: HashSet<_> = pending_consensus_transactions
.iter()
.filter_map(|transaction| {
Expand Down Expand Up @@ -876,7 +877,8 @@ impl AuthorityPerEpochStore {

let mut jwk_aggregator = JwkAggregator::new(committee.clone());

for ((authority, id, jwk), _) in tables.pending_jwks.unbounded_iter() {
for item in tables.pending_jwks.safe_iter() {
let ((authority, id, jwk), _) = item?;
jwk_aggregator.insert(authority, (id, jwk));
}

Expand All @@ -885,6 +887,10 @@ impl AuthorityPerEpochStore {
let consensus_output_cache =
ConsensusOutputCache::new(&epoch_start_configuration, &tables, metrics.clone());

let execution_time_observations = tables
.execution_time_observations
.safe_iter()
.collect::<Result<Vec<_>, _>>()?;
let execution_time_estimator = ExecutionTimeEstimator::new(
committee.clone(),
// Load observations stored at end of previous epoch.
Expand All @@ -894,16 +900,13 @@ impl AuthorityPerEpochStore {
&*object_store,
)
// Load observations stored during the current epoch.
.chain(
tables
.execution_time_observations
.unbounded_iter()
.flat_map(|((generation, source), observations)| {
observations
.into_iter()
.map(move |(key, duration)| (source, generation, key, duration))
}),
),
.chain(execution_time_observations.into_iter().flat_map(
|((generation, source), observations)| {
observations
.into_iter()
.map(move |(key, duration)| (source, generation, key, duration))
},
)),
);

let s = Arc::new(Self {
Expand Down Expand Up @@ -947,7 +950,7 @@ impl AuthorityPerEpochStore {
});

s.update_buffer_stake_metric();
s
Ok(s)
}

pub fn tables(&self) -> SuiResult<Arc<AuthorityEpochTables>> {
Expand Down Expand Up @@ -1061,7 +1064,7 @@ impl AuthorityPerEpochStore {
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
chain_identifier: ChainIdentifier,
previous_epoch_last_checkpoint: CheckpointSequenceNumber,
) -> Arc<Self> {
) -> SuiResult<Arc<Self>> {
assert_eq!(self.epoch() + 1, new_committee.epoch);
self.record_reconfig_halt_duration_metric();
self.record_epoch_total_duration_metric();
Expand Down Expand Up @@ -1105,6 +1108,7 @@ impl AuthorityPerEpochStore {
self.chain_identifier,
previous_epoch_last_checkpoint,
)
.expect("failed to create new authority per epoch store")
}

pub fn committee(&self) -> &Arc<Committee> {
Expand Down Expand Up @@ -1610,9 +1614,9 @@ impl AuthorityPerEpochStore {
Ok(self
.tables()?
.pending_execution
.unbounded_iter()
.map(|(_, cert)| cert.into())
.collect())
.safe_iter()
.map(|item| item.map(|(_, cert)| cert.into()))
.collect::<Result<Vec<_>, _>>()?)
}

/// Called when transaction outputs are committed to disk
Expand Down Expand Up @@ -1648,6 +1652,7 @@ impl AuthorityPerEpochStore {
self.tables()
.expect("recovery should not cross epoch boundary")
.get_all_pending_consensus_transactions()
.expect("failed to get pending consensus transactions")
}

#[cfg(test)]
Expand Down Expand Up @@ -4386,25 +4391,25 @@ impl AuthorityPerEpochStore {
self.signature_verifier.clear_signature_cache();
}

pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) {
pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) -> SuiResult<()> {
let tables = self.tables().unwrap();

info!("Verifying that all executed transactions are in a checkpoint");

let mut executed_iter = tables.executed_in_epoch.unbounded_iter();
let mut checkpointed_iter = tables.executed_transactions_to_checkpoint.unbounded_iter();
let mut executed_iter = tables.executed_in_epoch.safe_iter();
let mut checkpointed_iter = tables.executed_transactions_to_checkpoint.safe_iter();

// verify that the two iterators (which are both sorted) are identical
loop {
let executed = executed_iter.next();
let checkpointed = checkpointed_iter.next();
let executed = executed_iter.next().transpose()?;
let checkpointed = checkpointed_iter.next().transpose()?;
match (executed, checkpointed) {
(Some((left, ())), Some((right, _))) => {
if left != right {
panic!("Executed transactions and checkpointed transactions do not match: {:?} {:?}", left, right);
}
}
(None, None) => break,
(None, None) => break Ok(()),
(left, right) => panic!(
"Executed transactions and checkpointed transactions do not match: {:?} {:?}",
left, right
Expand Down
12 changes: 0 additions & 12 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1768,18 +1768,6 @@ impl AuthorityStore {
Ok(())
}

#[cfg(msim)]
pub fn remove_all_versions_of_object(&self, object_id: ObjectID) {
let entries: Vec<_> = self
.perpetual_tables
.objects
.unbounded_iter()
.filter_map(|(key, _)| if key.0 == object_id { Some(key) } else { None })
.collect();
info!("Removing all versions of object: {:?}", entries);
self.perpetual_tables.objects.multi_remove(entries).unwrap();
}

// Counts the number of versions exist in object store for `object_id`. This includes tombstone.
#[cfg(msim)]
pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,9 @@ mod tests {
&ReadWriteOptions::default(),
false,
)?;
let iter = objects.unbounded_iter();
for (k, _v) in iter {
after_pruning.insert(k);
let iter = objects.safe_iter();
for item in iter {
after_pruning.insert(item?.0);
}
Ok(after_pruning)
}
Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,12 @@ impl AuthorityPerpetualTables {
}

pub fn database_is_empty(&self) -> SuiResult<bool> {
Ok(self.objects.unbounded_iter().next().is_none())
Ok(self.objects.safe_iter().next().is_none())
}

pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
LiveSetIter {
iter: self.objects.unbounded_iter(),
iter: self.objects.safe_iter(),
tables: self,
prev: None,
include_wrapped_object,
Expand All @@ -437,7 +437,7 @@ impl AuthorityPerpetualTables {
let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

LiveSetIter {
iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
iter: self.objects.safe_iter_with_bounds(lower_bound, upper_bound),
tables: self,
prev: None,
include_wrapped_object,
Expand Down Expand Up @@ -541,7 +541,7 @@ impl ObjectStore for AuthorityPerpetualTables {

pub struct LiveSetIter<'a> {
iter:
<DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
<DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::SafeIterator,
tables: &'a AuthorityPerpetualTables,
prev: Option<(ObjectKey, StoreObjectWrapper)>,
/// Whether a wrapped object is considered as a live object.
Expand Down Expand Up @@ -615,7 +615,7 @@ impl Iterator for LiveSetIter<'_> {

fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((next_key, next_value)) = self.iter.next() {
if let Some(Ok((next_key, next_value))) = self.iter.next() {
let prev = self.prev.take();
self.prev = Some((next_key, next_value));

Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ impl<'a> TestAuthorityBuilder<'a> {
.get_highest_executed_checkpoint_seq_number()
.unwrap()
.unwrap_or(0),
);
)
.expect("failed to create authority per epoch store");
let committee_store = Arc::new(CommitteeStore::new(
path.join("epochs"),
&genesis_committee,
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority/transaction_deferral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ mod object_cost_tests {
}

let mut previous_future_round = 0;
for (key, _) in db.deferred_certs.unbounded_iter() {
match key {
for item in db.deferred_certs.safe_iter() {
match item.unwrap().0 {
DeferralKey::Randomness { .. } => (),
DeferralKey::ConsensusRound { future_round, .. } => {
assert!(previous_future_round <= future_round);
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-core/src/epoch/committee_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ impl CommitteeStore {
tables,
cache: RwLock::new(HashMap::new()),
};
if store.database_is_empty() {
if store
.database_is_empty()
.expect("CommitteeStore initialization failed")
{
store
.init_genesis_committee(genesis_committee.clone())
.expect("Init genesis committee data must not fail");
Expand Down Expand Up @@ -126,7 +129,13 @@ impl CommitteeStore {
.map_err(Into::into)
}

fn database_is_empty(&self) -> bool {
self.tables.committee_map.unbounded_iter().next().is_none()
fn database_is_empty(&self) -> SuiResult<bool> {
Ok(self
.tables
.committee_map
.safe_iter()
.next()
.transpose()?
.is_none())
}
}
12 changes: 7 additions & 5 deletions crates/sui-core/src/stake_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use sui_types::base_types::AuthorityName;
use sui_types::base_types::ConciseableName;
use sui_types::committee::{Committee, CommitteeTrait, StakeUnit};
use sui_types::crypto::{AuthorityQuorumSignInfo, AuthoritySignInfo, AuthoritySignInfoTrait};
use sui_types::error::SuiError;
use sui_types::error::{SuiError, SuiResult};
use sui_types::message_envelope::{Envelope, Message};
use tracing::warn;
use typed_store::TypedStoreError;

/// StakeAggregator allows us to keep track of the total stake of a set of validators.
/// STRENGTH indicates whether we want a strong quorum (2f+1) or a weak quorum (f+1).
Expand All @@ -38,15 +39,16 @@ impl<S: Clone + Eq, const STRENGTH: bool> StakeAggregator<S, STRENGTH> {
}
}

pub fn from_iter<I: Iterator<Item = (AuthorityName, S)>>(
pub fn from_iter<I: Iterator<Item = Result<(AuthorityName, S), TypedStoreError>>>(
committee: Arc<Committee>,
data: I,
) -> Self {
) -> SuiResult<Self> {
let mut this = Self::new(committee);
for (authority, s) in data {
for item in data {
let (authority, s) = item?;
this.insert_generic(authority, s);
}
this
Ok(this)
}

/// A generic version of inserting arbitrary type of V (e.g. void type).
Expand Down
6 changes: 4 additions & 2 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,9 @@ where
info!("Skipping loading pending transactions from pending_tx_log.");
return;
}
let pending_txes = pending_tx_log.load_all_pending_transactions();
let pending_txes = pending_tx_log
.load_all_pending_transactions()
.expect("failed to load all pending transactions");
info!(
"Recovering {} pending transactions from pending_tx_log.",
pending_txes.len()
Expand Down Expand Up @@ -518,7 +520,7 @@ where
});
}

pub fn load_all_pending_transactions(&self) -> Vec<VerifiedTransaction> {
pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
self.pending_tx_log.load_all_pending_transactions()
}
}
Expand Down
6 changes: 4 additions & 2 deletions crates/sui-core/src/verify_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ pub fn verify_indexes(store: &dyn AccumulatorStore, indexes: Arc<IndexStore>) ->
tracing::info!("Live objects set is prepared, about to verify indexes");

// Verify Owner Index
for (key, info) in indexes.tables().owner_index().unbounded_iter() {
for item in indexes.tables().owner_index().safe_iter() {
let (key, info) = item?;
let calculated_info = owner_index.remove(&key).ok_or_else(|| {
anyhow!(
"owner_index: found extra, unexpected entry {:?}",
Expand All @@ -66,7 +67,8 @@ pub fn verify_indexes(store: &dyn AccumulatorStore, indexes: Arc<IndexStore>) ->
tracing::info!("Owner index is good");

// Verify Coin Index
for (key, info) in indexes.tables().coin_index().unbounded_iter() {
for item in indexes.tables().coin_index().safe_iter() {
let (key, info) = item?;
let calculated_info = coin_index.remove(&key).ok_or_else(|| {
anyhow!(
"coin_index: found extra, unexpected entry {:?}",
Expand Down
Loading

0 comments on commit c10efb8

Please sign in to comment.