Skip to content

Commit

Permalink
Run fork choice before block proposal (#3168)
Browse files Browse the repository at this point in the history
## Issue Addressed

Upcoming spec change ethereum/consensus-specs#2878

## Proposed Changes

1. Run fork choice at the start of every slot, and wait for this run to complete before proposing a block.
2. As an optimisation, also run fork choice 3/4 of the way through the slot (at 9s), _dequeueing attestations for the next slot_.
3. Remove the fork choice run from the state advance timer that occurred before advancing the state.

## Additional Info

### Block Proposal Accuracy

This change makes us more likely to propose on top of the correct head in the presence of re-orgs with proposer boost in play. The main scenario that this change is designed to address is described in the linked spec issue.

### Attestation Accuracy

This change _also_ makes us more likely to attest to the correct head. Currently in the case of a skipped slot at `slot` we only run fork choice 9s into `slot - 1`. This means the attestations from `slot - 1` aren't taken into consideration, and any boost applied to the block from `slot - 1` is not removed (it should be). In the language of the linked spec issue, this means we are liable to attest to C, even when the majority voting weight has already caused a re-org to B.

### Why remove the call before the state advance?

If we've run fork choice at the start of the slot then it has already dequeued all the attestations from the previous slot, which are the only ones eligible to influence the head in the current slot. Running fork choice again is unnecessary (unless we run it for the next slot and try to pre-empt a re-org, but I don't currently think this is a great idea).

### Performance

Based on Prater testing this adds about 5-25ms of runtime to block proposal times, which are 500-1000ms on average (and spike to 5s+ sometimes due to state handling issues 😢 ). I believe this is a small enough penalty to enable it by default, with the option to disable it via the new flag `--fork-choice-before-proposal-timeout 0`. Upcoming work on block packing and state representation will also reduce block production times in general, while removing the spikes.

### Implementation

Fork choice gets invoked at the start of the slot via the `per_slot_task` function called from the slot timer. It then uses a condition variable to signal to block production that fork choice has been updated. This is a bit funky, but it seems to work. One downside of the timer-based approach is that it doesn't happen automatically in most of the tests. The test added by this PR has to trigger the run manually.
  • Loading branch information
michaelsproul committed May 20, 2022
1 parent 54b58fd commit 6aaeb78
Show file tree
Hide file tree
Showing 15 changed files with 458 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 100 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::get_execution_payload;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::migrate::BackgroundMigrator;
Expand Down Expand Up @@ -339,6 +340,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A state-machine that is updated with information from the network and chooses a canonical
/// head block.
pub fork_choice: RwLock<BeaconForkChoice<T>>,
/// Transmitter used to indicate that slot-start fork choice has completed running.
pub fork_choice_signal_tx: Option<ForkChoiceSignalTx>,
/// Receiver used by block production to wait on slot-start fork choice.
pub fork_choice_signal_rx: Option<ForkChoiceSignalRx>,
/// A handler for events generated by the beacon chain. This is only initialized when the
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
Expand Down Expand Up @@ -2952,12 +2957,64 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(block_root)
}

/// If configured, wait for the fork choice run at the start of the slot to complete.
fn wait_for_fork_choice_before_block_production(
self: &Arc<Self>,
slot: Slot,
) -> Result<(), BlockProductionError> {
if let Some(rx) = &self.fork_choice_signal_rx {
let current_slot = self
.slot()
.map_err(|_| BlockProductionError::UnableToReadSlot)?;

let timeout = Duration::from_millis(self.config.fork_choice_before_proposal_timeout_ms);

if slot == current_slot || slot == current_slot + 1 {
match rx.wait_for_fork_choice(slot, timeout) {
ForkChoiceWaitResult::Success(fc_slot) => {
debug!(
self.log,
"Fork choice successfully updated before block production";
"slot" => slot,
"fork_choice_slot" => fc_slot,
);
}
ForkChoiceWaitResult::Behind(fc_slot) => {
warn!(
self.log,
"Fork choice notifier out of sync with block production";
"fork_choice_slot" => fc_slot,
"slot" => slot,
"message" => "this block may be orphaned",
);
}
ForkChoiceWaitResult::TimeOut => {
warn!(
self.log,
"Timed out waiting for fork choice before proposal";
"message" => "this block may be orphaned",
);
}
}
} else {
error!(
self.log,
"Producing block at incorrect slot";
"block_slot" => slot,
"current_slot" => current_slot,
"message" => "check clock sync, this block may be orphaned",
);
}
}
Ok(())
}

/// Produce a new block at the given `slot`.
///
/// The produced block will not be inherently valid, it must be signed by a block producer.
/// Block signing is out of the scope of this function and should be done by a separate program.
pub fn produce_block<Payload: ExecPayload<T::EthSpec>>(
&self,
self: &Arc<Self>,
randao_reveal: Signature,
slot: Slot,
validator_graffiti: Option<Graffiti>,
Expand All @@ -2972,7 +3029,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Same as `produce_block` but allowing for configuration of RANDAO-verification.
pub fn produce_block_with_verification<Payload: ExecPayload<T::EthSpec>>(
&self,
self: &Arc<Self>,
randao_reveal: Signature,
slot: Slot,
validator_graffiti: Option<Graffiti>,
Expand All @@ -2981,6 +3038,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);

let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_FORK_CHOICE_TIMES);
self.wait_for_fork_choice_before_block_production(slot)?;
drop(fork_choice_timer);

// Producing a block requires the tree hash cache, so clone a full state corresponding to
// the head from the snapshot cache. Unfortunately we can't move the snapshot out of the
// cache (which would be fast), because we need to re-process the block after it has been
Expand Down Expand Up @@ -3362,10 +3423,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Execute the fork choice algorithm and enthrone the result as the canonical head.
pub fn fork_choice(self: &Arc<Self>) -> Result<(), Error> {
self.fork_choice_at_slot(self.slot()?)
}

/// Execute fork choice at `slot`, processing queued attestations from `slot - 1` and earlier.
///
/// The `slot` is not verified in any way, callers should ensure it corresponds to at most
/// one slot ahead of the current wall-clock slot.
pub fn fork_choice_at_slot(self: &Arc<Self>, slot: Slot) -> Result<(), Error> {
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);

let result = self.fork_choice_internal();
let result = self.fork_choice_internal(slot);

if result.is_err() {
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
Expand All @@ -3374,13 +3443,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
result
}

fn fork_choice_internal(self: &Arc<Self>) -> Result<(), Error> {
fn fork_choice_internal(self: &Arc<Self>, slot: Slot) -> Result<(), Error> {
// Atomically obtain the head block root and the finalized block.
let (beacon_block_root, finalized_block) = {
let mut fork_choice = self.fork_choice.write();

// Determine the root of the block that is the head of the chain.
let beacon_block_root = fork_choice.get_head(self.slot()?, &self.spec)?;
let beacon_block_root = fork_choice.get_head(slot, &self.spec)?;

(beacon_block_root, fork_choice.get_finalized_block()?)
};
Expand Down Expand Up @@ -3752,6 +3821,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// Update the execution layer.
// Always use the wall-clock slot to update the execution engine rather than the `slot`
// passed in.
if let Err(e) = self.update_execution_engine_forkchoice_blocking(self.slot()?) {
crit!(
self.log,
Expand Down Expand Up @@ -4005,8 +4076,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"prepare_slot" => prepare_slot
);

// Use the blocking method here so that we don't form a queue of these functions when
// routinely calling them.
self.update_execution_engine_forkchoice_async(current_slot)
.await?;
}
Expand Down Expand Up @@ -4336,11 +4405,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Called by the timer on every slot.
///
/// Performs slot-based pruning.
pub fn per_slot_task(&self) {
pub fn per_slot_task(self: &Arc<Self>) {
trace!(self.log, "Running beacon chain per slot tasks");
if let Some(slot) = self.slot_clock.now() {
// Run fork choice and signal to any waiting task that it has completed.
if let Err(e) = self.fork_choice() {
error!(
self.log,
"Fork choice error at slot start";
"error" => ?e,
"slot" => slot,
);
}

// Send the notification regardless of fork choice success, this is a "best effort"
// notification and we don't want block production to hit the timeout in case of error.
if let Some(tx) = &self.fork_choice_signal_tx {
if let Err(e) = tx.notify_fork_choice_complete(slot) {
warn!(
self.log,
"Error signalling fork choice waiter";
"error" => ?e,
"slot" => slot,
);
}
}

self.naive_aggregation_pool.write().prune(slot);
self.block_times_cache.write().prune(slot);
}
Expand Down
13 changes: 13 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::head_tracker::HeadTracker;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
Expand Down Expand Up @@ -694,6 +695,16 @@ where
);
}

// If enabled, set up the fork choice signaller.
let (fork_choice_signal_tx, fork_choice_signal_rx) =
if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 {
let tx = ForkChoiceSignalTx::new();
let rx = tx.get_receiver();
(Some(tx), Some(rx))
} else {
(None, None)
};

// Store the `PersistedBeaconChain` in the database atomically with the metadata so that on
// restart we can correctly detect the presence of an initialized database.
//
Expand Down Expand Up @@ -752,6 +763,8 @@ where
genesis_block_root,
genesis_state_root,
fork_choice: RwLock::new(fork_choice),
fork_choice_signal_tx,
fork_choice_signal_rx,
event_handler: self.event_handler,
head_tracker,
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde_derive::{Deserialize, Serialize};
use types::Checkpoint;

pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
Expand All @@ -18,6 +20,10 @@ pub struct ChainConfig {
pub enable_lock_timeouts: bool,
/// The max size of a message that can be sent over the network.
pub max_network_size: usize,
/// Number of milliseconds to wait for fork choice before proposing a block.
///
/// If set to 0 then block proposal will not wait for fork choice at all.
pub fork_choice_before_proposal_timeout_ms: u64,
}

impl Default for ChainConfig {
Expand All @@ -28,6 +34,7 @@ impl Default for ChainConfig {
reconstruct_historic_states: false,
enable_lock_timeouts: true,
max_network_size: 10 * 1_048_576, // 10M
fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT,
}
}
}
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ pub enum BeaconChainError {
},
RuntimeShutdown,
ProcessInvalidExecutionPayload(JoinError),
ForkChoiceSignalOutOfOrder {
current: Slot,
latest: Slot,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down Expand Up @@ -234,6 +238,7 @@ pub enum BlockProductionError {
FailedToReadFinalizedBlock(store::Error),
MissingFinalizedBlock(Hash256),
BlockTooLarge(usize),
ForkChoiceError(BeaconChainError),
}

easy_from_to!(BlockProcessingError, BlockProductionError);
Expand Down
97 changes: 97 additions & 0 deletions beacon_node/beacon_chain/src/fork_choice_signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! Concurrency helpers for synchronising block proposal with fork choice.
//!
//! The transmitter provides a way for a thread runnning fork choice on a schedule to signal
//! to the receiver that fork choice has been updated for a given slot.
use crate::BeaconChainError;
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;
use std::time::Duration;
use types::Slot;

/// Sender, for use by the per-slot task timer.
pub struct ForkChoiceSignalTx {
pair: Arc<(Mutex<Slot>, Condvar)>,
}

/// Receiver, for use by the beacon chain waiting on fork choice to complete.
pub struct ForkChoiceSignalRx {
pair: Arc<(Mutex<Slot>, Condvar)>,
}

pub enum ForkChoiceWaitResult {
/// Successfully reached a slot greater than or equal to the awaited slot.
Success(Slot),
/// Fork choice was updated to a lower slot, indicative of lag or processing delays.
Behind(Slot),
/// Timed out waiting for the fork choice update from the sender.
TimeOut,
}

impl ForkChoiceSignalTx {
pub fn new() -> Self {
let pair = Arc::new((Mutex::new(Slot::new(0)), Condvar::new()));
Self { pair }
}

pub fn get_receiver(&self) -> ForkChoiceSignalRx {
ForkChoiceSignalRx {
pair: self.pair.clone(),
}
}

/// Signal to the receiver that fork choice has been updated to `slot`.
///
/// Return an error if the provided `slot` is strictly less than any previously provided slot.
pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> {
let &(ref lock, ref condvar) = &*self.pair;

let mut current_slot = lock.lock();

if slot < *current_slot {
return Err(BeaconChainError::ForkChoiceSignalOutOfOrder {
current: *current_slot,
latest: slot,
});
} else {
*current_slot = slot;
}

// We use `notify_all` because there may be multiple block proposals waiting simultaneously.
// Usually there'll be 0-1.
condvar.notify_all();

Ok(())
}
}

impl Default for ForkChoiceSignalTx {
fn default() -> Self {
Self::new()
}
}

impl ForkChoiceSignalRx {
pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult {
let &(ref lock, ref condvar) = &*self.pair;

let mut current_slot = lock.lock();

// Wait for `current_slot >= slot`.
//
// Do not loop and wait, if we receive an update for the wrong slot then something is
// quite out of whack and we shouldn't waste more time waiting.
if *current_slot < slot {
let timeout_result = condvar.wait_for(&mut current_slot, timeout);

if timeout_result.timed_out() {
return ForkChoiceWaitResult::TimeOut;
}
}

if *current_slot >= slot {
ForkChoiceWaitResult::Success(*current_slot)
} else {
ForkChoiceWaitResult::Behind(*current_slot)
}
}
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod errors;
pub mod eth1_chain;
pub mod events;
mod execution_payload;
pub mod fork_choice_signal;
pub mod fork_revert;
mod head_tracker;
pub mod historical_blocks;
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ lazy_static! {
);
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
pub static ref BLOCK_PRODUCTION_FORK_CHOICE_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_fork_choice_seconds",
"Time taken to run fork choice before block production"
);
pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_state_load_seconds",
"Time taken to load the base state for block production"
Expand Down
Loading

0 comments on commit 6aaeb78

Please sign in to comment.