Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cleanup] remove remaining logic for input object fastpath #14188

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/sui-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
let QuorumDriverResponse {
effects_cert,
events,
..
} = resp;
return Ok(ExecutionEffects::CertifiedTransactionEffects(
effects_cert.into(),
Expand Down
1 change: 0 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,6 @@ impl AuthorityState {
// this function, in order to prevent a byzantine validator from
// giving us incorrect effects.
effects: &VerifiedCertifiedTransactionEffects,
_objects: Vec<Object>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
assert!(self.is_fullnode(epoch_store));
Expand Down
34 changes: 0 additions & 34 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,40 +835,6 @@ impl AuthorityStore {
self.bulk_insert_genesis_objects(objects).await
}

/// Insert objects directly into the object table, but do not touch other tables.
/// This is used in fullnode to insert objects from validators certificate handling response
/// in fast path execution.
/// This is best-efforts. If the object needs to be stored as an indirect object then we
/// do not insert this object at all.
///
/// Caveat: if an Object is regularly inserted as an indirect object in the stiore, but the threshold
/// changes in the fullnode which causes it to be considered as non-indirect, and only inserted
/// to the object store, this would cause the reference counting to be incorrect.
///
/// TODO: handle this in a more resilient way.
pub(crate) fn _fullnode_fast_path_insert_objects_to_object_store_maybe(
&self,
objects: &Vec<Object>,
) -> SuiResult {
let mut write_batch = self.perpetual_tables.objects.batch();

for obj in objects {
let StoreObjectPair(store_object, indirect_object) =
get_store_object_pair(obj.clone(), self.indirect_objects_threshold);
// Do not insert to store if the object needs to stored as indirect object too.
if indirect_object.is_some() {
continue;
}
write_batch.insert_batch(
&self.perpetual_tables.objects,
std::iter::once((ObjectKey(obj.id(), obj.version()), store_object)),
)?;
}

write_batch.write()?;
Ok(())
}

/// This function should only be used for initializing genesis and should remain private.
async fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
let mut batch = self.perpetual_tables.objects.batch();
Expand Down
39 changes: 6 additions & 33 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::string::ToString;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -389,7 +389,6 @@ struct ProcessCertificateState {
non_retryable_stake: StakeUnit,
non_retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
object_map: HashMap<TransactionEffectsDigest, HashSet<Object>>,
// As long as none of the exit criteria are met we consider the state retryable
// 1) >= 2f+1 signatures
// 2) >= f+1 non-retryable errors
Expand Down Expand Up @@ -1573,16 +1572,11 @@ where
&self,
certificate: CertifiedTransaction,
) -> Result<
(
VerifiedCertifiedTransactionEffects,
TransactionEvents,
Vec<Object>,
),
(VerifiedCertifiedTransactionEffects, TransactionEvents),
AggregatorProcessCertificateError,
> {
let state = ProcessCertificateState {
effects_map: MultiStakeAggregator::new(self.committee.clone()),
object_map: HashMap::new(),
non_retryable_stake: 0,
non_retryable_errors: vec![],
retryable_errors: vec![],
Expand Down Expand Up @@ -1748,18 +1742,12 @@ where
state: &mut ProcessCertificateState,
response: SuiResult<HandleCertificateResponseV2>,
name: AuthorityName,
) -> SuiResult<
Option<(
VerifiedCertifiedTransactionEffects,
TransactionEvents,
Vec<Object>,
)>,
> {
) -> SuiResult<Option<(VerifiedCertifiedTransactionEffects, TransactionEvents)>> {
match response {
Ok(HandleCertificateResponseV2 {
signed_effects,
events,
fastpath_input_objects,
..
}) => {
debug!(
?tx_digest,
Expand All @@ -1768,7 +1756,7 @@ where
);
let effects_digest = *signed_effects.digest();
// Note: here we aggregate votes by the hash of the effects structure
let result = match state.effects_map.insert(
match state.effects_map.insert(
(signed_effects.epoch(), effects_digest),
signed_effects.clone(),
) {
Expand Down Expand Up @@ -1796,25 +1784,10 @@ where
);
ct.verify(&committee).map(|ct| {
debug!(?tx_digest, "Got quorum for validators handle_certificate.");
let fastpath_input_objects =
state.object_map.remove(&effects_digest).unwrap_or_default();
Some((ct, events, fastpath_input_objects.into_iter().collect()))
Some((ct, events))
})
}
};
if result.is_ok() {
// We verified the objects' relevance and content's integrity in `safe_client.rs`
// based on the effects. Only responses with legit objects will reach here.
// Therefore, as long as we have quorum on effects, we have quorum on objects.
// One thing to note is objects may be missing in some responses e.g. validators are on
// different code versions, but this is fine as long as their content is correct.
state
.object_map
.entry(effects_digest)
.or_default()
.extend(fastpath_input_objects.into_iter());
}
result
}
Err(err) => Err(err),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl AuthorityAPI for NetworkAuthorityClient {
return Ok(HandleCertificateResponseV2 {
signed_effects: response.signed_effects,
events: response.events,
fastpath_input_objects: vec![], // fastpath is unused for now
fastpath_input_objects: vec![], // unused field
});
}
response.map_err(Into::into)
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl ValidatorService {
return Ok(Some(HandleCertificateResponseV2 {
signed_effects: signed_effects.into_inner(),
events,
fastpath_input_objects: vec![], // fastpath is unused for now
fastpath_input_objects: vec![], // unused field
}));
}

Expand Down Expand Up @@ -447,7 +447,7 @@ impl ValidatorService {
Ok(Some(HandleCertificateResponseV2 {
signed_effects: effects.into_inner(),
events,
fastpath_input_objects: vec![], // fastpath is unused for now
fastpath_input_objects: vec![], // unused field
}))
}
}
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ where
let auth_agg = self.validators.load();
let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
let tx_digest = *certificate.digest();
let (effects, events, objects) = auth_agg
let (effects, events) = auth_agg
.process_certificate(certificate.clone())
.instrument(tracing::debug_span!("aggregator_process_cert", ?tx_digest))
.await
Expand Down Expand Up @@ -439,7 +439,6 @@ where
let response = QuorumDriverResponse {
effects_cert: effects,
events,
objects,
};

Ok(response)
Expand Down Expand Up @@ -699,7 +698,6 @@ where
let response = QuorumDriverResponse {
effects_cert,
events,
objects: vec![],
};
quorum_driver.notify(&transaction, &Ok(response), old_retry_times + 1);
return;
Expand Down
31 changes: 1 addition & 30 deletions crates/sui-core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::epoch::committee_store::CommitteeStore;
use mysten_metrics::histogram::{Histogram, HistogramVec};
use prometheus::core::GenericCounter;
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
use std::collections::HashSet;
use std::sync::Arc;
use sui_types::crypto::AuthorityPublicKeyBytes;
use sui_types::effects::{SignedTransactionEffects, TransactionEffectsAPI};
Expand Down Expand Up @@ -345,38 +344,10 @@ where
let signed_effects =
self.check_signed_effects_plain(digest, response.signed_effects, None)?;

// For now, validators only pass back input shared object.
let fastpath_input_objects = if !response.fastpath_input_objects.is_empty() {
let input_shared_objects = signed_effects
.input_shared_objects()
.into_iter()
.map(|(obj_ref, _kind)| obj_ref)
.collect::<HashSet<_>>();
for object in &response.fastpath_input_objects {
let obj_ref = object.compute_object_reference();
if !input_shared_objects.contains(&obj_ref) {
error!(tx_digest=?digest, name=?self.address, ?obj_ref, "Object returned from HandleCertificateResponseV2 is not in the input shared objects of the transaction");
return Err(SuiError::ByzantineAuthoritySuspicion {
authority: self.address,
reason: format!(
"Object {:?} returned from HandleCertificateResponseV2 is not in the input shared objects of tx: {:?}",
obj_ref, digest
),
});
}
}
response
.fastpath_input_objects
.into_iter()
.collect::<Vec<_>>()
} else {
vec![]
};

Ok(HandleCertificateResponseV2 {
signed_effects,
events: response.events,
fastpath_input_objects,
fastpath_input_objects: vec![], // unused field
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl LocalAuthorityClient {
Ok(HandleCertificateResponseV2 {
signed_effects,
events,
fastpath_input_objects: vec![],
fastpath_input_objects: vec![], // unused field
})
}
}
Expand Down
14 changes: 0 additions & 14 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,20 +712,6 @@ impl TransactionManager {
Ok(())
}

/// Notifies TransactionManager that the given objects are available in the objects table.
/// Useful when transactions associated with the objects are not known, e.g. after checking
/// object availability from storage, or for testing.
pub(crate) fn _fastpath_objects_available(
&self,
input_keys: Vec<InputKey>,
epoch_store: &AuthorityPerEpochStore,
) {
let mut inner = self.inner.write();
let _scope = monitored_scope("TransactionManager::objects_available::wlock");
self.objects_available_locked(&mut inner, epoch_store, input_keys, false);
inner.maybe_shrink_capacity();
}

#[cfg(test)]
pub(crate) fn objects_available(
&self,
Expand Down
20 changes: 2 additions & 18 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use sui_types::base_types::TransactionDigest;
use sui_types::effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects};
use sui_types::error::{SuiError, SuiResult};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::object::Object;
use sui_types::quorum_driver_types::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse,
FinalizedEffects, QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResponse,
Expand Down Expand Up @@ -223,11 +222,7 @@ where
Ok(Err(err)) => Err(err),
Ok(Ok(response)) => {
good_response_metrics.inc();
let QuorumDriverResponse {
effects_cert,
objects,
..
} = response;
let QuorumDriverResponse { effects_cert, .. } = response;
if !wait_for_local_execution {
return Ok(ExecuteTransactionResponse::EffectsCert(Box::new((
FinalizedEffects::new_from_effects_cert(effects_cert.into()),
Expand All @@ -245,7 +240,6 @@ where
&self.validator_state,
&executable_tx,
&effects_cert,
objects,
&self.metrics,
)
.await
Expand Down Expand Up @@ -312,7 +306,6 @@ where
validator_state: &Arc<AuthorityState>,
transaction: &VerifiedExecutableTransaction,
effects_cert: &VerifiedCertifiedTransactionEffects,
objects: Vec<Object>,
metrics: &TransactionOrchestratorMetrics,
) -> SuiResult {
let epoch_store = validator_state.load_epoch_store_one_call_per_task();
Expand Down Expand Up @@ -347,7 +340,6 @@ where
validator_state.fullnode_execute_certificate_with_effects(
transaction,
effects_cert,
objects,
&epoch_store,
),
)
Expand Down Expand Up @@ -391,14 +383,7 @@ where
) {
loop {
match effects_receiver.recv().await {
Ok(Ok((
transaction,
QuorumDriverResponse {
effects_cert,
objects,
..
},
))) => {
Ok(Ok((transaction, QuorumDriverResponse { effects_cert, .. }))) => {
let tx_digest = transaction.digest();
if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
panic!(
Expand Down Expand Up @@ -436,7 +421,6 @@ where
&validator_state,
&executable_tx,
&effects_cert,
objects,
&metrics,
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-e2e-tests/tests/dynamic_committee_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl StressTestRunner {
.programmable(pt)
.build(),
);
let (effects, _, _) = self
let (effects, _) = self
.test_cluster
.execute_transaction_return_raw_effects(transaction)
.await
Expand Down
4 changes: 0 additions & 4 deletions crates/sui-e2e-tests/tests/full_node_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,6 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
QuorumDriverResponse {
effects_cert: certified_txn_effects,
events: txn_events,
..
},
) = rx.recv().await.unwrap().unwrap();
let (cte, events, is_executed_locally) = *res;
Expand Down Expand Up @@ -877,7 +876,6 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
QuorumDriverResponse {
effects_cert: certified_txn_effects,
events: txn_events,
..
},
) = rx.recv().await.unwrap().unwrap();
let (cte, events, is_executed_locally) = *res;
Expand Down Expand Up @@ -1273,10 +1271,8 @@ async fn test_pass_back_no_object() -> Result<(), anyhow::Error> {
QuorumDriverResponse {
effects_cert: _certified_txn_effects,
events: _txn_events,
objects,
},
) = rx.recv().await.unwrap().unwrap();
assert!(objects.is_empty(), "{objects:?}");
Ok(())
}

Expand Down
Loading
Loading