Manual finalization endpoint new pruning #7060

Closed
40 commits
be105d1 Drop head tracker for summaries dag (dapplion, Dec 28, 2024)
8c15bab Improve state summary dag compute logic (dapplion, Jan 10, 2025)
8c9a1b2 Implement db schema upgrade and downgrade (dapplion, Jan 20, 2025)
10bbb2e Log about multiple roots in dag tree (dapplion, Feb 3, 2025)
28d7b74 Add states descendants_of (dapplion, Feb 3, 2025)
c5b4293 Prune descendants of finalized checkpoint not finalized block (dapplion, Feb 3, 2025)
066f96a Prevent very long log line (dapplion, Feb 3, 2025)
663dfd3 Update tests (dapplion, Feb 3, 2025)
979e43a Tweak logs (michaelsproul, Feb 3, 2025)
e56299c Annotate SummariesDagError error (dapplion, Feb 3, 2025)
91eab38 Deprecate block DAG for pruning (dapplion, Feb 3, 2025)
12fa5a8 Remove some persisted head stuff (michaelsproul, Feb 3, 2025)
ed97b97 Use slot clock in heads (dapplion, Feb 4, 2025)
b9d8ae7 Fix nodes_without_children (dapplion, Feb 4, 2025)
d5a03c9 Add more range sync tests (#6872) (dapplion, Feb 10, 2025)
ec2fe38 Merge remote-tracking branch 'origin/release-v7.0.0-beta.0' into unst… (michaelsproul, Feb 11, 2025)
3992d6b Fix misc PeerDAS todos (#6862) (dapplion, Feb 11, 2025)
d603881 Add PeerDAS metrics to track subnets without peers (#6928) (dapplion, Feb 11, 2025)
431dd7c Remove un-used batch sync error condition (#6917) (dapplion, Feb 11, 2025)
0055af5 Unsubscribe blob topics at Fulu fork (#6932) (dapplion, Feb 11, 2025)
6ab6eae Merge remote-tracking branch 'origin/release-v7.0.0-beta.0' into unst… (michaelsproul, Feb 13, 2025)
7033656 Fix compilation and remove error from `heads` (michaelsproul, Feb 14, 2025)
1dc6d5e Merge remote-tracking branch 'origin/unstable' into drop-headtracker (michaelsproul, Feb 14, 2025)
f6786eb Tidy and document `migrate_database`. (michaelsproul, Feb 14, 2025)
37be9ae Tweaks in `prune_hot_db`. (michaelsproul, Feb 14, 2025)
cf3b776 Correct assert in `revert_minority_fork_on_resume` (michaelsproul, Feb 18, 2025)
54010b0 Update consensus/proto_array/src/proto_array.rs (dapplion, Feb 19, 2025)
7abbaeb Use descent from finality instead of viability (michaelsproul, Feb 20, 2025)
5cc266c Clean up DB migrations (michaelsproul, Feb 20, 2025)
abb3c3f Prevent deletion of payloads >= split slot (michaelsproul, Feb 20, 2025)
2cb71e2 Load block roots from fork choice where possible to avoid loading sta… (jimmygchen, Feb 28, 2025)
bd093d9 Check if the start slot is newer than finalization (`start_slot >= fi… (jimmygchen, Feb 28, 2025)
b03c18d force finalization endpoint (eserilev, Feb 28, 2025)
9057a88 cleanup (eserilev, Feb 28, 2025)
d43553c Merge branch 'holesky-rescue' into manual-finalization-endpoint (michaelsproul, Feb 28, 2025)
b567cc4 Remove ds store (eserilev, Feb 28, 2025)
f397b87 Merge branch 'manual-finalization-endpoint' of https://github.com/ese… (eserilev, Feb 28, 2025)
9f4e757 Don't import blocks that conflict with the split (michaelsproul, Feb 28, 2025)
25465a6 Merge remote-tracking branch 'origin/drop-headtracker' into manual-fi… (michaelsproul, Mar 1, 2025)
3d2c9d5 Fix descent from split check (michaelsproul, Mar 1, 2025)
2 changes: 2 additions & 0 deletions Cargo.lock


95 changes: 25 additions & 70 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -33,7 +33,6 @@ use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
use crate::kzg_utils::reconstruct_blobs;
use crate::light_client_finality_update_verification::{
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
@@ -42,7 +41,7 @@ use crate::light_client_optimistic_update_verification::{
Error as LightClientOptimisticUpdateError, VerifiedLightClientOptimisticUpdate,
};
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::BackgroundMigrator;
use crate::migrate::{BackgroundMigrator, ManualFinalizationNotification};
use crate::naive_aggregation_pool::{
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
SyncContributionAggregateMap,
@@ -57,7 +56,7 @@ use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
@@ -458,8 +457,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A handler for events generated by the beacon chain. This is only initialized when the
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
@@ -613,57 +610,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

/// Persists the head tracker and fork choice.
/// Return a database operation for writing the `PersistedBeaconChain` to disk.
///
/// We do it atomically even though no guarantees need to be made about blocks from
/// the head tracker also being present in fork choice.
pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> {
let mut batch = vec![];

let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);

// Hold a lock on head_tracker until it has been persisted to disk. Otherwise there's a race
// condition with the pruning thread which can result in a block present in the head tracker
// but absent in the DB. This inconsistency halts pruning and dramatically increases disk
// size. Ref: https://github.com/sigp/lighthouse/issues/4773
let head_tracker = self.head_tracker.0.read();
batch.push(self.persist_head_in_batch(&head_tracker));

let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
batch.push(self.persist_fork_choice_in_batch());

self.store.hot_db.do_atomically(batch)?;
drop(head_tracker);

Ok(())
}

/// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
pub fn make_persisted_head(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> PersistedBeaconChain {
PersistedBeaconChain {
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
genesis_block_root,
ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader),
}
}

/// Return a database operation for writing the beacon chain head to disk.
pub fn persist_head_in_batch(
&self,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader)
}

pub fn persist_head_in_batch_standalone(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::make_persisted_head(genesis_block_root, head_tracker_reader)
.as_kv_store_op(BEACON_CHAIN_DB_KEY)
/// These days the `PersistedBeaconChain` is only used to store the genesis block root, so it
/// should only ever be written once at startup. It used to be written more frequently, but
/// this is no longer necessary.
pub fn persist_head_in_batch_standalone(genesis_block_root: Hash256) -> KeyValueStoreOp {
PersistedBeaconChain { genesis_block_root }.as_kv_store_op(BEACON_CHAIN_DB_KEY)
}

/// Load fork choice from disk, returning `None` if it isn't found.
@@ -1457,12 +1410,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns `(block_root, block_slot)`.
pub fn heads(&self) -> Vec<(Hash256, Slot)> {
self.head_tracker.heads()
}

/// Only used in tests.
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
self.head_tracker.contains_head((*block_hash).into())
self.canonical_head
.fork_choice_read_lock()
.proto_array()
.heads_descended_from_finalization::<T::EthSpec>()
.iter()
.map(|node| (node.root, node.slot))
.collect()
}

/// Returns the `BeaconState` at the given slot.
@@ -1707,6 +1661,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

pub fn manually_finalize_state(&self, state_root: Hash256, checkpoint: Checkpoint) {
let notif = ManualFinalizationNotification {
state_root: state_root.into(),
checkpoint,
};
self.store_migrator.process_manual_finalization(notif)
}

/// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`.
///
/// The attestation will be obtained from `self.naive_aggregation_pool`.
@@ -2974,10 +2936,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice
// TODO(das): update fork-choice, act on sampling result, adjust log level
// NOTE: It is possible that sampling completes before the block is imported into fork
// choice, in which case we may need to update the availability cache.
// TODO(das): These log levels are too high, reduce once DAS matures
info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

@@ -4024,9 +3985,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// about it.
let block_time_imported = timestamp_now();

let parent_root = block.parent_root();
let slot = block.slot();

let current_eth1_finalization_data = Eth1FinalizationData {
eth1_data: state.eth1_data().clone(),
eth1_deposit_index: state.eth1_deposit_index(),
@@ -4047,9 +4005,6 @@ });
});
}

self.head_tracker
.register_block(block_root, parent_root, slot);

metrics::stop_timer(db_write_timer);

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
@@ -7312,7 +7267,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
fn drop(&mut self) {
let drop = || -> Result<(), Error> {
self.persist_head_and_fork_choice()?;
self.persist_fork_choice()?;
self.persist_op_pool()?;
self.persist_eth1_cache()
};
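The `beacon_chain.rs` changes above replace the persisted `HeadTracker` with heads computed on demand from fork choice (`heads_descended_from_finalization`). A minimal sketch of that idea follows; `Node` and `heads` are illustrative stand-ins, not Lighthouse's actual proto-array types:

```rust
use std::collections::{HashMap, HashSet};

// Toy fork-choice node: `root` and `parent` stand in for `Hash256` roots.
struct Node {
    root: u64,
    parent: Option<u64>,
    slot: u64,
}

// Heads are nodes with no children that descend from the finalized root,
// so no separate head-tracker structure needs to be kept in sync on disk.
fn heads(nodes: &[Node], finalized_root: u64) -> Vec<(u64, u64)> {
    let parents: HashSet<u64> = nodes.iter().filter_map(|n| n.parent).collect();
    let by_root: HashMap<u64, &Node> = nodes.iter().map(|n| (n.root, n)).collect();
    let descends_from_finalized = |mut root: u64| -> bool {
        loop {
            if root == finalized_root {
                return true;
            }
            match by_root.get(&root).and_then(|n| n.parent) {
                Some(parent) => root = parent,
                None => return false,
            }
        }
    };
    nodes
        .iter()
        .filter(|n| !parents.contains(&n.root) && descends_from_finalized(n.root))
        .map(|n| (n.root, n.slot))
        .collect()
}
```

Because heads are derived from the DAG each time, the race between head-tracker persistence and pruning described in issue #4773 disappears by construction.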
7 changes: 6 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
@@ -1782,7 +1782,12 @@ pub fn check_block_is_finalized_checkpoint_or_descendant<
fork_choice: &BeaconForkChoice<T>,
block: B,
) -> Result<B, BlockError> {
if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root()) {
// If we have a split block newer than finalization then we also ban blocks which are not
// descended from that split block.
let split = chain.store.get_split_info();
if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root())
&& fork_choice.is_descendant(split.block_root, block.parent_root())
{
Ok(block)
} else {
// If fork choice does *not* consider the parent to be a descendant of the finalized block,
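The `block_verification.rs` hunk tightens block import: the parent must now descend both from the finalized checkpoint and from the store's split block. A toy sketch of the combined predicate (the closures stand in for fork choice queries; names are illustrative):

```rust
/// Mirrors the `&&` added in `check_block_is_finalized_checkpoint_or_descendant`:
/// a block is admissible only when its parent passes both descent checks.
fn block_admissible(
    is_finalized_descendant: impl Fn(u64) -> bool, // stand-in for fork choice lookup
    is_descendant: impl Fn(u64, u64) -> bool,      // (ancestor, descendant)
    split_block_root: u64,
    parent_root: u64,
) -> bool {
    is_finalized_descendant(parent_root) && is_descendant(split_block_root, parent_root)
}
```

The second clause only bites when the split block is newer than finalization (as after a manual finalization), since otherwise descent from finality implies descent from the split.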
22 changes: 2 additions & 20 deletions beacon_node/beacon_chain/src/builder.rs
@@ -8,7 +8,6 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
use crate::head_tracker::HeadTracker;
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
@@ -92,7 +91,6 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>,
light_client_server_tx: Option<Sender<LightClientProducerEvent<T::EthSpec>>>,
head_tracker: Option<HeadTracker>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: Arc<ChainSpec>,
chain_config: ChainConfig,
@@ -136,7 +134,6 @@
slot_clock: None,
shutdown_sender: None,
light_client_server_tx: None,
head_tracker: None,
validator_pubkey_cache: None,
spec: Arc::new(E::default_spec()),
chain_config: ChainConfig::default(),
@@ -325,10 +322,6 @@

self.genesis_block_root = Some(chain.genesis_block_root);
self.genesis_state_root = Some(genesis_block.state_root());
self.head_tracker = Some(
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
);
self.validator_pubkey_cache = Some(pubkey_cache);
self.fork_choice = Some(fork_choice);

@@ -746,7 +739,6 @@
.genesis_state_root
.ok_or("Cannot build without a genesis state root")?;
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());
let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();

let mut validator_monitor = ValidatorMonitor::new(
@@ -791,8 +783,6 @@
&log,
)?;

// Update head tracker.
head_tracker.register_block(block_root, block.parent_root(), block.slot());
(block_root, block, true)
}
Err(e) => return Err(descriptive_db_error("head block", &e)),
@@ -848,12 +838,7 @@
})?;

let migrator_config = self.store_migrator_config.unwrap_or_default();
let store_migrator = BackgroundMigrator::new(
store.clone(),
migrator_config,
genesis_block_root,
log.clone(),
);
let store_migrator = BackgroundMigrator::new(store.clone(), migrator_config, log.clone());

if let Some(slot) = slot_clock.now() {
validator_monitor.process_valid_state(
@@ -878,11 +863,10 @@
//
// This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance
// doesn't write a `PersistedBeaconChain` without the rest of the batch.
let head_tracker_reader = head_tracker.0.read();
self.pending_io_batch.push(BeaconChain::<
Witness<TSlotClock, TEth1Backend, E, THotStore, TColdStore>,
>::persist_head_in_batch_standalone(
genesis_block_root, &head_tracker_reader
genesis_block_root
));
self.pending_io_batch.push(BeaconChain::<
Witness<TSlotClock, TEth1Backend, E, THotStore, TColdStore>,
@@ -893,7 +877,6 @@
.hot_db
.do_atomically(self.pending_io_batch)
.map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?;
drop(head_tracker_reader);

let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root();
let genesis_time = head_snapshot.beacon_state.genesis_time();
@@ -974,7 +957,6 @@
fork_choice_signal_tx,
fork_choice_signal_rx,
event_handler: self.event_handler,
head_tracker,
shuffling_cache: RwLock::new(ShufflingCache::new(
shuffling_cache_size,
head_shuffling_ids,
13 changes: 10 additions & 3 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -53,7 +53,7 @@ use slot_clock::SlotClock;
use state_processing::AllCaches;
use std::sync::Arc;
use std::time::Duration;
use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem};
use store::{iter::StateRootsIterator, KeyValueStore, KeyValueStoreOp, StoreItem};
use task_executor::{JoinHandle, ShutdownReason};
use types::*;

@@ -848,7 +848,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);

if is_epoch_transition || reorg_distance.is_some() {
self.persist_head_and_fork_choice()?;
self.persist_fork_choice()?;
self.op_pool.prune_attestations(self.epoch()?);
}

@@ -992,7 +992,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store_migrator.process_finalization(
new_finalized_state_root.into(),
new_view.finalized_checkpoint,
self.head_tracker.clone(),
)?;

// Prune blobs in the background.
@@ -1007,6 +1006,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(())
}

/// Persist fork choice to disk, writing immediately.
pub fn persist_fork_choice(&self) -> Result<(), Error> {
let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
let batch = vec![self.persist_fork_choice_in_batch()];
self.store.hot_db.do_atomically(batch)?;
Ok(())
}

/// Return a database operation for writing fork choice to disk.
pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp {
Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.fork_choice_read_lock())
@@ -110,15 +110,6 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}

/// Checks if a data column of a given index exists in the cache.
///
/// Returns:
/// - `true` if a data column for the given index exists.
/// - `false` otherwise.
fn data_column_exists(&self, data_column_index: u64) -> bool {
self.get_cached_data_column(data_column_index).is_some()
}

/// Returns the number of data columns that have been received and are stored in the cache.
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
@@ -182,8 +173,7 @@ impl<E: EthSpec> PendingComponents<E> {
kzg_verified_data_columns: I,
) -> Result<(), AvailabilityCheckError> {
for data_column in kzg_verified_data_columns {
// TODO(das): Add equivalent checks for data columns if necessary
if !self.data_column_exists(data_column.index()) {
if self.get_cached_data_column(data_column.index()).is_none() {
self.verified_data_columns.push(data_column);
}
}
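The final hunk inlines the old `data_column_exists` helper when merging verified data columns into the availability cache. A simplified sketch of that dedup-by-index merge (the types here are toy stand-ins for the cache's verified column type):

```rust
#[derive(Debug, PartialEq)]
struct VerifiedColumn {
    index: u64,
}

/// Push each incoming column only if no cached column shares its index,
/// mirroring the `get_cached_data_column(..).is_none()` guard in the diff.
fn merge_columns(cache: &mut Vec<VerifiedColumn>, incoming: Vec<VerifiedColumn>) {
    for col in incoming {
        if !cache.iter().any(|c| c.index == col.index) {
            cache.push(col);
        }
    }
}
```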