Skip to content

Commit

Permalink
Optimize validator duties (#2243)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #2052

## Proposed Changes

- Refactor the attester/proposer duties endpoints in the BN
    - Performance improvements
    - Fixes some potential inconsistencies with the dependent root fields.
    - Removes `http_api::beacon_proposer_cache` and just uses the one on the `BeaconChain` instead.
    - Move the code for the proposer/attester duties endpoints into separate files, for readability.
- Refactor the `DutiesService` in the VC
    - Required to reduce the delay on broadcasting new blocks.
    - Gets rid of the `ValidatorDuty` shim struct that came about when we adopted the standard API.
    - Separate block/attestation duty tasks so that they don't block each other when one is slow.
- In the VC, use `PublicKeyBytes` to represent validators instead of `PublicKey`. `PublicKey` is a legit crypto object whilst `PublicKeyBytes` is just a byte-array, it's much faster to clone/hash `PublicKeyBytes` and this change has had a significant impact on runtimes.
    - Unfortunately this has created lots of dust changes.
 - In the BN, store `PublicKeyBytes` in the `beacon_proposer_cache` and allow access to them. The HTTP API always sends `PublicKeyBytes` over the wire and the conversion from `PublicKey` -> `PublickeyBytes` is non-trivial, especially when queries have 100s/1000s of validators (like Pyrmont).
 - Add the `state_processing::state_advance` mod which dedups a lot of the "apply `n` skip slots to the state" code.
    - This also fixes a bug with some functions which were failing to include a state root as per [this comment](https://github.com/sigp/lighthouse/blob/072695284f7eff82c51f79bc921ad942fea7483a/consensus/state_processing/src/state_advance.rs#L69-L74). I couldn't find any instance of this bug that resulted in anything more severe than keying a shuffling cache by the wrong block root.
 - Swap the VC block service to use `mpsc` from `tokio` instead of `futures`. This is consistent with the rest of the code base.
    
~~This PR *reduces* the size of the codebase 🎉~~ It *used* to reduce the size of the code base before I added more comments. 

## Observations on Prymont

- Proposer duties times down from peaks of 450ms to consistent <1ms.
- Current epoch attester duties times down from >1s peaks to a consistent 20-30ms.
- Block production down from +600ms to 100-200ms.

## Additional Info

- ~~Blocked on #2241~~
- ~~Blocked on #2234~~

## TODO

- [x] ~~Refactor this into some smaller PRs?~~ Leaving this as-is for now.
- [x] Address `per_slot_processing` roots.
- [x] Investigate slow next epoch times. Not getting added to cache on block processing?
- [x] Consider [this](https://github.com/sigp/lighthouse/blob/072695284f7eff82c51f79bc921ad942fea7483a/beacon_node/store/src/hot_cold_store.rs#L811-L812) in the scenario of replacing the state roots


Co-authored-by: pawan <[email protected]>
Co-authored-by: Michael Sproul <[email protected]>
  • Loading branch information
3 people committed Mar 17, 2021
1 parent 6a69b20 commit 015ab7d
Show file tree
Hide file tree
Showing 49 changed files with 2,165 additions and 1,797 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion account_manager/src/validator/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn cli_run<T: EthSpec>(
})?;

slashing_protection
.register_validator(&voting_pubkey)
.register_validator(voting_pubkey.compress())
.map_err(|e| {
format!(
"Error registering validator {}: {:?}",
Expand Down
2 changes: 1 addition & 1 deletion account_manager/src/validator/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin
.public_key()
.ok_or_else(|| format!("Keystore public key is invalid: {}", keystore.pubkey()))?;
slashing_protection
.register_validator(&voting_pubkey)
.register_validator(voting_pubkey.compress())
.map_err(|e| {
format!(
"Error registering validator {}: {:?}",
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ where
}

chain
.with_committee_cache(target.root, attestation_epoch, |committee_cache| {
.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
Expand Down
210 changes: 160 additions & 50 deletions beacon_node/beacon_chain/src/beacon_chain.rs

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion beacon_node/beacon_chain/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Default for BeaconProposerCache {
impl BeaconProposerCache {
/// If it is cached, returns the proposer for the block at `slot` where the block has the
/// ancestor block root of `shuffling_decision_block` at `end_slot(slot.epoch() - 1)`.
pub fn get<T: EthSpec>(
pub fn get_slot<T: EthSpec>(
&mut self,
shuffling_decision_block: Hash256,
slot: Slot,
Expand All @@ -84,6 +84,20 @@ impl BeaconProposerCache {
}
}

/// As per `Self::get_slot`, but returns all proposers in all slots for the given `epoch`.
///
/// The nth slot in the returned `SmallVec` will be equal to the nth slot in the given `epoch`.
/// E.g., if `epoch == 1` then `smallvec[0]` refers to slot 32 (assuming `SLOTS_PER_EPOCH ==
/// 32`).
pub fn get_epoch<T: EthSpec>(
&mut self,
shuffling_decision_block: Hash256,
epoch: Epoch,
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
let key = (epoch, shuffling_decision_block);
self.cache.get(&key).map(|cache| &cache.proposers)
}

/// Insert the proposers into the cache.
///
/// See `Self::get` for a description of `shuffling_decision_block`.
Expand Down
31 changes: 20 additions & 11 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing,
per_epoch_processing::EpochProcessingSummary,
per_slot_processing, BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
};
use std::borrow::Cow;
use std::convert::TryFrom;
Expand Down Expand Up @@ -351,8 +353,12 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
.map(|(_, block)| block.slot())
.unwrap_or_else(|| slot);

let state =
cheap_state_advance_to_obtain_committees(&mut parent.pre_state, highest_slot, &chain.spec)?;
let state = cheap_state_advance_to_obtain_committees(
&mut parent.pre_state,
parent.beacon_state_root,
highest_slot,
&chain.spec,
)?;

let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
Expand Down Expand Up @@ -564,7 +570,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let proposer_opt = chain
.beacon_proposer_cache
.lock()
.get::<T::EthSpec>(proposer_shuffling_decision_block, block.slot());
.get_slot::<T::EthSpec>(proposer_shuffling_decision_block, block.slot());
let (expected_proposer, fork, parent, block) = if let Some(proposer) = proposer_opt {
// The proposer index was cached and we can return it without needing to load the
// parent.
Expand All @@ -586,6 +592,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// The state produced is only valid for determining proposer/attester shuffling indices.
let state = cheap_state_advance_to_obtain_committees(
&mut parent.pre_state,
parent.beacon_state_root,
block.slot(),
&chain.spec,
)?;
Expand Down Expand Up @@ -694,6 +701,7 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

let state = cheap_state_advance_to_obtain_committees(
&mut parent.pre_state,
parent.beacon_state_root,
block.slot(),
&chain.spec,
)?;
Expand Down Expand Up @@ -738,6 +746,7 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

let state = cheap_state_advance_to_obtain_committees(
&mut parent.pre_state,
parent.beacon_state_root,
block.slot(),
&chain.spec,
)?;
Expand Down Expand Up @@ -1280,6 +1289,7 @@ fn load_parent<T: BeaconChainTypes>(
beacon_block: parent_block,
beacon_block_root: root,
pre_state: parent_state,
beacon_state_root: Some(parent_state_root),
},
block,
))
Expand All @@ -1303,6 +1313,7 @@ fn load_parent<T: BeaconChainTypes>(
/// mutated to be invalid (in fact, it is never changed beyond a simple committee cache build).
fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>(
state: &'a mut BeaconState<E>,
state_root_opt: Option<Hash256>,
block_slot: Slot,
spec: &ChainSpec,
) -> Result<Cow<'a, BeaconState<E>>, BlockError<E>> {
Expand All @@ -1319,14 +1330,12 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>(
})
} else {
let mut state = state.clone_with(CloneConfig::committee_caches_only());
let target_slot = block_epoch.start_slot(E::slots_per_epoch());

while state.current_epoch() < block_epoch {
// Don't calculate state roots since they aren't required for calculating
// shuffling (achieved by providing Hash256::zero()).
per_slot_processing(&mut state, Some(Hash256::zero()), spec).map_err(|e| {
BlockError::BeaconChainError(BeaconChainError::SlotProcessingError(e))
})?;
}
// Advance the state into the same epoch as the block. Use the "partial" method since state
// roots are not important for proposer/attester shuffling.
partial_state_advance(&mut state, state_root_opt, target_slot, spec)
.map_err(|e| BlockError::BeaconChainError(BeaconChainError::from(e)))?;

state.build_committee_cache(RelativeEpoch::Current, spec)?;

Expand Down
10 changes: 10 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use state_processing::{
ProposerSlashingValidationError,
},
signature_sets::Error as SignatureSetError,
state_advance::Error as StateAdvanceError,
BlockProcessingError, SlotProcessingError,
};
use std::time::Duration;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub enum BeaconChainError {
MissingBeaconBlock(Hash256),
MissingBeaconState(Hash256),
SlotProcessingError(SlotProcessingError),
StateAdvanceError(StateAdvanceError),
UnableToAdvanceState(String),
NoStateForAttestation {
beacon_block_root: Hash256,
Expand Down Expand Up @@ -81,6 +83,7 @@ pub enum BeaconChainError {
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheFileError(String),
ValidatorIndexUnknown(usize),
OpPoolError(OpPoolError),
NaiveAggregationError(NaiveAggregationError),
ObservedAttestationsError(ObservedAttestationsError),
Expand All @@ -105,6 +108,10 @@ pub enum BeaconChainError {
block_slot: Slot,
state_slot: Slot,
},
InvalidStateForShuffling {
state_epoch: Epoch,
shuffling_epoch: Epoch,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand All @@ -122,6 +129,7 @@ easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
easy_from_to!(PruningError, BeaconChainError);
easy_from_to!(ArithError, BeaconChainError);
easy_from_to!(ForkChoiceStoreError, BeaconChainError);
easy_from_to!(StateAdvanceError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
Expand All @@ -133,6 +141,7 @@ pub enum BlockProductionError {
BlockProcessingError(BlockProcessingError),
Eth1ChainError(Eth1ChainError),
BeaconStateError(BeaconStateError),
StateAdvanceError(StateAdvanceError),
OpPoolError(OpPoolError),
/// The `BeaconChain` was explicitly configured _without_ a connection to eth1, therefore it
/// cannot produce blocks.
Expand All @@ -147,3 +156,4 @@ easy_from_to!(BlockProcessingError, BlockProductionError);
easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
easy_from_to!(StateAdvanceError, BlockProductionError);
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/snapshot_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ pub struct PreProcessingSnapshot<T: EthSpec> {
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
/// the application of another block.
pub pre_state: BeaconState<T>,
/// This value is only set to `Some` if the `pre_state` was *not* advanced forward.
pub beacon_state_root: Option<Hash256>,
pub beacon_block: SignedBeaconBlock<T>,
pub beacon_block_root: Hash256,
}

impl<T: EthSpec> From<BeaconSnapshot<T>> for PreProcessingSnapshot<T> {
fn from(snapshot: BeaconSnapshot<T>) -> Self {
let beacon_state_root = Some(snapshot.beacon_state_root());
Self {
pre_state: snapshot.beacon_state,
beacon_state_root,
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
}
Expand All @@ -47,10 +51,15 @@ impl<T: EthSpec> CacheItem<T> {
}

pub fn into_pre_state(self) -> PreProcessingSnapshot<T> {
// Do not include the beacon state root if the state has been advanced.
let beacon_state_root =
Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none());

PreProcessingSnapshot {
beacon_block: self.beacon_block,
beacon_block_root: self.beacon_block_root,
pre_state: self.pre_state.unwrap_or(self.beacon_state),
beacon_state_root,
}
}
}
Expand Down
54 changes: 41 additions & 13 deletions beacon_node/beacon_chain/src/state_advance_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
//! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles.
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::{
beacon_chain::BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, snapshot_cache::StateAdvance, BeaconChain,
BeaconChainError, BeaconChainTypes,
beacon_chain::{ATTESTATION_CACHE_LOCK_TIMEOUT, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT},
snapshot_cache::StateAdvance,
BeaconChain, BeaconChainError, BeaconChainTypes,
};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
Expand All @@ -27,7 +28,7 @@ use std::sync::{
};
use task_executor::TaskExecutor;
use tokio::time::sleep;
use types::{EthSpec, Hash256, Slot};
use types::{AttestationShufflingId, EthSpec, Hash256, RelativeEpoch, Slot};

/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform
/// the state advancement.
Expand Down Expand Up @@ -252,16 +253,22 @@ fn advance_head<T: BeaconChainTypes>(
"current_slot" => current_slot,
);

// If the advanced state is in a later epoch than where it started, pre-emptively add the
// proposer shuffling for the new epoch into the cache.
if state.current_epoch() > initial_epoch {
debug!(
log,
"Priming proposer cache";
"head_root" => ?head_root,
"state_epoch" => state.current_epoch(),
"current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()),
);
// Build the current epoch cache, to prepare to compute proposer duties.
state
.build_committee_cache(RelativeEpoch::Current, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;
// Build the next epoch cache, to prepare to compute attester duties.
state
.build_committee_cache(RelativeEpoch::Next, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;

// If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer shuffling
// for the state's current epoch and the committee cache for the state's next epoch.
if initial_epoch < state.current_epoch() {
// Update the proposer cache.
//
// We supply the `head_root` as the decision block since the prior `if` statement guarantees
// the head root is the latest block from the prior epoch.
beacon_chain
.beacon_proposer_cache
.lock()
Expand All @@ -274,6 +281,27 @@ fn advance_head<T: BeaconChainTypes>(
state.fork,
)
.map_err(BeaconChainError::from)?;

// Update the attester cache.
let shuffling_id = AttestationShufflingId::new(head_root, &state, RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let committee_cache = state
.committee_cache(RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
beacon_chain
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::AttestationCacheLockTimeout)?
.insert(shuffling_id.clone(), committee_cache);

debug!(
log,
"Primed proposer and attester caches";
"head_root" => ?head_root,
"next_epoch_shuffling_root" => ?shuffling_id.shuffling_decision_block,
"state_epoch" => state.current_epoch(),
"current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()),
);
}

let final_slot = state.slot;
Expand Down
Loading

0 comments on commit 015ab7d

Please sign in to comment.