Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move processing cache out of DA #5420

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use std::collections::HashMap;
Expand Down Expand Up @@ -410,8 +410,13 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.data_availability_checker
.get_block(&root)
.reqresp_pre_import_cache
.read()
.get(&root)
.map(|block| {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
block.clone()
})
.or(self.beacon_chain.early_attester_cache.get_block(root))
} else {
None
Expand Down
18 changes: 10 additions & 8 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ use store::{
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::payload::BlockProductionVersion;
use types::*;
Expand Down Expand Up @@ -359,6 +358,9 @@ pub type BeaconStore<T> = Arc<
>,
>;

/// Cache gossip verified blocks to serve over ReqResp before they are imported
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
Expand Down Expand Up @@ -461,6 +463,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// Cache gossip verified blocks to serve over ReqResp before they are imported
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
Expand Down Expand Up @@ -2891,8 +2895,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_gossip_blob(blob.slot(), block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -2925,8 +2927,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
Expand All @@ -2943,7 +2943,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.data_availability_checker.remove_notified(block_root);
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}
Expand All @@ -2956,8 +2956,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.data_availability_checker
.notify_block(block_root, unverified_block.block_cloned());
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());

let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ where
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
light_client_server_cache: LightClientServerCache::new(),
light_client_server_tx: self.light_client_server_tx,
shutdown_sender: self
Expand Down
108 changes: 16 additions & 92 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ pub use crate::data_availability_checker::availability_view::{
};
pub use crate::data_availability_checker::child_components::ChildComponents;
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
use crate::data_availability_checker::processing_cache::ProcessingCache;
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use parking_lot::RwLock;
pub use processing_cache::ProcessingComponents;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
Expand All @@ -20,15 +17,13 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};

mod availability_view;
mod child_components;
mod error;
mod overflow_lru_cache;
mod processing_cache;
mod state_lru_cache;

pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
Expand All @@ -49,7 +44,6 @@ pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
/// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as
/// checking whether a "availability check" is required at all.
pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
processing_cache: RwLock<ProcessingCache<T::EthSpec>>,
availability_cache: Arc<OverflowLRUCache<T>>,
slot_clock: T::SlotClock,
kzg: Option<Arc<Kzg>>,
Expand Down Expand Up @@ -88,7 +82,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Result<Self, AvailabilityCheckError> {
let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
Ok(Self {
processing_cache: <_>::default(),
availability_cache: Arc::new(overflow_cache),
slot_clock,
log: log.clone(),
Expand All @@ -97,17 +90,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the given block root is cached.
/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.processing_cache.read().has_block(block_root)
self.availability_cache.has_block(block_root)
}

/// Get the processing info for a block.
pub fn get_processing_components(
&self,
block_root: Hash256,
) -> Option<ProcessingComponents<T::EthSpec>> {
self.processing_cache.read().get(&block_root).cloned()
pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs {
self.availability_cache
.with_pending_components(&block_root, |pending_components| {
self.get_missing_blob_ids(block_root, pending_components)
})
}

/// A `None` indicates blobs are not required.
Expand All @@ -117,7 +110,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn get_missing_blob_ids<V: AvailabilityView<T::EthSpec>>(
&self,
block_root: Hash256,
availability_view: &V,
availability_view: Option<&V>,
) -> MissingBlobs {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
Expand All @@ -130,6 +123,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());

if self.da_check_required_for_epoch(current_epoch) {
let Some(availability_view) = availability_view else {
return MissingBlobs::PossibleMissing(BlobIdentifier::get_all_blob_ids::<E>(
block_root,
));
};

match availability_view.get_cached_block() {
Some(cached_block) => {
let block_commitments = cached_block.get_commitments();
Expand Down Expand Up @@ -192,14 +191,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a block from the availability cache. Includes any blocks we are currently processing.
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.processing_cache
.read()
.get(block_root)
.and_then(|cached| cached.block.clone())
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -352,71 +343,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
}

/// Adds a block to the processing cache. This block's commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
/// our blob download requirements. We will also serve this over RPC.
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
let slot = block.slot();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.merge_block(block);
}

/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_gossip_blob(
&self,
slot: Slot,
block_root: Hash256,
blob: &GossipVerifiedBlob<T>,
) {
let index = blob.index();
let commitment = blob.kzg_commitment();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.merge_single_blob(index as usize, commitment);
}

/// Adds blob commitments to the processing cache. These commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_rpc_blobs(
&self,
slot: Slot,
block_root: Hash256,
blobs: &FixedBlobSidecarList<T::EthSpec>,
) {
let mut commitments = KzgCommitmentOpts::<T::EthSpec>::default();
for blob in blobs.iter().flatten() {
if let Some(commitment) = commitments.get_mut(blob.index as usize) {
*commitment = Some(blob.kzg_commitment);
}
}
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.merge_blobs(commitments);
}

/// Clears the block and all blobs from the processing cache for a give root if they exist.
pub fn remove_notified(&self, block_root: &Hash256) {
self.processing_cache.write().remove(block_root)
}

/// Gather all block roots for which we are not currently processing all components for the
/// given slot.
pub fn incomplete_processing_components(&self, slot: Slot) -> Vec<Hash256> {
self.processing_cache
.read()
.incomplete_processing_components(slot)
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down Expand Up @@ -458,7 +384,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Collects metrics from the data availability checker.
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
DataAvailabilityCheckerMetrics {
processing_cache_size: self.processing_cache.read().len(),
num_store_entries: self.availability_cache.num_store_entries(),
state_cache_size: self.availability_cache.state_cache_size(),
block_cache_size: self.availability_cache.block_cache_size(),
Expand All @@ -468,7 +393,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Helper struct to group data availability checker metrics.
pub struct DataAvailabilityCheckerMetrics {
pub processing_cache_size: usize,
pub num_store_entries: usize,
pub state_cache_size: usize,
pub block_cache_size: usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use super::state_lru_cache::DietAvailabilityPendingExecutedBlock;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::AsBlock;
use crate::data_availability_checker::overflow_lru_cache::PendingComponents;
use crate::data_availability_checker::ProcessingComponents;
use kzg::KzgCommitment;
use ssz_types::FixedVector;
use std::sync::Arc;
Expand Down Expand Up @@ -180,14 +179,6 @@ macro_rules! impl_availability_view {
};
}

impl_availability_view!(
ProcessingComponents,
Arc<SignedBeaconBlock<E>>,
KzgCommitment,
block,
blob_commitments
);

impl_availability_view!(
PendingComponents,
DietAvailabilityPendingExecutedBlock<E>,
Expand Down Expand Up @@ -303,32 +294,6 @@ pub mod tests {
(block, blobs, invalid_blobs)
}

type ProcessingViewSetup<E> = (
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
);

pub fn setup_processing_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ProcessingViewSetup<E> {
let blobs = FixedVector::from(
valid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
let invalid_blobs = FixedVector::from(
invalid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
(Arc::new(block), blobs, invalid_blobs)
}

type PendingComponentsSetup<E> = (
DietAvailabilityPendingExecutedBlock<E>,
FixedVector<Option<KzgVerifiedBlob<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
Expand Down Expand Up @@ -517,13 +482,6 @@ pub mod tests {
};
}

generate_tests!(
processing_components_tests,
ProcessingComponents::<E>,
kzg_commitments,
processing_blobs,
setup_processing_components
);
generate_tests!(
pending_components_tests,
PendingComponents<E>,
Expand Down
Loading