Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update is_available check to support PeerDAS. #6076

Merged
merged 6 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
log: &Logger,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let overflow_cache =
DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
// TODO(das): support supernode or custom custody requirement
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
let custody_subnet_count = spec.custody_requirement as usize;
let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());

let overflow_cache = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
spec.clone(),
)?;
Ok(Self {
availability_cache: Arc::new(overflow_cache),
slot_clock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
UnableToDetermineImportRequirement,
Unexpected,
SszTypes(ssz_types::Error),
MissingBlobs,
Expand Down Expand Up @@ -41,6 +42,7 @@ impl Error {
| Error::Unexpected
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::UnableToDetermineImportRequirement
| Error::RebuildingStateCaches(_)
| Error::SlotClockError => ErrorCategory::Internal,
Error::Kzg(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
Expand All @@ -23,9 +24,15 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
/// Caches all availability components (block, blobs, data columns) received for a
/// single block root while data availability checking is in progress.
pub struct PendingComponents<E: EthSpec> {
/// Root of the block these components belong to.
pub block_root: Hash256,
/// Received KZG-verified blobs, indexed by blob index; `None` for blobs not yet seen.
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
/// Received KZG-verified custody data columns (post-PeerDAS).
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
/// The executed block, if it has been received; stored in "diet" form to save memory.
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

/// The data that must be present before a block is considered available for import.
pub enum BlockImportRequirement {
/// Pre-PeerDAS: every blob committed to by the block must have been received.
AllBlobs,
/// Post-PeerDAS: the given number of custody data columns must have been received.
CustodyColumns(usize),
}

impl<E: EthSpec> PendingComponents<E> {
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
Expand Down Expand Up @@ -78,6 +85,11 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}

/// Returns the number of data columns that have been received and are stored in the cache.
///
/// Unlike blobs, columns are stored in a plain `Vec`, so this is simply its length;
/// duplicates are presumably prevented at insertion time — TODO confirm against the
/// merge path (not visible in this chunk).
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
}

/// Inserts a block into the cache.
pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
*self.get_cached_block_mut() = Some(block)
Expand Down Expand Up @@ -134,15 +146,29 @@ impl<E: EthSpec> PendingComponents<E> {
self.merge_blobs(reinsert);
}

/// Checks if the block and all of its expected blobs are available in the cache.
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
/// available in the cache.
///
/// Returns `true` if both the block exists and the number of received blobs matches the number
/// of expected blobs.
pub fn is_available(&self) -> bool {
if let Some(num_expected_blobs) = self.num_expected_blobs() {
num_expected_blobs == self.num_received_blobs()
} else {
false
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool {
match block_import_requirement {
BlockImportRequirement::AllBlobs => {
if let Some(num_expected_blobs) = self.num_expected_blobs() {
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
num_expected_blobs == self.num_received_blobs()
} else {
false
}
}
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
if let Some(num_expected_blobs) = self.num_expected_blobs() {
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
// No data columns when there are 0 blobs
num_expected_blobs == 0 || *num_expected_columns == num_received_data_columns
} else {
false
}
}
}
}

Expand All @@ -151,6 +177,7 @@ impl<E: EthSpec> PendingComponents<E> {
Self {
block_root,
verified_blobs: FixedVector::default(),
verified_data_columns: vec![],
executed_block: None,
}
}
Expand All @@ -170,6 +197,7 @@ impl<E: EthSpec> PendingComponents<E> {
let Self {
block_root,
verified_blobs,
verified_data_columns: _,
executed_block,
} = self;

Expand Down Expand Up @@ -212,6 +240,35 @@ impl<E: EthSpec> PendingComponents<E> {
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
)))
}

/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
pub fn epoch(&self) -> Option<Epoch> {
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
self.executed_block
.as_ref()
.map(|pending_block| pending_block.as_block().epoch())
.or_else(|| {
for maybe_blob in self.verified_blobs.iter() {
if maybe_blob.is_some() {
return maybe_blob.as_ref().map(|kzg_verified_blob| {
kzg_verified_blob
.as_blob()
.slot()
.epoch(E::slots_per_epoch())
});
}
}

if let Some(kzg_verified_data_column) = self.verified_data_columns.first() {
let epoch = kzg_verified_data_column
.as_data_column()
.slot()
.epoch(E::slots_per_epoch());
return Some(epoch);
}

None
})
}
}

/// This is the main struct for this module. Outside methods should
Expand All @@ -222,17 +279,23 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is necessary until we merge tree-states
state_cache: StateLRUCache<T>,
/// The number of data columns the node is custodying.
custody_column_count: usize,
spec: ChainSpec,
}

impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
/// Creates a new availability cache with the given LRU `capacity`.
///
/// `beacon_store` backs the internal state cache used to reconstruct states from disk;
/// `custody_column_count` is the number of data columns this node must custody per
/// block once PeerDAS is active.
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: usize,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
critical: RwLock::new(LruCache::new(capacity)),
// `spec` is cloned because it is also stored on `Self` below.
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
custody_column_count,
spec,
})
}

Expand Down Expand Up @@ -277,6 +340,24 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
f(self.critical.read().peek(block_root))
}

/// Determines which availability rule applies to the given pending components:
/// all blobs (pre-PeerDAS) or this node's custody columns (post-PeerDAS), based
/// on the epoch of whatever component has been cached so far.
fn block_import_requirement(
&self,
pending_components: &PendingComponents<T::EthSpec>,
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
// Without at least one cached component we cannot know the epoch, and
// therefore cannot know which fork's import rules apply.
let epoch = match pending_components.epoch() {
Some(epoch) => epoch,
None => return Err(AvailabilityCheckError::UnableToDetermineImportRequirement),
};

let requirement = if self.spec.is_peer_das_enabled_for_epoch(epoch) {
BlockImportRequirement::CustodyColumns(self.custody_column_count)
} else {
BlockImportRequirement::AllBlobs
};
Ok(requirement)
}

pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
Expand All @@ -301,7 +382,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);

if pending_components.is_available() {
let block_import_requirement = self.block_import_requirement(&pending_components)?;
if pending_components.is_available(&block_import_requirement) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
Expand Down Expand Up @@ -338,7 +420,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pending_components.merge_block(diet_executed_block);

// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
let block_import_requirement = self.block_import_requirement(&pending_components)?;
if pending_components.is_available(&block_import_requirement) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
Expand Down Expand Up @@ -401,6 +484,7 @@ mod test {
use types::{ExecPayload, MinimalEthSpec};

const LOW_VALIDATOR_COUNT: usize = 32;
const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8;

fn get_store_with_spec<E: EthSpec>(
db_path: &TempDir,
Expand Down Expand Up @@ -588,8 +672,13 @@ mod test {
let test_store = harness.chain.store.clone();
let capacity_non_zero = new_non_zero_usize(capacity);
let cache = Arc::new(
DataAvailabilityCheckerInner::<T>::new(capacity_non_zero, test_store, spec.clone())
.expect("should create cache"),
DataAvailabilityCheckerInner::<T>::new(
capacity_non_zero,
test_store,
DEFAULT_TEST_CUSTODY_COLUMN_COUNT,
spec.clone(),
)
.expect("should create cache"),
);
(harness, cache, chain_db_path)
}
Expand Down
19 changes: 19 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use derivative::Derivative;
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use types::data_column_sidecar::DataColumnSidecar;
use types::EthSpec;

/// Data column that we must custody and has completed kzg verification
///
/// The inner sidecar is private, so values can only be obtained through whatever
/// verification path constructs this type (no constructor is visible in this chunk).
/// SSZ `transparent` encoding makes the wrapper serialize exactly like the inner sidecar.
#[derive(Debug, Derivative, Clone, Encode, Decode)]
#[derivative(PartialEq, Eq)]
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
// The verified data column sidecar; `Arc` makes clones cheap (refcount bump only).
data: Arc<DataColumnSidecar<E>>,
}

impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
/// Returns a reference to the underlying data column sidecar.
pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
&self.data
}
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod canonical_head;
pub mod capella_readiness;
pub mod chain_config;
pub mod data_availability_checker;
mod data_column_verification;
pub mod deneb_readiness;
mod early_attester_cache;
pub mod electra_readiness;
Expand Down
32 changes: 32 additions & 0 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ pub struct ChainSpec {
/*
* DAS params
*/
pub eip7594_fork_epoch: Option<Epoch>,
pub custody_requirement: u64,
pub data_column_sidecar_subnet_count: u64,
pub number_of_columns: usize,

/*
Expand Down Expand Up @@ -392,6 +395,13 @@ impl ChainSpec {
}
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
///
/// Always returns `false` when no EIP-7594 (PeerDAS) fork epoch is scheduled.
pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
match self.eip7594_fork_epoch {
Some(fork_epoch) => block_epoch >= fork_epoch,
None => false,
}
}

/// For a given `BeaconState`, return the whistleblower reward quotient associated with its variant.
pub fn whistleblower_reward_quotient_for_state<E: EthSpec>(
&self,
Expand Down Expand Up @@ -587,6 +597,12 @@ impl ChainSpec {
}
}

/// Returns the number of data columns assigned to each data column sidecar subnet,
/// i.e. the total column count divided evenly across the subnets.
pub fn data_columns_per_subnet(&self) -> usize {
let subnet_count = self.data_column_sidecar_subnet_count as usize;
// Division is checked; a zero subnet count is a spec misconfiguration.
self.number_of_columns
.safe_div(subnet_count)
.expect("Subnet count must be greater than 0")
}

/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
pub fn mainnet() -> Self {
Self {
Expand Down Expand Up @@ -777,6 +793,12 @@ impl ChainSpec {
})
.expect("calculation does not overflow"),

/*
* DAS params
*/
eip7594_fork_epoch: None,
custody_requirement: 1,
data_column_sidecar_subnet_count: 32,
number_of_columns: 128,

/*
Expand Down Expand Up @@ -880,6 +902,10 @@ impl ChainSpec {
electra_fork_epoch: None,
max_pending_partials_per_withdrawals_sweep: u64::checked_pow(2, 0)
.expect("pow does not overflow"),
/*
* DAS params
*/
eip7594_fork_epoch: None,
// Other
network_id: 2, // lighthouse testnet network id
deposit_chain_id: 5,
Expand Down Expand Up @@ -1081,6 +1107,12 @@ impl ChainSpec {
})
.expect("calculation does not overflow"),

/*
* DAS params
*/
eip7594_fork_epoch: None,
custody_requirement: 1,
data_column_sidecar_subnet_count: 32,
number_of_columns: 128,

/*
Expand Down
Loading