Skip to content

Commit

Permalink
Check system overload using txn age in queue (#13639)
Browse files Browse the repository at this point in the history
## Description 

This PR enhances our current system overload detection to also include
the scenario where the transactions depending on an object are not great
in the count, but lengthy in execution. More precisely, we keep track of
how long each transaction has been in the execution queue and use this
age as a proxy for the hotness of the object when determining whether to
reject a transaction or not.

I also added a field in the node config to make this threshold
configurable and easier to test. We can also move other similar numbers
here as well.

## Test Plan 

Added a test.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [x] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes

Enhance our validators' protection mechanism against congestion to stop
accepting transactions when one of the following happens:
- There are long-running transactions waiting for execution in a certain
object's queue
- There are a large number of transactions waiting in a certain object's
queue, or in general
  • Loading branch information
emmazzz authored Oct 27, 2023
1 parent e04067c commit 23c0280
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ pub struct NodeConfig {

#[serde(default = "default_zklogin_oauth_providers")]
pub zklogin_oauth_providers: BTreeMap<Chain, BTreeSet<String>>,

#[serde(default = "default_overload_threshold_config")]
pub overload_threshold_config: OverloadThresholdConfig,
}

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
Expand Down Expand Up @@ -671,6 +674,29 @@ pub struct TransactionKeyValueStoreWriteConfig {
pub concurrency: usize,
}

/// Configuration for the threshold(s) at which we consider the system
/// to be overloaded. When one of the threshold is passed, the node may
/// stop processing new transactions and/or certificates until the congestion
/// resolves.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct OverloadThresholdConfig {
pub max_txn_age_in_queue: Duration,
// TODO: Move other thresholds here as well, including `MAX_TM_QUEUE_LENGTH`
// and `MAX_PER_OBJECT_QUEUE_LENGTH`.
}

impl Default for OverloadThresholdConfig {
fn default() -> Self {
Self {
max_txn_age_in_queue: Duration::from_secs(1), // 1 second
}
}
}

fn default_overload_threshold_config() -> OverloadThresholdConfig {
OverloadThresholdConfig::default()
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Eq)]
pub struct Genesis {
#[serde(flatten)]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum_dispatch.workspace = true
eyre.workspace = true
futures.workspace = true
im.workspace = true
indexmap.workspace = true
itertools.workspace = true
lru.workspace = true
num_cpus.workspace = true
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
sync::Arc,
thread,
};
use sui_config::node::StateDebugDumpConfig;
use sui_config::node::{OverloadThresholdConfig, StateDebugDumpConfig};
use sui_config::NodeConfig;
use sui_types::execution::DynamicallyLoadedObjectMetadata;
use tap::{TapFallible, TapOptional};
Expand Down Expand Up @@ -649,6 +649,9 @@ pub struct AuthorityState {

/// Config for state dumping on forks
debug_dump_config: StateDebugDumpConfig,

/// Config for when we consider the node overloaded.
overload_threshold_config: OverloadThresholdConfig,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand All @@ -674,6 +677,10 @@ impl AuthorityState {
self.committee_store.clone()
}

pub fn max_txn_age_in_queue(&self) -> Duration {
self.overload_threshold_config.max_txn_age_in_queue
}

pub fn get_epoch_state_commitments(
&self,
epoch: EpochId,
Expand Down Expand Up @@ -800,7 +807,8 @@ impl AuthorityState {
consensus_adapter: &Arc<ConsensusAdapter>,
tx_data: &SenderSignedData,
) -> SuiResult {
self.transaction_manager.check_execution_overload(tx_data)?;
self.transaction_manager
.check_execution_overload(self.max_txn_age_in_queue(), tx_data)?;
consensus_adapter.check_consensus_overload()?;
Ok(())
}
Expand Down Expand Up @@ -2011,6 +2019,7 @@ impl AuthorityState {
certificate_deny_config: CertificateDenyConfig,
indirect_objects_threshold: usize,
debug_dump_config: StateDebugDumpConfig,
overload_threshold_config: OverloadThresholdConfig,
archive_readers: ArchiveReaderBalancer,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
Expand Down Expand Up @@ -2062,6 +2071,7 @@ impl AuthorityState {
transaction_deny_config,
certificate_deny_config,
debug_dump_config,
overload_threshold_config,
});

// Start a task to execute ready certificates.
Expand Down
10 changes: 9 additions & 1 deletion crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::sync::Arc;
use sui_archival::reader::ArchiveReaderBalancer;
use sui_config::certificate_deny_config::CertificateDenyConfig;
use sui_config::genesis::Genesis;
use sui_config::node::StateDebugDumpConfig;
use sui_config::node::{
AuthorityStorePruningConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
};
use sui_config::node::{OverloadThresholdConfig, StateDebugDumpConfig};
use sui_config::transaction_deny_config::TransactionDenyConfig;
use sui_macros::nondeterministic;
use sui_protocol_config::{ProtocolConfig, SupportedProtocolVersions};
Expand Down Expand Up @@ -52,6 +52,7 @@ pub struct TestAuthorityBuilder<'a> {
accounts: Vec<AccountConfig>,
/// By default, we don't insert the genesis checkpoint, which isn't needed by most tests.
insert_genesis_checkpoint: bool,
overload_threshold_config: Option<OverloadThresholdConfig>,
}

impl<'a> TestAuthorityBuilder<'a> {
Expand Down Expand Up @@ -143,6 +144,11 @@ impl<'a> TestAuthorityBuilder<'a> {
self
}

pub fn with_overload_threshold_config(mut self, config: OverloadThresholdConfig) -> Self {
assert!(self.overload_threshold_config.replace(config).is_none());
self
}

pub async fn build(self) -> Arc<AuthorityState> {
let local_network_config =
sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
Expand Down Expand Up @@ -236,6 +242,7 @@ impl<'a> TestAuthorityBuilder<'a> {
};
let transaction_deny_config = self.transaction_deny_config.unwrap_or_default();
let certificate_deny_config = self.certificate_deny_config.unwrap_or_default();
let overload_threshold_config = self.overload_threshold_config.unwrap_or_default();
let state = AuthorityState::new(
name,
secret,
Expand All @@ -256,6 +263,7 @@ impl<'a> TestAuthorityBuilder<'a> {
StateDebugDumpConfig {
dump_file_directory: Some(tempdir().unwrap().into_path()),
},
overload_threshold_config,
ArchiveReaderBalancer::default(),
)
.await;
Expand Down
65 changes: 45 additions & 20 deletions crates/sui-core/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::{AuthorityState, EffectsNotifyRead};
use crate::authority::{
test_authority_builder::TestAuthorityBuilder, AuthorityState, EffectsNotifyRead,
};
use crate::authority_aggregator::{AuthorityAggregator, TimeoutConfig};
use crate::epoch::committee_store::CommitteeStore;
use crate::state_accumulator::StateAccumulator;
use crate::test_authority_clients::LocalAuthorityClient;
use fastcrypto::hash::MultisetHash;
use fastcrypto::traits::KeyPair;
use futures::future::join_all;
use move_core_types::account_address::AccountAddress;
use move_core_types::ident_str;
use prometheus::Registry;
Expand All @@ -18,6 +21,7 @@ use std::sync::Arc;
use std::time::Duration;
use sui_config::genesis::Genesis;
use sui_config::local_ip_utils;
use sui_config::node::OverloadThresholdConfig;
use sui_framework::BuiltInFramework;
use sui_genesis_builder::validator_info::ValidatorInfo;
use sui_move_build::{BuildConfig, CompiledPackage, SuiPackageHooks};
Expand Down Expand Up @@ -266,43 +270,64 @@ pub async fn init_local_authorities(
ObjectID,
) {
let (genesis, key_pairs, framework) = init_genesis(committee_size, genesis_objects).await;
let (aggregator, authorities) = init_local_authorities_with_genesis(&genesis, key_pairs).await;
let authorities = join_all(key_pairs.iter().map(|(_, key_pair)| {
TestAuthorityBuilder::new()
.with_genesis_and_keypair(&genesis, key_pair)
.build()
}))
.await;
let aggregator = init_local_authorities_with_genesis(&genesis, authorities.clone()).await;
(aggregator, authorities, genesis, framework)
}

pub async fn init_local_authorities_with_genesis(
genesis: &Genesis,
key_pairs: Vec<(AuthorityPublicKeyBytes, AuthorityKeyPair)>,
pub async fn init_local_authorities_with_overload_thresholds(
committee_size: usize,
genesis_objects: Vec<Object>,
overload_thresholds: OverloadThresholdConfig,
) -> (
AuthorityAggregator<LocalAuthorityClient>,
Vec<Arc<AuthorityState>>,
Genesis,
ObjectID,
) {
let (genesis, key_pairs, framework) = init_genesis(committee_size, genesis_objects).await;
let authorities = join_all(key_pairs.iter().map(|(_, key_pair)| {
TestAuthorityBuilder::new()
.with_genesis_and_keypair(&genesis, key_pair)
.with_overload_threshold_config(overload_thresholds.clone())
.build()
}))
.await;
let aggregator = init_local_authorities_with_genesis(&genesis, authorities.clone()).await;
(aggregator, authorities, genesis, framework)
}

pub async fn init_local_authorities_with_genesis(
genesis: &Genesis,
authorities: Vec<Arc<AuthorityState>>,
) -> AuthorityAggregator<LocalAuthorityClient> {
telemetry_subscribers::init_for_testing();
let committee = genesis.committee().unwrap();

let mut clients = BTreeMap::new();
let mut states = Vec::new();
for (authority_name, secret) in key_pairs {
let client = LocalAuthorityClient::new(secret, genesis).await;
states.push(client.state.clone());
clients.insert(authority_name, client);
for state in authorities {
let name = state.name;
let client = LocalAuthorityClient::new_from_authority(state);
clients.insert(name, client);
}
let timeouts = TimeoutConfig {
pre_quorum_timeout: Duration::from_secs(5),
post_quorum_timeout: Duration::from_secs(5),
serial_authority_request_interval: Duration::from_secs(1),
};
let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee));
(
AuthorityAggregator::new_with_timeouts(
committee,
committee_store,
clients,
&Registry::new(),
Arc::new(HashMap::new()),
timeouts,
),
states,
AuthorityAggregator::new_with_timeouts(
committee,
committee_store,
clients,
&Registry::new(),
Arc::new(HashMap::new()),
timeouts,
)
}

Expand Down
53 changes: 40 additions & 13 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use std::{
cmp::max,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};

use indexmap::IndexMap;
use lru::LruCache;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
Expand Down Expand Up @@ -246,10 +248,10 @@ struct Inner {
// Maps input objects to transactions waiting for locks on the object.
lock_waiters: HashMap<InputKey, LockQueue>,

// Number of transactions that depend on each object ID. Should match exactly with total
// number of transactions per object ID prefix in the missing_inputs table.
// Stores age info for all transactions depending on each object.
// Used for throttling signing and submitting transactions depending on hot objects.
input_objects: HashMap<ObjectID, usize>,
// An `IndexMap` is used to ensure that the insertion order is preserved.
input_objects: HashMap<ObjectID, IndexMap<TransactionDigest, Instant>>,

// Maps object IDs to the highest observed sequence number of the object. When the value is
// None, indicates that the object is immutable, corresponding to an InputKey with no sequence
Expand Down Expand Up @@ -312,7 +314,7 @@ impl Inner {
return ready_certificates;
}

let input_count = self
let input_txns = self
.input_objects
.get_mut(&input_key.id())
.unwrap_or_else(|| {
Expand All @@ -321,8 +323,12 @@ impl Inner {
input_key.id()
)
});
*input_count -= digests.len();
if *input_count == 0 {
for digest in digests.iter() {
// The digest of the transaction must be inside the map.
assert!(input_txns.shift_remove(digest).is_some());
}

if input_txns.is_empty() {
self.input_objects.remove(&input_key.id());
}

Expand Down Expand Up @@ -665,8 +671,8 @@ impl TransactionManager {
}
if acquire {
pending_cert.acquiring_locks.insert(key, lock_mode);
let input_count = inner.input_objects.entry(key.id()).or_default();
*input_count += 1;
let input_txns = inner.input_objects.entry(key.id()).or_default();
input_txns.insert(digest, Instant::now());
} else {
pending_cert.acquired_locks.insert(key, lock_mode);
}
Expand Down Expand Up @@ -836,14 +842,20 @@ impl TransactionManager {
.map(|cert| cert.acquiring_locks.keys().cloned().collect())
}

// Returns the number of transactions waiting on each object ID.
pub(crate) fn objects_queue_len(&self, keys: Vec<ObjectID>) -> Vec<(ObjectID, usize)> {
// Returns the number of transactions waiting on each object ID, as well as the age of the oldest transaction in the queue.
pub(crate) fn objects_queue_len_and_age(
&self,
keys: Vec<ObjectID>,
) -> Vec<(ObjectID, usize, Option<Duration>)> {
let inner = self.inner.read();
keys.into_iter()
.map(|key| {
let default_map = IndexMap::new();
let txns = inner.input_objects.get(&key).unwrap_or(&default_map);
(
key,
inner.input_objects.get(&key).cloned().unwrap_or_default(),
txns.len(),
txns.first().map(|(_, time)| time.elapsed()),
)
})
.collect()
Expand All @@ -862,7 +874,11 @@ impl TransactionManager {
*inner = Inner::new(new_epoch, self.metrics.clone());
}

pub(crate) fn check_execution_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
pub(crate) fn check_execution_overload(
&self,
txn_age_threshold: Duration,
tx_data: &SenderSignedData,
) -> SuiResult {
// Too many transactions are pending execution.
let inflight_queue_len = self.inflight_queue_len();
fp_ensure!(
Expand All @@ -873,7 +889,7 @@ impl TransactionManager {
}
);

for (object_id, queue_len) in self.objects_queue_len(
for (object_id, queue_len, txn_age) in self.objects_queue_len_and_age(
tx_data
.transaction_data()
.input_objects()?
Expand All @@ -890,6 +906,17 @@ impl TransactionManager {
threshold: MAX_PER_OBJECT_QUEUE_LENGTH,
}
);
if let Some(age) = txn_age {
// Check that we don't have a txn that has been waiting for a long time in the queue.
fp_ensure!(
age < txn_age_threshold,
SuiError::TooOldTransactionPendingOnObject {
object_id,
txn_age_sec: age.as_secs(),
threshold: txn_age_threshold.as_secs(),
}
);
}
}
Ok(())
}
Expand Down
Loading

1 comment on commit 23c0280

@vercel
Copy link

@vercel vercel bot commented on 23c0280 Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.