Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove timeout locks #6048

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
mod batch;

use crate::{
beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
metrics,
observed_aggregates::{ObserveOutcome, ObservedAttestationKey},
observed_attesters::Error as ObservedAttestersError,
Expand Down Expand Up @@ -1174,10 +1173,7 @@ pub fn verify_attestation_signature<T: BeaconChainTypes>(
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let fork = chain
.spec
Expand Down Expand Up @@ -1272,10 +1268,7 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: &IndexedAttestation<T::EthSpec>,
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let aggregator_index = signed_aggregate.message().aggregator_index();
if aggregator_index >= pubkey_cache.len() as u64 {
Expand Down
15 changes: 3 additions & 12 deletions beacon_node/beacon_chain/src/attestation_verification/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::{
CheckAttestationSignature, Error, IndexedAggregatedAttestation, IndexedUnaggregatedAttestation,
VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation,
};
use crate::{
beacon_chain::VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, metrics, BeaconChain, BeaconChainError,
BeaconChainTypes,
};
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use bls::verify_signature_sets;
use state_processing::signature_sets::{
indexed_attestation_signature_set_from_pubkeys, signed_aggregate_selection_proof_signature_set,
Expand Down Expand Up @@ -60,10 +57,7 @@ where
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_BATCH_AGG_SIGNATURE_SETUP_TIMES);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let mut signature_sets = Vec::with_capacity(num_indexed * 3);
// Iterate, flattening to get only the `Ok` values.
Expand Down Expand Up @@ -169,10 +163,7 @@ where
&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES,
);

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = chain.validator_pubkey_cache.read();

let mut signature_sets = Vec::with_capacity(num_partially_verified);

Expand Down
94 changes: 28 additions & 66 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
get_slot_delay_ms, timestamp_now, ValidatorMonitor,
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
Expand Down Expand Up @@ -132,17 +131,6 @@ pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
/// Alias to appease clippy.
type HashBlockTuple<E> = (Hash256, RpcBlock<E>);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The time-out before failure during an operation to take a read/write RwLock on the
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The timeout for the eth1 finalization cache
pub const ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_millis(200);

// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
Expand Down Expand Up @@ -465,13 +453,13 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: TimeoutRwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
pub eth1_finalization_cache: TimeoutRwLock<Eth1FinalizationCache>,
pub eth1_finalization_cache: RwLock<Eth1FinalizationCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
pub(crate) validator_pubkey_cache: RwLock<ValidatorPubkeyCache<T>>,
/// A cache used when producing attestations.
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
Expand Down Expand Up @@ -1472,10 +1460,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get_index(pubkey))
}
Expand All @@ -1488,10 +1473,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

validator_pubkeys
.map(|pubkey| {
Expand All @@ -1516,10 +1498,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_pubkey(&self, validator_index: usize) -> Result<Option<PublicKey>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get(validator_index).cloned())
}
Expand All @@ -1529,10 +1508,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_index: usize,
) -> Result<Option<PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get_pubkey_bytes(validator_index).copied())
}
Expand All @@ -1546,10 +1522,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_indices: &[usize],
) -> Result<HashMap<usize, PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();

let mut map = HashMap::with_capacity(validator_indices.len());
for &validator_index in validator_indices {
Expand Down Expand Up @@ -3506,11 +3479,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
let mut ops = {
let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_PUBKEY_CACHE_LOCK);
self.validator_pubkey_cache
.write()
.import_new_pubkeys(&state)?
};

// Apply the state to the attester cache, only if it is from the previous epoch or later.
//
Expand Down Expand Up @@ -4116,18 +4090,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);
let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id);

if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.write()
.insert_committee_cache(shuffling_id, committee_cache);
}
}
Expand Down Expand Up @@ -4174,14 +4143,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
};

if let Some(finalized_eth1_data) = self
.eth1_finalization_cache
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT)
.and_then(|mut cache| {
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
})
{
let finalized_eth1_data = {
let mut cache = self.eth1_finalization_cache.write();
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
};
if let Some(finalized_eth1_data) = finalized_eth1_data {
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data);
Expand Down Expand Up @@ -6365,15 +6332,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})?;

// Obtain the shuffling cache, timing how long we wait.
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);

let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?;

metrics::stop_timer(cache_wait_timer);
let mut shuffling_cache = {
let _ =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
self.shuffling_cache.write()
};

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
Expand Down Expand Up @@ -6481,8 +6444,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.write()
.insert_committee_cache(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);
Expand Down
7 changes: 2 additions & 5 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::observed_block_producers::SeenBlock;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{BeaconForkChoice, ForkChoiceError, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
beacon_chain::{BeaconForkChoice, ForkChoiceError},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
Expand Down Expand Up @@ -2096,10 +2096,7 @@ pub fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec, Err: BlockBlobEr
pub fn get_validator_pubkey_cache<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<RwLockReadGuard<ValidatorPubkeyCache<T>>, BeaconChainError> {
chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)
Ok(chain.validator_pubkey_cache.read())
}

/// Produces an _empty_ `BlockSignatureVerifier`.
Expand Down
7 changes: 3 additions & 4 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::ChainConfig;
Expand Down Expand Up @@ -935,16 +934,16 @@ where
fork_choice_signal_rx,
event_handler: self.event_handler,
head_tracker,
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(
shuffling_cache: RwLock::new(ShufflingCache::new(
shuffling_cache_size,
head_shuffling_ids,
log.clone(),
)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
eth1_finalization_cache: RwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache,
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
Expand Down
20 changes: 4 additions & 16 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
//! the head block root. This is unacceptable for fast-responding functions like the networking
//! stack.

use crate::beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::BlockShufflingIds;
use crate::{
Expand Down Expand Up @@ -817,21 +816,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_snapshot.beacon_block_root,
&new_snapshot.beacon_state,
) {
Ok(head_shuffling_ids) => {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.map(|mut shuffling_cache| {
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "shuffling_cache",
"task" => "update head shuffling decision root"
);
});
}
Ok(head_shuffling_ids) => self
.shuffling_cache
.write()
.update_head_shuffling_ids(head_shuffling_ids),
Err(e) => {
error!(
self.log,
Expand Down
3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub struct ChainConfig {
pub weak_subjectivity_checkpoint: Option<Checkpoint>,
/// Determine whether to reconstruct historic states, usually after a checkpoint sync.
pub reconstruct_historic_states: bool,
/// Whether timeouts on `TimeoutRwLock`s are enabled or not.
pub enable_lock_timeouts: bool,
/// The max size of a message that can be sent over the network.
pub max_network_size: usize,
/// Maximum percentage of the head committee weight at which to attempt re-orging the canonical head.
Expand Down Expand Up @@ -94,7 +92,6 @@ impl Default for ChainConfig {
import_max_skip_slots: None,
weak_subjectivity_checkpoint: None,
reconstruct_historic_states: false,
enable_lock_timeouts: true,
max_network_size: 10 * 1_048_576, // 10M
re_org_head_threshold: Some(DEFAULT_RE_ORG_HEAD_THRESHOLD),
re_org_parent_threshold: Some(DEFAULT_RE_ORG_PARENT_THRESHOLD),
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub mod state_advance_timer;
pub mod sync_committee_rewards;
pub mod sync_committee_verification;
pub mod test_utils;
mod timeout_rw_lock;
pub mod validator_monitor;
pub mod validator_pubkey_cache;

Expand Down Expand Up @@ -98,5 +97,4 @@ pub use state_processing::per_block_processing::errors::{
ExitValidationError, ProposerSlashingValidationError,
};
pub use store;
pub use timeout_rw_lock::TimeoutRwLock;
pub use types;
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ lazy_static! {
"Time spent running fork choice's `get_head` during block import",
exponential_buckets(1e-3, 2.0, 8)
);
pub static ref BLOCK_PROCESSING_PUBKEY_CACHE_LOCK: Result<Histogram> = try_create_histogram(
"beacon_block_processing_pubkey_cache_lock_seconds",
"Time spent waiting or holding the pubkey cache write lock",
);
pub static ref BLOCK_SYNC_AGGREGATE_SET_BITS: Result<IntGauge> = try_create_int_gauge(
"block_sync_aggregate_set_bits",
"The number of true bits in the last sync aggregate in a block"
Expand Down
Loading
Loading