diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7f09430227f..0595d53c072 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -723,13 +723,6 @@ impl BeaconChain { Ok(()) } - pub fn persist_data_availability_checker(&self) -> Result<(), Error> { - let _timer = metrics::start_timer(&metrics::PERSIST_DATA_AVAILABILITY_CHECKER); - self.data_availability_checker.persist_all()?; - - Ok(()) - } - /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. /// @@ -6753,7 +6746,6 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_head_and_fork_choice()?; self.persist_op_pool()?; - self.persist_data_availability_checker()?; self.persist_eth1_cache() }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index e0347d81c39..2431769ddb0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,7 +2,7 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; +use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner; use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slog::{debug, error, Logger}; @@ -33,12 +33,32 @@ pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(1024); pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(2); pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); -/// This includes a cache for any blocks or blobs that have been received over gossip or RPC -/// and are awaiting more 
components before they can be imported. Additionally the -/// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as -/// checking whether a "availability check" is required at all. +/// Cache to hold fully valid data that can't be imported to fork-choice yet. After Dencun hard-fork +/// blocks have a sidecar of data that is received separately from the network. We call the concept +/// of a block "becoming available" when all of its import dependencies are inserted into this +/// cache. +/// +/// Usually a block becomes available on its slot within a second of receiving its first component +/// over gossip. However, a block may never become available if a malicious proposer does not +/// publish its data, or there are network issues that prevent us from receiving it. If the block +/// does not become available after some time we can safely forget about it. Consider these two +/// cases: +/// +/// - Global unavailability: If nobody has received the block components it's likely that the +/// proposer never made the block available. So we can safely forget about the block as it will +/// never become available. +/// - Local unavailability: Some fraction of the network has received all block components, but not us. +/// Some of our peers will eventually attest to a descendant of that block and lookup sync will +/// fetch its components. Therefore it's not strictly necessary to hold on to the partially available +/// block for too long as we can recover from other peers. +/// +/// Even in periods of non-finality, the proposer is expected to publish the block's data +/// immediately. Because this cache only holds fully valid data, its capacity is bound to 1 block +/// per slot and fork: before inserting into this cache we check the proposer signature and correct +/// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having to re-fetch +/// data during moments of unstable network conditions. 
pub struct DataAvailabilityChecker { - availability_cache: Arc>, + availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Option>, log: Logger, @@ -74,7 +94,8 @@ impl DataAvailabilityChecker { log: &Logger, spec: ChainSpec, ) -> Result { - let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let overflow_cache = + DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; Ok(Self { availability_cache: Arc::new(overflow_cache), slot_clock, @@ -329,15 +350,9 @@ impl DataAvailabilityChecker { }) } - /// Persist all in memory components to disk - pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> { - self.availability_cache.write_all_to_disk() - } - /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { - num_store_entries: self.availability_cache.num_store_entries(), state_cache_size: self.availability_cache.state_cache_size(), block_cache_size: self.availability_cache.block_cache_size(), } @@ -346,7 +361,6 @@ impl DataAvailabilityChecker { /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { - pub num_store_entries: usize, pub state_cache_size: usize, pub block_cache_size: usize, } @@ -372,7 +386,7 @@ pub fn start_availability_cache_maintenance_service( async fn availability_cache_maintenance_service( chain: Arc>, - overflow_cache: Arc>, + overflow_cache: Arc>, ) { let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32; loop { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index adc1a1e202c..3c05eba5eae 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -1,32 +1,3 @@ -//! 
This module implements a LRU cache for storing partially available blocks and blobs. -//! When the cache overflows, the least recently used items are persisted to the database. -//! This prevents lighthouse from using too much memory storing unfinalized blocks and blobs -//! if the chain were to lose finality. -//! -//! ## Deadlock safety -//! -//! The main object in this module is the `OverflowLruCache`. It contains two locks: -//! -//! - `self.critical` is an `RwLock` that protects content stored in memory. -//! - `self.maintenance_lock` is held when moving data between memory and disk. -//! -//! You mostly need to ensure that you don't try to hold the critical lock more than once -//! -//! ## Basic Algorithm -//! -//! As blocks and blobs come in from the network, their components are stored in memory in -//! this cache. When a block becomes fully available, it is removed from the cache and -//! imported into fork-choice. Blocks/blobs that remain unavailable will linger in the -//! cache until they are older than the finalized epoch or older than the data availability -//! cutoff. In the event the chain is not finalizing, the cache will eventually overflow and -//! the least recently used items will be persisted to disk. When this happens, we will still -//! store the hash of the block in memory so we always know we have data for that block -//! without needing to check the database. -//! -//! When the client is shut down, all pending components are persisted in the database. -//! On startup, the keys of these components are stored in memory and will be loaded in -//! the cache when they are accessed. 
- use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache}; use crate::beacon_chain::BeaconStore; use crate::blob_verification::KzgVerifiedBlob; @@ -34,15 +5,13 @@ use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock, }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; -use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; -use ssz::{Decode, Encode}; +use parking_lot::RwLock; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; @@ -243,318 +212,27 @@ impl PendingComponents { AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), ))) } - - /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob. - pub fn epoch(&self) -> Option { - self.executed_block - .as_ref() - .map(|pending_block| pending_block.as_block().epoch()) - .or_else(|| { - for maybe_blob in self.verified_blobs.iter() { - if maybe_blob.is_some() { - return maybe_blob.as_ref().map(|kzg_verified_blob| { - kzg_verified_blob - .as_blob() - .slot() - .epoch(E::slots_per_epoch()) - }); - } - } - None - }) - } -} - -/// Blocks and blobs are stored in the database sequentially so that it's -/// fast to iterate over all the data for a particular block. 
-#[derive(Debug, PartialEq)] -enum OverflowKey { - Block(Hash256), - Blob(Hash256, u8), -} - -impl OverflowKey { - pub fn from_block_root(block_root: Hash256) -> Self { - Self::Block(block_root) - } - - pub fn from_blob_id( - blob_id: BlobIdentifier, - ) -> Result { - if blob_id.index > E::max_blobs_per_block() as u64 || blob_id.index > u8::MAX as u64 { - return Err(AvailabilityCheckError::BlobIndexInvalid(blob_id.index)); - } - Ok(Self::Blob(blob_id.block_root, blob_id.index as u8)) - } - - pub fn root(&self) -> &Hash256 { - match self { - Self::Block(root) => root, - Self::Blob(root, _) => root, - } - } -} - -/// A wrapper around BeaconStore that implements various -/// methods used for saving and retrieving blocks / blobs -/// from the store (for organization) -struct OverflowStore(BeaconStore); - -impl OverflowStore { - /// Store pending components in the database - pub fn persist_pending_components( - &self, - block_root: Hash256, - mut pending_components: PendingComponents, - ) -> Result<(), AvailabilityCheckError> { - let col = DBColumn::OverflowLRUCache; - - if let Some(block) = pending_components.executed_block.take() { - let key = OverflowKey::from_block_root(block_root); - self.0 - .hot_db - .put_bytes(col.as_str(), &key.as_ssz_bytes(), &block.as_ssz_bytes())? - } - - for blob in Vec::from(pending_components.verified_blobs) - .into_iter() - .flatten() - { - let key = OverflowKey::from_blob_id::(BlobIdentifier { - block_root, - index: blob.blob_index(), - })?; - - self.0 - .hot_db - .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? 
- } - - Ok(()) - } - - /// Load the pending components that we have in the database for a given block root - pub fn load_pending_components( - &self, - block_root: Hash256, - ) -> Result>, AvailabilityCheckError> { - // read everything from disk and reconstruct - let mut maybe_pending_components = None; - for res in self - .0 - .hot_db - .iter_raw_entries(DBColumn::OverflowLRUCache, block_root.as_bytes()) - { - let (key_bytes, value_bytes) = res?; - match OverflowKey::from_ssz_bytes(&key_bytes)? { - OverflowKey::Block(_) => { - maybe_pending_components - .get_or_insert_with(|| PendingComponents::empty(block_root)) - .executed_block = - Some(DietAvailabilityPendingExecutedBlock::from_ssz_bytes( - value_bytes.as_slice(), - )?); - } - OverflowKey::Blob(_, index) => { - *maybe_pending_components - .get_or_insert_with(|| PendingComponents::empty(block_root)) - .verified_blobs - .get_mut(index as usize) - .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = - Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); - } - } - } - - Ok(maybe_pending_components) - } - - /// Returns the hashes of all the blocks we have any data for on disk - pub fn read_keys_on_disk(&self) -> Result, AvailabilityCheckError> { - let mut disk_keys = HashSet::new(); - for res in self.0.hot_db.iter_raw_keys(DBColumn::OverflowLRUCache, &[]) { - let key_bytes = res?; - disk_keys.insert(*OverflowKey::from_ssz_bytes(&key_bytes)?.root()); - } - Ok(disk_keys) - } - - /// Load a single blob from the database - pub fn load_blob( - &self, - blob_id: &BlobIdentifier, - ) -> Result>>, AvailabilityCheckError> { - let key = OverflowKey::from_blob_id::(*blob_id)?; - - self.0 - .hot_db - .get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())? 
- .map(|blob_bytes| Arc::>::from_ssz_bytes(blob_bytes.as_slice())) - .transpose() - .map_err(|e| e.into()) - } - - /// Delete a set of keys from the database - pub fn delete_keys(&self, keys: &Vec) -> Result<(), AvailabilityCheckError> { - for key in keys { - self.0 - .hot_db - .key_delete(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())?; - } - Ok(()) - } -} - -/// This data stores the *critical* data that we need to keep in memory -/// protected by the RWLock -struct Critical { - /// This is the LRU cache of pending components - pub in_memory: LruCache>, - /// This holds all the roots of the blocks for which we have - /// `PendingComponents` in the database. - pub store_keys: HashSet, -} - -impl Critical { - pub fn new(capacity: NonZeroUsize) -> Self { - Self { - in_memory: LruCache::new(capacity), - store_keys: HashSet::new(), - } - } - - pub fn reload_store_keys( - &mut self, - overflow_store: &OverflowStore, - ) -> Result<(), AvailabilityCheckError> { - let disk_keys = overflow_store.read_keys_on_disk()?; - self.store_keys = disk_keys; - Ok(()) - } - - /// This only checks for the blobs in memory - pub fn peek_blob( - &self, - blob_id: &BlobIdentifier, - ) -> Result>>, AvailabilityCheckError> { - if let Some(pending_components) = self.in_memory.peek(&blob_id.block_root) { - Ok(pending_components - .verified_blobs - .get(blob_id.index as usize) - .ok_or(AvailabilityCheckError::BlobIndexInvalid(blob_id.index))? - .as_ref() - .map(|blob| blob.clone_blob())) - } else { - Ok(None) - } - } - - pub fn peek_pending_components( - &self, - block_root: &Hash256, - ) -> Option<&PendingComponents> { - self.in_memory.peek(block_root) - } - - /// Puts the pending components in the LRU cache. 
If the cache - /// is at capacity, the LRU entry is written to the store first - pub fn put_pending_components( - &mut self, - block_root: Hash256, - pending_components: PendingComponents, - overflow_store: &OverflowStore, - ) -> Result<(), AvailabilityCheckError> { - if self.in_memory.len() == self.in_memory.cap().get() { - // cache will overflow, must write lru entry to disk - if let Some((lru_key, lru_value)) = self.in_memory.pop_lru() { - overflow_store.persist_pending_components(lru_key, lru_value)?; - self.store_keys.insert(lru_key); - } - } - self.in_memory.put(block_root, pending_components); - Ok(()) - } - - /// Removes and returns the pending_components corresponding to - /// the `block_root` or `None` if it does not exist - pub fn pop_pending_components( - &mut self, - block_root: Hash256, - store: &OverflowStore, - ) -> Result>, AvailabilityCheckError> { - match self.in_memory.pop_entry(&block_root) { - Some((_, pending_components)) => Ok(Some(pending_components)), - None => { - // not in memory, is it in the store? - if self.store_keys.remove(&block_root) { - // We don't need to remove the data from the store as we have removed it from - // `store_keys` so we won't go looking for it on disk. The maintenance thread - // will remove it from disk the next time it runs. - store.load_pending_components(block_root) - } else { - Ok(None) - } - } - } - } - - /// Removes and returns the pending_components corresponding to - /// the `block_root` or `None` if it does not exist - pub fn remove_pending_components(&mut self, block_root: Hash256) { - match self.in_memory.pop_entry(&block_root) { - Some { .. } => {} - None => { - // not in memory, is it in the store? - // We don't need to remove the data from the store as we have removed it from - // `store_keys` so we won't go looking for it on disk. The maintenance thread - // will remove it from disk the next time it runs. 
- self.store_keys.remove(&block_root); - } - } - } - - /// Returns the number of pending component entries in memory. - pub fn num_blocks(&self) -> usize { - self.in_memory.len() - } - - /// Returns the number of entries that have overflowed to disk. - pub fn num_store_entries(&self) -> usize { - self.store_keys.len() - } } /// This is the main struct for this module. Outside methods should /// interact with the cache through this. -pub struct OverflowLRUCache { +pub struct DataAvailabilityCheckerInner { /// Contains all the data we keep in memory, protected by an RwLock - critical: RwLock>, - /// This is how we read and write components to the disk - overflow_store: OverflowStore, + critical: RwLock>>, /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, - /// Mutex to guard maintenance methods which move data between disk and memory - maintenance_lock: Mutex<()>, - /// The capacity of the LRU cache - capacity: NonZeroUsize, } -impl OverflowLRUCache { +impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, spec: ChainSpec, ) -> Result { - let overflow_store = OverflowStore(beacon_store.clone()); - let mut critical = Critical::new(capacity); - critical.reload_store_keys(&overflow_store)?; Ok(Self { - critical: RwLock::new(critical), - overflow_store, + critical: RwLock::new(LruCache::new(capacity)), state_cache: StateLRUCache::new(beacon_store, spec), - maintenance_lock: Mutex::new(()), - capacity, }) } @@ -565,7 +243,7 @@ impl OverflowLRUCache { ) -> Option>> { self.critical .read() - .peek_pending_components(block_root) + .peek(block_root) .and_then(|pending_components| { pending_components .executed_block @@ -579,12 +257,13 @@ impl OverflowLRUCache { &self, blob_id: &BlobIdentifier, ) -> Result>>, AvailabilityCheckError> { - let read_lock = self.critical.read(); - if let Some(blob) = 
read_lock.peek_blob(blob_id)? { - Ok(Some(blob)) - } else if read_lock.store_keys.contains(&blob_id.block_root) { - drop(read_lock); - self.overflow_store.load_blob(blob_id) + if let Some(pending_components) = self.critical.read().peek(&blob_id.block_root) { + Ok(pending_components + .verified_blobs + .get(blob_id.index as usize) + .ok_or(AvailabilityCheckError::BlobIndexInvalid(blob_id.index))? + .as_ref() + .map(|blob| blob.clone_blob())) } else { Ok(None) } @@ -595,7 +274,7 @@ impl OverflowLRUCache { block_root: &Hash256, f: F, ) -> R { - f(self.critical.read().peek_pending_components(block_root)) + f(self.critical.read().peek(block_root)) } pub fn put_kzg_verified_blobs>>( @@ -615,29 +294,22 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .pop_pending_components(block_root, &self.overflow_store)? + .pop_entry(&block_root) + .map(|(_, v)| v) .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); if pending_components.is_available() { - write_lock.put_pending_components( - block_root, - pending_components.clone(), - &self.overflow_store, - )?; + write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { - write_lock.put_pending_components( - block_root, - pending_components, - &self.overflow_store, - )?; + write_lock.put(block_root, pending_components); Ok(Availability::MissingComponents(block_root)) } } @@ -658,7 +330,8 @@ impl OverflowLRUCache { // Grab existing entry or create a new entry. let mut pending_components = write_lock - .pop_pending_components(block_root, &self.overflow_store)? + .pop_entry(&block_root) + .map(|(_, v)| v) .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the block. 
@@ -666,203 +339,29 @@ impl OverflowLRUCache { // Check if we have all components and entire set is consistent. if pending_components.is_available() { - write_lock.put_pending_components( - block_root, - pending_components.clone(), - &self.overflow_store, - )?; + write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { - write_lock.put_pending_components( - block_root, - pending_components, - &self.overflow_store, - )?; + write_lock.put(block_root, pending_components); Ok(Availability::MissingComponents(block_root)) } } pub fn remove_pending_components(&self, block_root: Hash256) { - self.critical.write().remove_pending_components(block_root); - } - - /// write all in memory objects to disk - pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> { - let maintenance_lock = self.maintenance_lock.lock(); - let mut critical_lock = self.critical.write(); - - let mut swap_lru = LruCache::new(self.capacity); - std::mem::swap(&mut swap_lru, &mut critical_lock.in_memory); - - for (root, pending_components) in swap_lru.into_iter() { - self.overflow_store - .persist_pending_components(root, pending_components)?; - critical_lock.store_keys.insert(root); - } - - drop(critical_lock); - drop(maintenance_lock); - Ok(()) + self.critical.write().pop_entry(&block_root); } /// maintain the cache pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { - // ensure memory usage is below threshold - let threshold = self.capacity.get() * 3 / 4; - self.maintain_threshold(threshold, cutoff_epoch)?; - // clean up any keys on the disk that shouldn't be there - self.prune_disk(cutoff_epoch)?; // clean up any lingering states in the state cache self.state_cache.do_maintenance(cutoff_epoch); Ok(()) } - /// Enforce that the size of the cache is below a given threshold by 
- /// moving the least recently used items to disk. - fn maintain_threshold( - &self, - threshold: usize, - cutoff_epoch: Epoch, - ) -> Result<(), AvailabilityCheckError> { - // ensure only one thread at a time can be deleting things from the disk or - // moving things between memory and storage - let maintenance_lock = self.maintenance_lock.lock(); - - let mut stored = self.critical.read().in_memory.len(); - while stored > threshold { - let read_lock = self.critical.upgradable_read(); - let lru_entry = read_lock - .in_memory - .peek_lru() - .map(|(key, value)| (*key, value.clone())); - - let Some((lru_root, lru_pending_components)) = lru_entry else { - break; - }; - - if lru_pending_components - .epoch() - .map(|epoch| epoch < cutoff_epoch) - .unwrap_or(true) - { - // this data is no longer needed -> delete it - let mut write_lock = RwLockUpgradableReadGuard::upgrade(read_lock); - write_lock.in_memory.pop_entry(&lru_root); - stored = write_lock.in_memory.len(); - continue; - } else { - drop(read_lock); - } - - // write the lru entry to disk (we aren't holding any critical locks while we do this) - self.overflow_store - .persist_pending_components(lru_root, lru_pending_components)?; - // now that we've written to disk, grab the critical write lock - let mut write_lock = self.critical.write(); - if let Some((new_lru_root_ref, _)) = write_lock.in_memory.peek_lru() { - // need to ensure the entry we just wrote to disk wasn't updated - // while we were writing and is still the LRU entry - if *new_lru_root_ref == lru_root { - // it is still LRU entry -> delete it from memory & record that it's on disk - write_lock.in_memory.pop_entry(&lru_root); - write_lock.store_keys.insert(lru_root); - } - } - stored = write_lock.in_memory.len(); - drop(write_lock); - } - - drop(maintenance_lock); - Ok(()) - } - - /// Delete any data on disk that shouldn't be there. This can happen if - /// 1. The entry has been moved back to memory (or become fully available) - /// 2. 
The entry belongs to a block beyond the cutoff epoch - fn prune_disk(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { - // ensure only one thread at a time can be deleting things from the disk or - // moving things between memory and storage - let maintenance_lock = self.maintenance_lock.lock(); - - struct BlockData { - keys: Vec, - root: Hash256, - epoch: Epoch, - } - - let delete_if_outdated = |cache: &OverflowLRUCache, - block_data: Option| - -> Result<(), AvailabilityCheckError> { - let Some(block_data) = block_data else { - return Ok(()); - }; - let not_in_store_keys = !cache.critical.read().store_keys.contains(&block_data.root); - if not_in_store_keys { - // these keys aren't supposed to be on disk - cache.overflow_store.delete_keys(&block_data.keys)?; - } else { - // check this data is still relevant - if block_data.epoch < cutoff_epoch { - // this data is no longer needed -> delete it - self.overflow_store.delete_keys(&block_data.keys)?; - } - } - Ok(()) - }; - - let mut current_block_data: Option = None; - for res in self - .overflow_store - .0 - .hot_db - .iter_raw_entries(DBColumn::OverflowLRUCache, &[]) - { - let (key_bytes, value_bytes) = res?; - let overflow_key = OverflowKey::from_ssz_bytes(&key_bytes)?; - let current_root = *overflow_key.root(); - - match &mut current_block_data { - Some(block_data) if block_data.root == current_root => { - // still dealing with the same block - block_data.keys.push(overflow_key); - } - _ => { - // first time encountering data for this block - delete_if_outdated(self, current_block_data)?; - let current_epoch = match &overflow_key { - OverflowKey::Block(_) => { - DietAvailabilityPendingExecutedBlock::::from_ssz_bytes( - value_bytes.as_slice(), - )? - .as_block() - .epoch() - } - OverflowKey::Blob(_, _) => { - KzgVerifiedBlob::::from_ssz_bytes(value_bytes.as_slice())? 
- .as_blob() - .slot() - .epoch(T::EthSpec::slots_per_epoch()) - } - }; - current_block_data = Some(BlockData { - keys: vec![overflow_key], - root: current_root, - epoch: current_epoch, - }); - } - } - } - // can't fall off the end - delete_if_outdated(self, current_block_data)?; - - drop(maintenance_lock); - Ok(()) - } - #[cfg(test)] /// get the state cache for inspection (used only for tests) pub fn state_lru_cache(&self) -> &StateLRUCache { @@ -876,74 +375,7 @@ impl OverflowLRUCache { /// Number of pending component entries in memory in the cache. pub fn block_cache_size(&self) -> usize { - self.critical.read().num_blocks() - } - - /// Returns the number of entries in the cache that have overflowed to disk. - pub fn num_store_entries(&self) -> usize { - self.critical.read().num_store_entries() - } -} - -impl ssz::Encode for OverflowKey { - fn is_ssz_fixed_len() -> bool { - true - } - - fn ssz_append(&self, buf: &mut Vec) { - match self { - OverflowKey::Block(block_hash) => { - block_hash.ssz_append(buf); - buf.push(0u8) - } - OverflowKey::Blob(block_hash, index) => { - block_hash.ssz_append(buf); - buf.push(*index + 1) - } - } - } - - fn ssz_fixed_len() -> usize { - ::ssz_fixed_len() + 1 - } - - fn ssz_bytes_len(&self) -> usize { - match self { - Self::Block(root) => root.ssz_bytes_len() + 1, - Self::Blob(root, _) => root.ssz_bytes_len() + 1, - } - } -} - -impl ssz::Decode for OverflowKey { - fn is_ssz_fixed_len() -> bool { - true - } - - fn ssz_fixed_len() -> usize { - ::ssz_fixed_len() + 1 - } - - fn from_ssz_bytes(bytes: &[u8]) -> Result { - let len = bytes.len(); - let h256_len = ::ssz_fixed_len(); - let expected = h256_len + 1; - - if len != expected { - Err(ssz::DecodeError::InvalidByteLength { len, expected }) - } else { - let root_bytes = bytes - .get(..h256_len) - .ok_or(ssz::DecodeError::OutOfBoundsByte { i: 0 })?; - let block_root = Hash256::from_ssz_bytes(root_bytes)?; - let id_byte = *bytes - .get(h256_len) - .ok_or(ssz::DecodeError::OutOfBoundsByte 
{ i: h256_len })?; - match id_byte { - 0 => Ok(OverflowKey::Block(block_root)), - n => Ok(OverflowKey::Blob(block_root, n - 1)), - } - } + self.critical.read().len() } } @@ -962,8 +394,7 @@ mod test { use logging::test_logger; use slog::{info, Logger}; use state_processing::ConsensusContext; - use std::collections::{BTreeMap, HashMap, VecDeque}; - use std::ops::AddAssign; + use std::collections::VecDeque; use store::{HotColdDB, ItemStore, LevelDB, StoreConfig}; use tempfile::{tempdir, TempDir}; use types::non_zero_usize::new_non_zero_usize; @@ -1047,39 +478,6 @@ mod test { harness } - #[test] - fn overflow_key_encode_decode_equality() { - type E = types::MainnetEthSpec; - let key_block = OverflowKey::Block(Hash256::random()); - let key_blob_0 = OverflowKey::from_blob_id::(BlobIdentifier { - block_root: Hash256::random(), - index: 0, - }) - .expect("should create overflow key 0"); - let key_blob_1 = OverflowKey::from_blob_id::(BlobIdentifier { - block_root: Hash256::random(), - index: 1, - }) - .expect("should create overflow key 1"); - let key_blob_2 = OverflowKey::from_blob_id::(BlobIdentifier { - block_root: Hash256::random(), - index: 2, - }) - .expect("should create overflow key 2"); - let key_blob_3 = OverflowKey::from_blob_id::(BlobIdentifier { - block_root: Hash256::random(), - index: 3, - }) - .expect("should create overflow key 3"); - - let keys = vec![key_block, key_blob_0, key_blob_1, key_blob_2, key_blob_3]; - for key in keys { - let encoded = key.as_ssz_bytes(); - let decoded = OverflowKey::from_ssz_bytes(&encoded).expect("should decode"); - assert_eq!(key, decoded, "Encoded and decoded keys should be equal"); - } - } - async fn availability_pending_block( harness: &BeaconChainHarness>, ) -> ( @@ -1176,7 +574,7 @@ mod test { capacity: usize, ) -> ( BeaconChainHarness>, - Arc>, + Arc>, TempDir, ) where @@ -1190,7 +588,7 @@ mod test { let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let cache = Arc::new( 
- OverflowLRUCache::::new(capacity_non_zero, test_store, spec.clone()) + DataAvailabilityCheckerInner::::new(capacity_non_zero, test_store, spec.clone()) .expect("should create cache"), ); (harness, cache, chain_db_path) @@ -1212,10 +610,7 @@ mod test { blobs_expected, "should have expected number of blobs" ); - assert!( - cache.critical.read().in_memory.is_empty(), - "cache should be empty" - ); + assert!(cache.critical.read().is_empty(), "cache should be empty"); let availability = cache .put_pending_executed_block(pending_block) .expect("should put block"); @@ -1225,14 +620,14 @@ mod test { "block doesn't have blobs, should be available" ); assert_eq!( - cache.critical.read().in_memory.len(), + cache.critical.read().len(), 1, "cache should still have block as it hasn't been imported yet" ); // remove the blob to simulate successful import cache.remove_pending_components(root); assert_eq!( - cache.critical.read().in_memory.len(), + cache.critical.read().len(), 0, "cache should be empty now that block has been imported" ); @@ -1242,12 +637,12 @@ mod test { "should be pending blobs" ); assert_eq!( - cache.critical.read().in_memory.len(), + cache.critical.read().len(), 1, "cache should have one block" ); assert!( - cache.critical.read().in_memory.peek(&root).is_some(), + cache.critical.read().peek(&root).is_some(), "newly inserted block should exist in memory" ); } @@ -1262,11 +657,11 @@ mod test { assert!(matches!(availability, Availability::Available(_))); } else { assert!(matches!(availability, Availability::MissingComponents(_))); - assert_eq!(cache.critical.read().in_memory.len(), 1); + assert_eq!(cache.critical.read().len(), 1); } } assert!( - cache.critical.read().in_memory.is_empty(), + cache.critical.read().is_empty(), "cache should be empty now that all components available" ); @@ -1289,7 +684,7 @@ mod test { Availability::MissingComponents(root), "should be pending block" ); - assert_eq!(cache.critical.read().in_memory.len(), 1); + 
assert_eq!(cache.critical.read().len(), 1); } let availability = cache .put_pending_executed_block(pending_block) @@ -1300,457 +695,17 @@ mod test { availability ); assert!( - cache.critical.read().in_memory.len() == 1, + cache.critical.read().len() == 1, "cache should still have available block until import" ); // remove the blob to simulate successful import cache.remove_pending_components(root); assert!( - cache.critical.read().in_memory.is_empty(), + cache.critical.read().is_empty(), "cache should be empty now that all components available" ); } - #[tokio::test] - async fn overflow_cache_test_overflow() { - type E = MinimalEthSpec; - type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let mut pending_blocks = VecDeque::new(); - let mut pending_blobs = VecDeque::new(); - let mut roots = VecDeque::new(); - while pending_blobs.len() < capacity + 1 { - let (pending_block, blobs) = availability_pending_block(&harness).await; - if pending_block.num_blobs_expected() == 0 { - // we need blocks with blobs - continue; - } - let root = pending_block.block.canonical_root(); - pending_blocks.push_back(pending_block); - pending_blobs.push_back(blobs); - roots.push_back(root); - } - - for i in 0..capacity { - cache - .put_pending_executed_block(pending_blocks.pop_front().expect("should have block")) - .expect("should put block"); - assert_eq!(cache.critical.read().in_memory.len(), i + 1); - } - for root in roots.iter().take(capacity) { - assert!(cache.critical.read().in_memory.peek(root).is_some()); - } - assert_eq!( - cache.critical.read().in_memory.len(), - capacity, - "cache should be full" - ); - // the first block should be the lru entry - assert_eq!( - *cache - .critical - .read() - .in_memory - .peek_lru() - .expect("should exist") - .0, - roots[0], - "first block should be lru" - ); - - cache - .put_pending_executed_block(pending_blocks.pop_front().expect("should have block")) - .expect("should 
put block"); - assert_eq!( - cache.critical.read().in_memory.len(), - capacity, - "cache should be full" - ); - assert!( - cache.critical.read().in_memory.peek(&roots[0]).is_none(), - "first block should be evicted" - ); - assert_eq!( - *cache - .critical - .read() - .in_memory - .peek_lru() - .expect("should exist") - .0, - roots[1], - "second block should be lru" - ); - - assert!(cache - .overflow_store - .load_pending_components(roots[0]) - .expect("should exist") - .is_some()); - - let threshold = capacity * 3 / 4; - cache - .maintain_threshold(threshold, Epoch::new(0)) - .expect("should maintain threshold"); - assert_eq!( - cache.critical.read().in_memory.len(), - threshold, - "cache should have been maintained" - ); - - let store_keys = cache - .overflow_store - .read_keys_on_disk() - .expect("should read keys"); - assert_eq!(store_keys.len(), 2); - assert!(store_keys.contains(&roots[0])); - assert!(store_keys.contains(&roots[1])); - assert!(cache.critical.read().store_keys.contains(&roots[0])); - assert!(cache.critical.read().store_keys.contains(&roots[1])); - - let blobs_0 = pending_blobs.pop_front().expect("should have blobs"); - let expected_blobs = blobs_0.len(); - let mut kzg_verified_blobs = vec![]; - for (blob_index, gossip_blob) in blobs_0.into_iter().enumerate() { - kzg_verified_blobs.push(gossip_blob.into_inner()); - let availability = cache - .put_kzg_verified_blobs(roots[0], kzg_verified_blobs.clone()) - .expect("should put blob"); - if blob_index == expected_blobs - 1 { - assert!(matches!(availability, Availability::Available(_))); - // remove the block from the cache to simulate import - cache.remove_pending_components(roots[0]); - } else { - // the first block should be brought back into memory - assert!( - cache.critical.read().in_memory.peek(&roots[0]).is_some(), - "first block should be in memory" - ); - assert!(matches!(availability, Availability::MissingComponents(_))); - } - } - assert_eq!( - cache.critical.read().in_memory.len(), - 
threshold, - "cache should no longer have the first block" - ); - cache.prune_disk(Epoch::new(0)).expect("should prune disk"); - assert!( - cache - .overflow_store - .load_pending_components(roots[1]) - .expect("no error") - .is_some(), - "second block should still be on disk" - ); - assert!( - cache - .overflow_store - .load_pending_components(roots[0]) - .expect("no error") - .is_none(), - "first block should not be on disk" - ); - } - - #[tokio::test] - async fn overflow_cache_test_maintenance() { - type E = MinimalEthSpec; - type T = DiskHarnessType; - let capacity = E::slots_per_epoch() as usize; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let n_epochs = 4; - let mut pending_blocks = VecDeque::new(); - let mut pending_blobs = VecDeque::new(); - let mut epoch_count = BTreeMap::new(); - while pending_blobs.len() < n_epochs * capacity { - let (pending_block, blobs) = availability_pending_block(&harness).await; - if pending_block.num_blobs_expected() == 0 { - // we need blocks with blobs - continue; - } - let epoch = pending_block - .block - .as_block() - .slot() - .epoch(E::slots_per_epoch()); - epoch_count.entry(epoch).or_insert_with(|| 0).add_assign(1); - - pending_blocks.push_back(pending_block); - pending_blobs.push_back(blobs); - } - - for _ in 0..(n_epochs * capacity) { - let pending_block = pending_blocks.pop_front().expect("should have block"); - let mut pending_block_blobs = pending_blobs.pop_front().expect("should have blobs"); - let block_root = pending_block.block.as_block().canonical_root(); - let expected_blobs = pending_block.num_blobs_expected(); - if expected_blobs > 1 { - // might as well add a blob too - let one_blob = pending_block_blobs - .pop() - .expect("should have at least one blob"); - let kzg_verified_blobs = vec![one_blob.into_inner()]; - // generate random boolean - let block_first = (rand::random::() % 2) == 0; - if block_first { - let availability = cache - 
.put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should have pending blobs" - ); - let availability = cache - .put_kzg_verified_blobs(block_root, kzg_verified_blobs) - .expect("should put blob"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "availabilty should be pending blobs: {:?}", - availability - ); - } else { - let availability = cache - .put_kzg_verified_blobs(block_root, kzg_verified_blobs) - .expect("should put blob"); - let root = pending_block.block.as_block().canonical_root(); - assert_eq!( - availability, - Availability::MissingComponents(root), - "should be pending block" - ); - let availability = cache - .put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should have pending blobs" - ); - } - } else { - let availability = cache - .put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should be pending blobs" - ); - } - } - - // now we should have a full cache spanning multiple epochs - // run the maintenance routine for increasing epochs and ensure that the cache is pruned - assert_eq!( - cache.critical.read().in_memory.len(), - capacity, - "cache memory should be full" - ); - let store_keys = cache - .overflow_store - .read_keys_on_disk() - .expect("should read keys"); - assert_eq!( - store_keys.len(), - capacity * (n_epochs - 1), - "cache disk should have the rest" - ); - let mut expected_length = n_epochs * capacity; - for (epoch, count) in epoch_count { - cache - .do_maintenance(epoch + 1) - .expect("should run maintenance"); - let disk_keys = cache - .overflow_store - .read_keys_on_disk() - .expect("should read keys") - .len(); - let mem_keys = cache.critical.read().in_memory.len(); - expected_length -= count; - info!( - 
harness.chain.log, - "EPOCH: {} DISK KEYS: {} MEM KEYS: {} TOTAL: {} EXPECTED: {}", - epoch, - disk_keys, - mem_keys, - (disk_keys + mem_keys), - std::cmp::max(expected_length, capacity * 3 / 4), - ); - assert_eq!( - (disk_keys + mem_keys), - std::cmp::max(expected_length, capacity * 3 / 4), - "cache should be pruned" - ); - } - } - - #[tokio::test] - async fn overflow_cache_test_persist_recover() { - type E = MinimalEthSpec; - type T = DiskHarnessType; - let capacity = E::slots_per_epoch() as usize; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let n_epochs = 4; - let mut pending_blocks = VecDeque::new(); - let mut pending_blobs = VecDeque::new(); - let mut epoch_count = BTreeMap::new(); - while pending_blobs.len() < n_epochs * capacity { - let (pending_block, blobs) = availability_pending_block(&harness).await; - if pending_block.num_blobs_expected() == 0 { - // we need blocks with blobs - continue; - } - let epoch = pending_block - .block - .as_block() - .slot() - .epoch(E::slots_per_epoch()); - epoch_count.entry(epoch).or_insert_with(|| 0).add_assign(1); - - pending_blocks.push_back(pending_block); - pending_blobs.push_back(blobs); - } - - let mut remaining_blobs = HashMap::new(); - for _ in 0..(n_epochs * capacity) { - let pending_block = pending_blocks.pop_front().expect("should have block"); - let mut pending_block_blobs = pending_blobs.pop_front().expect("should have blobs"); - let block_root = pending_block.block.as_block().canonical_root(); - let expected_blobs = pending_block.num_blobs_expected(); - if expected_blobs > 1 { - // might as well add a blob too - let one_blob = pending_block_blobs - .pop() - .expect("should have at least one blob"); - let kzg_verified_blobs = vec![one_blob.into_inner()]; - // generate random boolean - let block_first = (rand::random::() % 2) == 0; - if block_first { - let availability = cache - .put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - 
matches!(availability, Availability::MissingComponents(_)), - "should have pending blobs" - ); - let availability = cache - .put_kzg_verified_blobs(block_root, kzg_verified_blobs) - .expect("should put blob"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "availabilty should be pending blobs: {:?}", - availability - ); - } else { - let availability = cache - .put_kzg_verified_blobs(block_root, kzg_verified_blobs) - .expect("should put blob"); - let root = pending_block.block.as_block().canonical_root(); - assert_eq!( - availability, - Availability::MissingComponents(root), - "should be pending block" - ); - let availability = cache - .put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should have pending blobs" - ); - } - } else { - let availability = cache - .put_pending_executed_block(pending_block) - .expect("should put block"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should be pending blobs" - ); - } - remaining_blobs.insert(block_root, pending_block_blobs); - } - - // now we should have a full cache spanning multiple epochs - // cache should be at capacity - assert_eq!( - cache.critical.read().in_memory.len(), - capacity, - "cache memory should be full" - ); - // write all components to disk - cache.write_all_to_disk().expect("should write all to disk"); - // everything should be on disk now - assert_eq!( - cache - .overflow_store - .read_keys_on_disk() - .expect("should read keys") - .len(), - capacity * n_epochs, - "cache disk should have the rest" - ); - assert_eq!( - cache.critical.read().in_memory.len(), - 0, - "cache memory should be empty" - ); - assert_eq!( - cache.critical.read().store_keys.len(), - n_epochs * capacity, - "cache store should have the rest" - ); - drop(cache); - - // create a new cache with the same store - let recovered_cache = OverflowLRUCache::::new( - 
new_non_zero_usize(capacity), - harness.chain.store.clone(), - harness.chain.spec.clone(), - ) - .expect("should recover cache"); - // again, everything should be on disk - assert_eq!( - recovered_cache - .overflow_store - .read_keys_on_disk() - .expect("should read keys") - .len(), - capacity * n_epochs, - "cache disk should have the rest" - ); - assert_eq!( - recovered_cache.critical.read().in_memory.len(), - 0, - "cache memory should be empty" - ); - assert_eq!( - recovered_cache.critical.read().store_keys.len(), - n_epochs * capacity, - "cache store should have the rest" - ); - - // now lets insert the remaining blobs until the cache is empty - for (root, blobs) in remaining_blobs { - let additional_blobs = blobs.len(); - let mut kzg_verified_blobs = vec![]; - for (i, gossip_blob) in blobs.into_iter().enumerate() { - kzg_verified_blobs.push(gossip_blob.into_inner()); - let availability = recovered_cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone()) - .expect("should put blob"); - if i == additional_blobs - 1 { - assert!(matches!(availability, Availability::Available(_))) - } else { - assert!(matches!(availability, Availability::MissingComponents(_))); - } - } - } - } - #[tokio::test] // ensure the state cache keeps memory usage low and that it can properly recover states // THIS TEST CAN BE DELETED ONCE TREE STATES IS MERGED AND WE RIP OUT THE STATE CACHE @@ -1807,7 +762,6 @@ mod test { let diet_block = cache .critical .read() - .in_memory .peek(&block_root) .map(|pending_components| { pending_components @@ -1836,7 +790,7 @@ mod test { // reconstruct the pending block by replaying the block on the parent state let recovered_pending_block = cache .state_lru_cache() - .reconstruct_pending_executed_block(diet_block) + .recover_pending_executed_block(diet_block) .expect("should reconstruct pending block"); // assert the recovered state is the same as the original diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs 
b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 9775d54c024..e313ed06d2f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -114,38 +114,12 @@ impl StateLRUCache { &self, diet_executed_block: DietAvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { - let maybe_state = self.states.write().pop(&diet_executed_block.state_root); - if let Some(state) = maybe_state { - let block_root = diet_executed_block.block.canonical_root(); - Ok(AvailabilityPendingExecutedBlock { - block: diet_executed_block.block, - import_data: BlockImportData { - block_root, - state, - parent_block: diet_executed_block.parent_block, - parent_eth1_finalization_data: diet_executed_block - .parent_eth1_finalization_data, - confirmed_state_roots: diet_executed_block.confirmed_state_roots, - consensus_context: diet_executed_block - .consensus_context - .into_consensus_context(), - }, - payload_verification_outcome: diet_executed_block.payload_verification_outcome, - }) + let state = if let Some(state) = self.states.write().pop(&diet_executed_block.state_root) { + state } else { - self.reconstruct_pending_executed_block(diet_executed_block) - } - } - - /// Reconstruct the `AvailabilityPendingExecutedBlock` by loading the parent - /// state from disk and replaying the block. This function does NOT check the - /// LRU cache. - pub fn reconstruct_pending_executed_block( - &self, - diet_executed_block: DietAvailabilityPendingExecutedBlock, - ) -> Result, AvailabilityCheckError> { + self.reconstruct_state(&diet_executed_block)? 
+ }; let block_root = diet_executed_block.block.canonical_root(); - let state = self.reconstruct_state(&diet_executed_block)?; Ok(AvailabilityPendingExecutedBlock { block: diet_executed_block.block, import_data: BlockImportData { diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 4ceaf675cec..be8f46f7d1b 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -401,8 +401,6 @@ lazy_static! { try_create_histogram("beacon_persist_eth1_cache", "Time taken to persist the eth1 caches"); pub static ref PERSIST_FORK_CHOICE: Result = try_create_histogram("beacon_persist_fork_choice", "Time taken to persist the fork choice struct"); - pub static ref PERSIST_DATA_AVAILABILITY_CHECKER: Result = - try_create_histogram("beacon_persist_data_availability_checker", "Time taken to persist the data availability checker"); /* * Eth1 @@ -1213,10 +1211,6 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { &DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE, da_checker_metrics.state_cache_size, ); - set_gauge_by_usize( - &DATA_AVAILABILITY_OVERFLOW_STORE_CACHE_SIZE, - da_checker_metrics.num_store_entries, - ); if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size); diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 385da5b4fe1..40b667a7447 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -17,7 +17,7 @@ use slasher::{DatabaseBackendOverride, Slasher}; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use types::EthSpec; +use types::{ChainSpec, Epoch, EthSpec, ForkName}; /// A type-alias to the tighten the definition of a production-intended `Client`. 
pub type ProductionClient = @@ -78,6 +78,16 @@ impl ProductionBeaconNode { TimeoutRwLock::disable_timeouts() } + if let Err(misaligned_forks) = validator_fork_epochs(&spec) { + warn!( + log, + "Fork boundaries are not well aligned / multiples of 256"; + "info" => "This may cause issues as fork boundaries do not align with the \ + start of sync committee period.", + "misaligned_forks" => ?misaligned_forks, + ); + } + let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec.clone()) @@ -183,6 +193,28 @@ impl ProductionBeaconNode { } } +fn validator_fork_epochs(spec: &ChainSpec) -> Result<(), Vec<(ForkName, Epoch)>> { + // @dapplion: "We try to schedule forks such that the fork epoch is a multiple of 256, to keep + // historical vectors in the same fork. Indirectly that makes light client periods align with + // fork boundaries." + let sync_committee_period = spec.epochs_per_sync_committee_period; // 256 + let is_fork_boundary_misaligned = |epoch: Epoch| epoch % sync_committee_period != 0; + + let forks_with_misaligned_epochs = ForkName::list_all_fork_epochs(spec) + .iter() + .filter_map(|(fork, fork_epoch_opt)| { + fork_epoch_opt + .and_then(|epoch| is_fork_boundary_misaligned(epoch).then_some((*fork, epoch))) + }) + .collect::>(); + + if forks_with_misaligned_epochs.is_empty() { + Ok(()) + } else { + Err(forks_with_misaligned_epochs) + } +} + impl Deref for ProductionBeaconNode { type Target = ProductionClient; @@ -206,3 +238,23 @@ impl lighthouse_network::discv5::Executor for Discv5Executor { self.0.spawn(future, "discv5") } } + +#[cfg(test)] +mod test { + use super::*; + use types::MainnetEthSpec; + + #[test] + fn test_validator_fork_epoch_alignments() { + let mut spec = MainnetEthSpec::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(256)); + spec.deneb_fork_epoch = Some(Epoch::new(257)); + spec.electra_fork_epoch = None; + let result = 
validator_fork_epochs(&spec); + assert_eq!( + result, + Err(vec![(ForkName::Deneb, spec.deneb_fork_epoch.unwrap())]) + ); + } +} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 0247bea5541..0e0965670b5 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -289,7 +289,7 @@ impl DBColumn { /// This function returns the number of bytes used by keys in a given column. pub fn key_size(self) -> usize { match self { - Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl. + Self::OverflowLRUCache => 33, // DEPRECATED Self::BeaconMeta | Self::BeaconBlock | Self::BeaconState diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs index 96f206d4543..55a393e0188 100644 --- a/consensus/types/src/fork_name.rs +++ b/consensus/types/src/fork_name.rs @@ -31,6 +31,16 @@ impl ForkName { ] } + pub fn list_all_fork_epochs(spec: &ChainSpec) -> Vec<(ForkName, Option)> { + vec![ + (ForkName::Altair, spec.altair_fork_epoch), + (ForkName::Bellatrix, spec.bellatrix_fork_epoch), + (ForkName::Capella, spec.capella_fork_epoch), + (ForkName::Deneb, spec.deneb_fork_epoch), + (ForkName::Electra, spec.electra_fork_epoch), + ] + } + pub fn latest() -> ForkName { // This unwrap is safe as long as we have 1+ forks. It is tested below. *ForkName::list_all().last().unwrap()