Revert reconstruction behaviour to always go ahead rather than allowing one at a time. Address other review comments.

jimmygchen committed Jun 28, 2024
1 parent 1e3964e commit 6ac055d
Showing 3 changed files with 28 additions and 27 deletions.
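
The gist of the behavioural change, before the diffs: the cache previously kept a single `reconstructing_block_root` slot, so only one block could be reconstructed at a time; this commit moves the gate to a per-block `reconstruction_started` flag on `PendingComponents`, so reconstruction always goes ahead (at most once per block). A minimal sketch of the two gating strategies, using simplified stand-in types rather than the actual Lighthouse code:

use std::sync::Mutex;

type Hash256 = [u8; 32];

// Before: one cache-wide slot; while any block is reconstructing,
// every other block's reconstruction is skipped.
struct CacheBefore {
    reconstructing_block_root: Mutex<Option<Hash256>>,
}

impl CacheBefore {
    fn try_start_reconstruction(&self, root: Hash256) -> bool {
        let mut slot = self.reconstructing_block_root.lock().unwrap();
        match *slot {
            None => {
                *slot = Some(root);
                true
            }
            Some(_) => false, // another block holds the slot; back off
        }
    }
}

// After: the flag lives on each block's pending components, so
// reconstruction always goes ahead, at most once per block.
struct PendingComponentsAfter {
    reconstruction_started: bool,
}

impl PendingComponentsAfter {
    fn try_start_reconstruction(&mut self) -> bool {
        if self.reconstruction_started {
            false // already started for this block
        } else {
            self.reconstruction_started = true;
            true
        }
    }
}

fn main() {
    let cache = CacheBefore {
        reconstructing_block_root: Mutex::new(None),
    };
    assert!(cache.try_start_reconstruction([1; 32]));
    // Old behaviour: a second block cannot start while the first is running.
    assert!(!cache.try_start_reconstruction([2; 32]));

    let mut a = PendingComponentsAfter { reconstruction_started: false };
    let mut b = PendingComponentsAfter { reconstruction_started: false };
    // New behaviour: both blocks start; each only once.
    assert!(a.try_start_reconstruction());
    assert!(b.try_start_reconstruction());
    assert!(!a.try_start_reconstruction());
}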
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -600,6 +600,10 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
             (Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
             (_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
                 block,
+                // TODO(das): This is an ugly hack that should be removed. After updating
+                // store types to handle custody data columns this should not be required.
+                // It's okay-ish because available blocks must have all the required custody
+                // columns.
                 data_columns
                     .into_iter()
                     .map(CustodyDataColumn::from_asserted_custody)
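
Regarding the TODO above: an `AvailableBlock` has, by definition, passed the availability check with all required custody columns, so re-wrapping its plain data columns as custody columns without re-verification is tolerable. A hedged sketch of what a constructor like `from_asserted_custody` plausibly looks like (simplified types for illustration; the real definition lives in Lighthouse's DAS types):

/// Simplified stand-in for a verified data column sidecar.
pub struct DataColumn {
    pub index: u64,
}

/// Wrapper type encoding "this column is one the node custodies".
pub struct CustodyDataColumn {
    column: DataColumn,
}

impl CustodyDataColumn {
    /// Builds the wrapper without checking custody: the *caller* asserts it.
    /// Acceptable for `AvailableBlock`, which already holds all required
    /// custody columns by construction.
    pub fn from_asserted_custody(column: DataColumn) -> Self {
        Self { column }
    }

    pub fn index(&self) -> u64 {
        self.column.index
    }
}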
19 changes: 12 additions & 7 deletions beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -64,6 +64,7 @@ pub struct PendingComponents<E: EthSpec> {
     pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
     pub verified_data_columns: RuntimeVariableList<KzgVerifiedCustodyDataColumn<E>>,
     pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
+    pub reconstruction_started: bool,
 }

 pub enum BlockImportRequirement {
@@ -277,6 +278,7 @@ impl<E: EthSpec> PendingComponents<E> {
             verified_blobs: FixedVector::default(),
             verified_data_columns: RuntimeVariableList::empty(spec.number_of_columns),
             executed_block: None,
+            reconstruction_started: false,
         }
     }

@@ -357,6 +359,11 @@ impl<E: EthSpec> PendingComponents<E> {
         )))
     }

+    /// Mark reconstruction as started for this `PendingComponents`.
+    pub fn reconstruction_started(&mut self) {
+        self.reconstruction_started = true;
+    }
+
     /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
     pub fn epoch(&self) -> Option<Epoch> {
         self.executed_block
@@ -744,8 +751,6 @@ pub struct OverflowLRUCache<T: BeaconChainTypes> {
     capacity: NonZeroUsize,
     /// The number of data columns the node is custodying.
     custody_column_count: usize,
-    /// The block root of data columns currently being reconstructed, if any.
-    reconstructing_block_root: Mutex<Option<Hash256>>,
     log: Logger,
     spec: Arc<ChainSpec>,
 }
@@ -768,7 +773,6 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
             maintenance_lock: Mutex::new(()),
             capacity,
             custody_column_count,
-            reconstructing_block_root: Mutex::new(None),
             log,
             spec,
         })
@@ -858,7 +862,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
         AvailabilityCheckError,
     > {
         // Clone the pending components, so we don't hold the read lock during reconstruction
-        let Some(pending_components) = self
+        let Some(mut pending_components) = self
             .peek_pending_components(&block_root, |pending_components_opt| {
                 pending_components_opt.cloned()
             })
@@ -872,7 +876,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
             .map(|r| self.should_reconstruct(&r, &pending_components))?;

         if should_reconstruct {
-            *self.reconstructing_block_root.lock() = Some(block_root);
+            pending_components.reconstruction_started();

             let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME);

@@ -898,6 +902,8 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
             })
             else {
                 // If block is already imported (no longer in cache), abort publishing data columns
+                // TODO(das) This assumes only supernodes do reconstructions (i.e. custody
+                // requirement = all columns). This behaviour is likely to change in the future.
                 return Ok(None);
             };

@@ -988,8 +994,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
         let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns;

         has_missing_columns
-            // for simplicity now, we only reconstruct columns for one block at a time.
-            && self.reconstructing_block_root.lock().is_none()
+            && !pending_components.reconstruction_started
             && *num_expected_columns == num_of_columns
             && pending_components.verified_data_columns.len() >= num_of_columns / 2
     }
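
To make the `should_reconstruct` condition above concrete (a hypothetical walk-through; the 128-column count is the PeerDAS spec value, assumed here): a node that custodies all 128 columns starts reconstruction once it holds at least 64 verified columns — the 50% an erasure-coded extension needs — still has some missing, and has not already started for this block. A test-style sketch of the same predicate over plain integers:

/// Sketch of the `should_reconstruct` predicate, with plain values
/// standing in for Lighthouse's types (assumed NUMBER_OF_COLUMNS = 128).
fn should_reconstruct(
    num_expected_columns: usize, // columns this node must custody
    num_of_columns: usize,       // total columns in the spec
    verified: usize,             // verified columns currently in the cache
    reconstruction_started: bool,
) -> bool {
    let has_missing_columns = verified < num_of_columns;
    has_missing_columns
        && !reconstruction_started
        && num_expected_columns == num_of_columns // supernode custodies all columns
        && verified >= num_of_columns / 2 // KZG reconstruction needs >= 50%
}

fn main() {
    assert!(should_reconstruct(128, 128, 64, false)); // half present: go
    assert!(!should_reconstruct(128, 128, 63, false)); // below half: wait
    assert!(!should_reconstruct(128, 128, 64, true)); // already started
    assert!(!should_reconstruct(64, 128, 64, false)); // not a supernode
    assert!(!should_reconstruct(128, 128, 128, false)); // nothing missing
}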
32 changes: 12 additions & 20 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -778,25 +778,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         });
     }

-    /// Re-seed the network with reconstructed data columns.
-    fn handle_data_columns_to_publish(
-        &self,
-        data_columns_to_publish: DataColumnSidecarVec<T::EthSpec>,
-    ) {
-        self.send_network_message(NetworkMessage::Publish {
-            messages: data_columns_to_publish
-                .iter()
-                .map(|d| {
-                    let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
-                        d.index as usize,
-                        &self.chain.spec,
-                    );
-                    PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone())))
-                })
-                .collect(),
-        });
-    }
-
     async fn attempt_data_column_reconstruction(&self, block_root: Hash256) {
         let result = self.chain.reconstruct_data_columns(block_root).await;
         match result {
@@ -822,7 +803,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     }
                 }

-                self.handle_data_columns_to_publish(data_columns_to_publish);
+                self.send_network_message(NetworkMessage::Publish {
+                    messages: data_columns_to_publish
+                        .iter()
+                        .map(|d| {
+                            let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
+                                d.index as usize,
+                                &self.chain.spec,
+                            );
+                            PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone())))
+                        })
+                        .collect(),
+                });
             }
             Ok(None) => {
                 debug!(
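
One note on the publish path above: each reconstructed column is routed to a gossip subnet derived from its column index via `DataColumnSubnetId::from_column_index`. In the PeerDAS spec this mapping is a plain modulo over the subnet count; a sketch under that assumption (the subnet-count constant here is illustrative, not Lighthouse's):

/// Sketch of the column-index → subnet mapping used when publishing
/// (per the PeerDAS spec: subnet_id = column_index % subnet_count).
/// The constant below is an assumption for illustration.
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 32;

fn subnet_for_column(column_index: u64) -> u64 {
    column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT
}

fn main() {
    // Columns 0 and 32 land on the same subnet when there are 32 subnets.
    assert_eq!(subnet_for_column(0), subnet_for_column(32));
    assert_eq!(subnet_for_column(5), 5);
}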
