diff --git a/.github/workflows/cancel-previous-runs.yml b/.github/workflows/cancel-previous-runs.yml new file mode 100644 index 00000000000..b7a3cd0b469 --- /dev/null +++ b/.github/workflows/cancel-previous-runs.yml @@ -0,0 +1,14 @@ +name: cancel previous runs +on: [push] +jobs: + cancel: + name: 'Cancel Previous Runs' + runs-on: ubuntu-latest + timeout-minutes: 3 + steps: + # https://github.com/styfle/cancel-workflow-action/releases + - uses: styfle/cancel-workflow-action@514c783 # 0.7.0 + with: + # https://api.github.com/repos/sigp/lighthouse/actions/workflows + workflow_id: 697364,2434944,4462424,308241,2883401,316 + access_token: ${{ github.token }} diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 2f547701af0..99daeeca9d5 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -28,6 +28,8 @@ jobs: DOCKER_CLI_EXPERIMENTAL: enabled steps: - uses: actions/checkout@v2 + - name: Update Rust + run: rustup update stable - name: Dockerhub login run: | echo "${DOCKER_PASSWORD}" | docker login --username ${DOCKER_USERNAME} --password-stdin @@ -58,6 +60,8 @@ jobs: needs: [extract-branch-name] steps: - uses: actions/checkout@v2 + - name: Update Rust + run: rustup update stable - name: Dockerhub login run: | echo "${DOCKER_PASSWORD}" | docker login --username ${DOCKER_USERNAME} --password-stdin diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e2d366d9045..cbb2a105370 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -227,6 +227,7 @@ jobs: - name: Create Release Draft env: + GITHUB_USER: sigp GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # The formatting here is borrowed from OpenEthereum: https://github.com/openethereum/openethereum/blob/main/.github/workflows/build.yml diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 864d8c23919..bcb7eec1219 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -24,6 +24,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Get latest version of stable Rust run: rustup update stable - name: Check formatting with cargo fmt @@ -78,6 +80,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Build the root Dockerfile run: docker build . 
eth1-simulator-ubuntu: @@ -86,6 +90,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Install ganache-cli run: sudo npm install -g ganache-cli - name: Run the beacon chain sim that starts from an eth1 contract @@ -96,6 +102,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Install ganache-cli run: sudo npm install -g ganache-cli - name: Run the beacon chain sim without an eth1 connection @@ -106,6 +114,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Install ganache-cli run: sudo npm install -g ganache-cli - name: Run the syncing simulator @@ -116,6 +126,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Typecheck benchmark code without running it run: make check-benches check-consensus: @@ -124,6 +136,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Typecheck consensus code in strict mode run: make check-consensus clippy: @@ -132,6 +146,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Lint code for quality and style with Clippy run: make lint - name: Certify Cargo.lock freshness @@ -142,6 +158,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Validate state_processing feature arbitrary-fuzz run: make arbitrary-fuzz cargo-audit: @@ -150,6 +168,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Run cargo audit to identify known security vulnerabilities reported to the RustSec Advisory Database run: make audit cargo-udeps: diff --git a/Cargo.lock b/Cargo.lock index ed028b99716..7659f5670a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,7 +601,7 @@ dependencies = [ "parking_lot", "proto_array", "rand 0.7.3", - "rand_core 0.5.1", + "rand_core 0.6.2", "rayon", "regex", "safe_arith", @@ -4045,12 +4045,12 @@ dependencies = [ [[package]] name = "nb-connect" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998" +checksum = "670361df1bc2399ee1ff50406a0d422587dd3bb0da596e1978fe8e05dabddf4f" dependencies = [ "libc", - "winapi 0.3.9", + "socket2", ] [[package]] @@ -4888,7 +4888,7 @@ checksum = "18519b42a40024d661e1714153e9ad0c3de27cd495760ceb09710920f1098b1e" dependencies = [ "libc", "rand_chacha 0.3.0", - "rand_core 0.6.1", + "rand_core 0.6.2", "rand_hc 0.3.0", ] @@ -4909,7 +4909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] @@ -4923,9 +4923,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" +checksum = 
"34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ "getrandom 0.2.2", ] @@ -4945,7 +4945,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] diff --git a/Dockerfile b/Dockerfile index 2d7bca06a50..bc4430779df 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.47.0 AS builder +FROM rust:1.50.0 AS builder RUN apt-get update && apt-get install -y cmake COPY . lighthouse ARG PORTABLE diff --git a/account_manager/src/common.rs b/account_manager/src/common.rs index 2b9c93fb1dc..0d76380b02d 100644 --- a/account_manager/src/common.rs +++ b/account_manager/src/common.rs @@ -29,7 +29,7 @@ pub fn read_mnemonic_from_cli( }) })?, None => loop { - eprintln!(""); + eprintln!(); eprintln!("{}", MNEMONIC_PROMPT); let mnemonic = read_input_from_user(stdin_inputs)?; @@ -37,7 +37,7 @@ pub fn read_mnemonic_from_cli( match Mnemonic::from_phrase(mnemonic.as_str(), Language::English) { Ok(mnemonic_m) => { eprintln!("Valid mnemonic provided."); - eprintln!(""); + eprintln!(); sleep(Duration::from_secs(1)); break mnemonic_m; } diff --git a/account_manager/src/validator/create.rs b/account_manager/src/validator/create.rs index 85e332458c7..8e96a004030 100644 --- a/account_manager/src/validator/create.rs +++ b/account_manager/src/validator/create.rs @@ -272,7 +272,7 @@ pub fn read_wallet_password_from_cli( .map_err(|e| format!("Unable to read {:?}: {:?}", path, e)) .map(|bytes| strip_off_newlines(bytes).into()), None => { - eprintln!(""); + eprintln!(); eprintln!("{}", WALLET_PASSWORD_PROMPT); let password = PlainText::from(read_password_from_user(stdin_inputs)?.as_ref().to_vec()); diff --git a/account_manager/src/validator/exit.rs b/account_manager/src/validator/exit.rs index 018483896ff..9d44f5897d6 100644 --- a/account_manager/src/validator/exit.rs +++ b/account_manager/src/validator/exit.rs @@ -280,7 +280,7 @@ fn load_voting_keypair( .map_err(|e| format!("Error while decrypting keypair: {:?}", e)) } else { // Prompt password from user. - eprintln!(""); + eprintln!(); eprintln!( "{} for validator in {:?}: ", PASSWORD_PROMPT, voting_keystore_path @@ -289,7 +289,7 @@ fn load_voting_keypair( match keystore.decrypt_keypair(password.as_ref()) { Ok(keypair) => { eprintln!("Password is correct."); - eprintln!(""); + eprintln!(); std::thread::sleep(std::time::Duration::from_secs(1)); // Provides nicer UX. 
Ok(keypair) } diff --git a/account_manager/src/validator/import.rs b/account_manager/src/validator/import.rs index 93d273b91f2..aa09211e456 100644 --- a/account_manager/src/validator/import.rs +++ b/account_manager/src/validator/import.rs @@ -135,12 +135,12 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin let keystore = Keystore::from_json_file(src_keystore) .map_err(|e| format!("Unable to read keystore JSON {:?}: {:?}", src_keystore, e))?; - eprintln!(""); + eprintln!(); eprintln!("Keystore found at {:?}:", src_keystore); - eprintln!(""); + eprintln!(); eprintln!(" - Public key: 0x{}", keystore.pubkey()); eprintln!(" - UUID: {}", keystore.uuid()); - eprintln!(""); + eprintln!(); eprintln!( "If you enter the password it will be stored as plain-text in {} so that it is not \ required each time the validator client starts.", @@ -152,7 +152,7 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin eprintln!("Reuse previous password."); break Some(password); } - eprintln!(""); + eprintln!(); eprintln!("{}", PASSWORD_PROMPT); let password = read_password_from_user(stdin_inputs)?; @@ -166,7 +166,7 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin match keystore.decrypt_keypair(password.as_ref()) { Ok(_) => { eprintln!("Password is correct."); - eprintln!(""); + eprintln!(); sleep(Duration::from_secs(1)); // Provides nicer UX. if reuse_password { previous_password = Some(password.clone()); @@ -234,13 +234,13 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin eprintln!("Successfully updated {}.", CONFIG_FILENAME); } - eprintln!(""); + eprintln!(); eprintln!( "Successfully imported {} validators ({} skipped).", num_imported_keystores, keystore_paths.len() - num_imported_keystores ); - eprintln!(""); + eprintln!(); eprintln!("WARNING: {}", KEYSTORE_REUSE_WARNING); Ok(()) diff --git a/account_manager/src/validator/recover.rs b/account_manager/src/validator/recover.rs index 2a20961c1b6..27d8aa71d4f 100644 --- a/account_manager/src/validator/recover.rs +++ b/account_manager/src/validator/recover.rs @@ -93,9 +93,9 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin ensure_dir_exists(&validator_dir)?; ensure_dir_exists(&secrets_dir)?; - eprintln!(""); + eprintln!(); eprintln!("WARNING: KEY RECOVERY CAN LEAD TO DUPLICATING VALIDATORS KEYS, WHICH CAN LEAD TO SLASHING."); - eprintln!(""); + eprintln!(); let mnemonic = read_mnemonic_from_cli(mnemonic_path, stdin_inputs)?; diff --git a/account_manager/src/wallet/create.rs b/account_manager/src/wallet/create.rs index 1332dad44b3..3965443cdf7 100644 --- a/account_manager/src/wallet/create.rs +++ b/account_manager/src/wallet/create.rs @@ -215,7 +215,7 @@ pub fn read_new_wallet_password_from_cli( Ok(password) } None => loop { - eprintln!(""); + eprintln!(); eprintln!("{}", NEW_WALLET_PASSWORD_PROMPT); let password = PlainText::from(read_password_from_user(stdin_inputs)?.as_ref().to_vec()); diff --git a/account_manager/src/wallet/recover.rs b/account_manager/src/wallet/recover.rs index 2240323c26a..0ac30fe27ab 100644 --- a/account_manager/src/wallet/recover.rs +++ b/account_manager/src/wallet/recover.rs @@ -63,9 +63,9 @@ pub fn cli_run(matches: &ArgMatches, wallet_base_dir: PathBuf) -> Result<(), Str let mnemonic_path: Option = clap_utils::parse_optional(matches, MNEMONIC_FLAG)?; let stdin_inputs = matches.is_present(STDIN_INPUTS_FLAG); - eprintln!(""); + eprintln!(); eprintln!("WARNING: KEY RECOVERY CAN 
LEAD TO DUPLICATING VALIDATORS KEYS, WHICH CAN LEAD TO SLASHING."); - eprintln!(""); + eprintln!(); let mnemonic = read_mnemonic_from_cli(mnemonic_path, stdin_inputs)?; diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 3bc77c245db..930820b7f66 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -46,7 +46,7 @@ futures = "0.3.7" genesis = { path = "../genesis" } integer-sqrt = "0.1.5" rand = "0.7.3" -rand_core = "0.5.1" +rand_core = "0.6.2" proto_array = { path = "../../consensus/proto_array" } lru = "0.6.0" tempfile = "3.1.0" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 59fed51b806..49454711533 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2,6 +2,7 @@ use crate::attestation_verification::{ Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation, }; +use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock, @@ -24,7 +25,7 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::SnapshotCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::{ - get_block_delay_ms, timestamp_now, ValidatorMonitor, + get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -231,8 +232,10 @@ pub struct BeaconChain { pub(crate) head_tracker: Arc, /// A cache dedicated to block processing. pub(crate) snapshot_cache: TimeoutRwLock>, - /// Caches the shuffling for a given epoch and state root. + /// Caches the attester shuffling for a given epoch and shuffling key root. pub(crate) shuffling_cache: TimeoutRwLock, + /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. + pub(crate) beacon_proposer_cache: Mutex, /// Caches a map of `validator_index -> validator_pubkey`. pub(crate) validator_pubkey_cache: TimeoutRwLock, /// A list of any hard-coded forks that have been disabled. @@ -453,9 +456,10 @@ impl BeaconChain { &self, ) -> Result>, Error> { let head = self.head()?; - let slot = head.beacon_state.slot; + let head_slot = head.beacon_state.slot; + let head_state_root = head.beacon_state_root(); let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state); - let iter = std::iter::once(Ok((head.beacon_state_root, slot))) + let iter = std::iter::once(Ok((head_state_root, head_slot))) .chain(iter) .map(|result| result.map_err(Into::into)); Ok(iter) @@ -599,7 +603,7 @@ impl BeaconChain { Ok(HeadInfo { slot: head.beacon_block.slot(), block_root: head.beacon_block_root, - state_root: head.beacon_state_root, + state_root: head.beacon_state_root(), current_justified_checkpoint: head.beacon_state.current_justified_checkpoint, finalized_checkpoint: head.beacon_state.finalized_checkpoint, fork: head.beacon_state.fork, @@ -1549,7 +1553,7 @@ impl BeaconChain { // For the current and next epoch of this state, ensure we have the shuffling from this // block in our cache. 
for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] { - let shuffling_id = ShufflingId::new(block_root, &state, *relative_epoch)?; + let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?; let shuffling_is_cached = self .shuffling_cache @@ -1727,19 +1731,22 @@ impl BeaconChain { self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .ok_or(Error::SnapshotCacheLockTimeout) .map(|mut snapshot_cache| { - snapshot_cache.insert(BeaconSnapshot { - beacon_state: state, - beacon_state_root: signed_block.state_root(), - beacon_block: signed_block, - beacon_block_root: block_root, - }); + snapshot_cache.insert( + BeaconSnapshot { + beacon_state: state, + beacon_block: signed_block, + beacon_block_root: block_root, + }, + None, + ) }) - .unwrap_or_else(|| { + .unwrap_or_else(|e| { error!( self.log, - "Failed to obtain cache write lock"; - "lock" => "snapshot_cache", + "Failed to insert snapshot"; + "error" => ?e, "task" => "process block" ); }); @@ -1747,7 +1754,7 @@ impl BeaconChain { self.head_tracker .register_block(block_root, parent_root, slot); - // send an event to the `events` endpoint after fully processing the block + // Send an event to the `events` endpoint after fully processing the block. if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_block_subscribers() { event_handler.register(EventKind::Block(SseBlock { @@ -2021,7 +2028,6 @@ impl BeaconChain { beacon_block, beacon_block_root, beacon_state, - beacon_state_root, }) }) .and_then(|mut snapshot| { @@ -2096,7 +2102,7 @@ impl BeaconChain { let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); // These fields are used for server-sent events - let state_root = new_head.beacon_state_root; + let state_root = new_head.beacon_state_root(); let head_slot = new_head.beacon_state.slot; let target_epoch_start_slot = new_head .beacon_state @@ -2116,6 +2122,12 @@ impl BeaconChain { metrics::stop_timer(update_head_timer); + // Observe the delay between the start of the slot and when we set the block as head. 
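The metric observed just below measures how far into its slot a block is when it becomes head. The `get_slot_delay_ms` helper is a Lighthouse internal whose body is not shown in this diff, so the following is only an illustration of the quantity being recorded, computed from a genesis time and a fixed slot duration with plain `std::time::Duration`:

    use std::time::Duration;

    /// How far past the start of `slot` is `now`? Returns zero if the clock
    /// reads earlier than the slot start.
    fn slot_start_delay(now: Duration, genesis: Duration, slot: u64, slot_duration: Duration) -> Duration {
        let slot_start = genesis + slot_duration * slot as u32;
        now.checked_sub(slot_start).unwrap_or_default()
    }

    fn main() {
        // Eth2 mainnet genesis time (unix seconds), used only to make the numbers concrete.
        let genesis = Duration::from_secs(1_606_824_023);
        let slot_duration = Duration::from_secs(12);
        // Suppose a block for slot 100 only becomes head 4.5s after its slot started.
        let now = genesis + slot_duration * 100 + Duration::from_millis(4_500);
        assert_eq!(
            slot_start_delay(now, genesis, 100, slot_duration),
            Duration::from_millis(4_500)
        );
    }

Values near zero mean the block was promoted to head shortly after its slot began; values approaching the slot duration point at late blocks or slow processing.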
+ metrics::observe_duration( + &metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME, + get_slot_delay_ms(timestamp_now(), head_slot, &self.slot_clock), + ); + self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .map(|mut snapshot_cache| { @@ -2458,7 +2470,6 @@ impl BeaconChain { beacon_block: self.head()?.beacon_block, beacon_block_root: self.head()?.beacon_block_root, beacon_state: self.head()?.beacon_state, - beacon_state_root: self.head()?.beacon_state_root, }; dump.push(last_slot.clone()); @@ -2485,7 +2496,6 @@ impl BeaconChain { beacon_block, beacon_block_root, beacon_state, - beacon_state_root, }; dump.push(slot.clone()); diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 104cb5ed0a0..80b9fe2ad80 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -210,7 +210,7 @@ where let anchor_state = &anchor.beacon_state; let mut anchor_block_header = anchor_state.latest_block_header.clone(); if anchor_block_header.state_root == Hash256::zero() { - anchor_block_header.state_root = anchor.beacon_state_root; + anchor_block_header.state_root = anchor.beacon_state_root(); } let anchor_root = anchor_block_header.canonical_root(); let anchor_epoch = anchor_state.current_epoch(); diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs new file mode 100644 index 00000000000..21207101060 --- /dev/null +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -0,0 +1,113 @@ +//! The `BeaconProposer` cache stores the proposer indices for some epoch. +//! +//! This cache is keyed by `(epoch, block_root)` where `block_root` is the block root at +//! `end_slot(epoch - 1)`. We make the assertion that the proposer shuffling is identical for all +//! blocks in `epoch` which share the common ancestor of `block_root`. +//! +//! The cache is a fairly unintelligent LRU cache that is not pruned after finality. This makes it +//! very simple to reason about, but it might store values that are useless due to finalization. The +//! values it stores are very small, so this should not be an issue. + +use lru::LruCache; +use smallvec::SmallVec; +use types::{BeaconStateError, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned}; + +/// The number of sets of proposer indices that should be cached. +const CACHE_SIZE: usize = 16; + +/// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being +/// incorrect is non-substantial from a consensus perspective (and probably also from a +/// performance perspective). +const TYPICAL_SLOTS_PER_EPOCH: usize = 32; + +/// For some given slot, this contains the proposer index (`index`) and the `fork` that should be +/// used to verify their signature. +pub struct Proposer { + pub index: usize, + pub fork: Fork, +} + +/// The list of proposers for some given `epoch`, alongside the `fork` that should be used to verify +/// their signatures. +pub struct EpochBlockProposers { + /// The epoch to which the proposers pertain. + epoch: Epoch, + /// The fork that should be used to verify proposer signatures. + fork: Fork, + /// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot + /// in that epoch. + /// + /// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`. 
+ proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>, +} + +/// A cache to store the proposers for some epoch. +/// +/// See the module-level documentation for more information. +pub struct BeaconProposerCache { + cache: LruCache<(Epoch, Hash256), EpochBlockProposers>, +} + +impl Default for BeaconProposerCache { + fn default() -> Self { + Self { + cache: LruCache::new(CACHE_SIZE), + } + } +} + +impl BeaconProposerCache { + /// If it is cached, returns the proposer for the block at `slot` where the block has the + /// ancestor block root of `shuffling_decision_block` at `end_slot(slot.epoch() - 1)`. + pub fn get( + &mut self, + shuffling_decision_block: Hash256, + slot: Slot, + ) -> Option { + let epoch = slot.epoch(T::slots_per_epoch()); + let key = (epoch, shuffling_decision_block); + if let Some(cache) = self.cache.get(&key) { + // This `if` statement is likely unnecessary, but it feels like good practice. + if epoch == cache.epoch { + cache + .proposers + .get(slot.as_usize() % T::SlotsPerEpoch::to_usize()) + .map(|&index| Proposer { + index, + fork: cache.fork, + }) + } else { + None + } + } else { + None + } + } + + /// Insert the proposers into the cache. + /// + /// See `Self::get` for a description of `shuffling_decision_block`. + /// + /// The `fork` value must be valid to verify proposer signatures in `epoch`. + pub fn insert( + &mut self, + epoch: Epoch, + shuffling_decision_block: Hash256, + proposers: Vec, + fork: Fork, + ) -> Result<(), BeaconStateError> { + let key = (epoch, shuffling_decision_block); + if !self.cache.contains(&key) { + self.cache.put( + key, + EpochBlockProposers { + epoch, + fork, + proposers: proposers.into(), + }, + ); + } + + Ok(()) + } +} diff --git a/beacon_node/beacon_chain/src/beacon_snapshot.rs b/beacon_node/beacon_chain/src/beacon_snapshot.rs index f38a688963d..ba99debaa55 100644 --- a/beacon_node/beacon_chain/src/beacon_snapshot.rs +++ b/beacon_node/beacon_chain/src/beacon_snapshot.rs @@ -9,7 +9,6 @@ pub struct BeaconSnapshot { pub beacon_block: SignedBeaconBlock, pub beacon_block_root: Hash256, pub beacon_state: BeaconState, - pub beacon_state_root: Hash256, } impl BeaconSnapshot { @@ -18,28 +17,33 @@ impl BeaconSnapshot { beacon_block: SignedBeaconBlock, beacon_block_root: Hash256, beacon_state: BeaconState, - beacon_state_root: Hash256, ) -> Self { Self { beacon_block, beacon_block_root, beacon_state, - beacon_state_root, } } + /// Returns the state root from `self.beacon_block`. + /// + /// ## Caution + /// + /// It is not strictly enforced that `root(self.beacon_state) == self.beacon_state_root()`. + pub fn beacon_state_root(&self) -> Hash256 { + self.beacon_block.message.state_root + } + /// Update all fields of the checkpoint. 
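The cache added in `beacon_proposer_cache.rs` is keyed by `(epoch, shuffling_decision_block)`, where the decision block is the block root at `end_slot(epoch - 1)`, and `get` simply indexes the stored proposer list by `slot % slots_per_epoch`. A minimal standalone model of that lookup/insert pattern, using a `HashMap` instead of the real `LruCache` and plain integers in place of `Epoch`, `Slot` and `Hash256` (the `fork` field is omitted; all names here are illustrative rather than the crate's API):

    use std::collections::HashMap;

    const SLOTS_PER_EPOCH: u64 = 32;

    /// Toy stand-in for the real cache: proposers for an epoch, keyed by
    /// (epoch, shuffling_decision_block_root).
    struct ToyProposerCache {
        cache: HashMap<(u64, [u8; 32]), Vec<usize>>,
    }

    impl ToyProposerCache {
        fn new() -> Self {
            Self { cache: HashMap::new() }
        }

        /// Returns the proposer index for `slot`, if the epoch's proposers are
        /// cached under the given shuffling decision root.
        fn get(&self, decision_root: [u8; 32], slot: u64) -> Option<usize> {
            let epoch = slot / SLOTS_PER_EPOCH;
            self.cache
                .get(&(epoch, decision_root))
                .and_then(|proposers| proposers.get((slot % SLOTS_PER_EPOCH) as usize))
                .copied()
        }

        /// Stores the proposers for `epoch`, doing nothing if the key already
        /// exists (mirroring the "only insert if absent" behaviour in the diff).
        fn insert(&mut self, epoch: u64, decision_root: [u8; 32], proposers: Vec<usize>) {
            self.cache.entry((epoch, decision_root)).or_insert(proposers);
        }
    }

    fn main() {
        let mut cache = ToyProposerCache::new();
        let decision_root = [0xaa; 32]; // block root at end_slot(epoch - 1)

        assert_eq!(cache.get(decision_root, 65), None); // miss: nothing cached yet
        cache.insert(2, decision_root, (0..SLOTS_PER_EPOCH as usize).collect());
        assert_eq!(cache.get(decision_root, 65), Some(1)); // slot 65 is slot 1 of epoch 2
    }

The "insert only if absent" behaviour mirrors the real cache, which never overwrites an existing entry for the same key.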
pub fn update( &mut self, beacon_block: SignedBeaconBlock, beacon_block_root: Hash256, beacon_state: BeaconState, - beacon_state_root: Hash256, ) { self.beacon_block = beacon_block; self.beacon_block_root = beacon_block_root; self.beacon_state = beacon_state; - self.beacon_state_root = beacon_state_root; } pub fn clone_with(&self, clone_config: CloneConfig) -> Self { @@ -47,7 +51,6 @@ impl BeaconSnapshot { beacon_block: self.beacon_block.clone(), beacon_block_root: self.beacon_block_root, beacon_state: self.beacon_state.clone_with(clone_config), - beacon_state_root: self.beacon_state_root, } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 21d8ddc541d..fe83987186a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -40,17 +40,20 @@ //! END //! //! ``` +use crate::snapshot_cache::PreProcessingSnapshot; +use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ beacon_chain::{ BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }, - metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, + metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use fork_choice::{ForkChoice, ForkChoiceStore}; use parking_lot::RwLockReadGuard; -use slog::{error, Logger}; +use proto_array::Block as ProtoBlock; +use slog::{debug, error, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ @@ -66,7 +69,7 @@ use std::io::Write; use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; use tree_hash::TreeHash; use types::{ - BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, + BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Hash256, PublicKey, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; @@ -179,7 +182,7 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - BlockIsNotLaterThanParent { block_slot: Slot, state_slot: Slot }, + BlockIsNotLaterThanParent { block_slot: Slot, parent_slot: Slot }, /// At least one block in the chain segment did not have it's parent root set to the root of /// the prior block. 
/// @@ -348,11 +351,8 @@ pub fn signature_verify_chain_segment( .map(|(_, block)| block.slot()) .unwrap_or_else(|| slot); - let state = cheap_state_advance_to_obtain_committees( - &mut parent.beacon_state, - highest_slot, - &chain.spec, - )?; + let state = + cheap_state_advance_to_obtain_committees(&mut parent.pre_state, highest_slot, &chain.spec)?; let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); @@ -388,7 +388,7 @@ pub fn signature_verify_chain_segment( pub struct GossipVerifiedBlock { pub block: SignedBeaconBlock, pub block_root: Hash256, - parent: BeaconSnapshot, + parent: Option>, } /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit @@ -396,7 +396,7 @@ pub struct GossipVerifiedBlock { pub struct SignatureVerifiedBlock { block: SignedBeaconBlock, block_root: Hash256, - parent: Option>, + parent: Option>, } /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and @@ -520,16 +520,90 @@ impl GossipVerifiedBlock { &chain.store, )?; - let (mut parent, block) = load_parent(block, chain)?; + let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let (parent_block, block) = verify_parent_block_is_known(chain, block)?; + + // Track the number of skip slots between the block and its parent. + metrics::set_gauge( + &metrics::GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS, + block + .slot() + .as_u64() + .saturating_sub(1) + .saturating_sub(parent_block.slot.into()) as i64, + ); - // Reject any block that exceeds our limit on skipped slots. - check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?; + // Paranoid check to prevent propagation of blocks that don't form a legitimate chain. + // + // This is not in the spec, but @protolambda tells me that the majority of other clients are + // already doing it. For reference: + // + // https://github.com/ethereum/eth2.0-specs/pull/2196 + if parent_block.slot >= block.slot() { + return Err(BlockError::BlockIsNotLaterThanParent { + block_slot: block.slot(), + parent_slot: parent_block.slot, + }); + } - let state = cheap_state_advance_to_obtain_committees( - &mut parent.beacon_state, - block.slot(), - &chain.spec, - )?; + let proposer_shuffling_decision_block = + if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == block_epoch { + parent_block + .next_epoch_shuffling_id + .shuffling_decision_block + } else { + parent_block.root + }; + + // Reject any block that exceeds our limit on skipped slots. + check_block_skip_slots(chain, parent_block.slot, &block.message)?; + + // We assign to a variable instead of using `if let Some` directly to ensure we drop the + // write lock before trying to acquire it again in the `else` clause. + let proposer_opt = chain + .beacon_proposer_cache + .lock() + .get::(proposer_shuffling_decision_block, block.slot()); + let (expected_proposer, fork, parent, block) = if let Some(proposer) = proposer_opt { + // The proposer index was cached and we can return it without needing to load the + // parent. + (proposer.index, proposer.fork, None, block) + } else { + // The proposer index was *not* cached and we must load the parent in order to determine + // the proposer index. 
+ let (mut parent, block) = load_parent(block, chain)?; + + debug!( + chain.log, + "Proposer shuffling cache miss"; + "parent_root" => ?parent.beacon_block_root, + "parent_slot" => parent.beacon_block.slot(), + "block_root" => ?block_root, + "block_slot" => block.slot(), + ); + + // The state produced is only valid for determining proposer/attester shuffling indices. + let state = cheap_state_advance_to_obtain_committees( + &mut parent.pre_state, + block.slot(), + &chain.spec, + )?; + + let proposers = state.get_beacon_proposer_indices(&chain.spec)?; + let proposer_index = *proposers + .get(block.slot().as_usize() % T::EthSpec::slots_per_epoch() as usize) + .ok_or_else(|| BeaconChainError::NoProposerForSlot(block.slot()))?; + + // Prime the proposer shuffling cache with the newly-learned value. + chain.beacon_proposer_cache.lock().insert( + block_epoch, + proposer_shuffling_decision_block, + proposers, + state.fork, + )?; + + (proposer_index, state.fork, Some(parent), block) + }; let signature_is_valid = { let pubkey_cache = get_validator_pubkey_cache(chain)?; @@ -539,7 +613,7 @@ impl GossipVerifiedBlock { block.verify_signature( Some(block_root), pubkey, - &state.fork, + &fork, chain.genesis_validators_root, &chain.spec, ) @@ -566,12 +640,10 @@ impl GossipVerifiedBlock { }); } - let expected_proposer = - state.get_beacon_proposer_index(block.message.slot, &chain.spec)? as u64; - if block.message.proposer_index != expected_proposer { + if block.message.proposer_index != expected_proposer as u64 { return Err(BlockError::IncorrectBlockProposer { block: block.message.proposer_index, - local_shuffling: expected_proposer, + local_shuffling: expected_proposer as u64, }); } @@ -615,12 +687,12 @@ impl SignatureVerifiedBlock { let (mut parent, block) = load_parent(block, chain)?; // Reject any block that exceeds our limit on skipped slots. - check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?; + check_block_skip_slots(chain, parent.beacon_block.slot(), &block.message)?; let block_root = get_block_root(&block); let state = cheap_state_advance_to_obtain_committees( - &mut parent.beacon_state, + &mut parent.pre_state, block.slot(), &chain.spec, )?; @@ -657,11 +729,14 @@ impl SignatureVerifiedBlock { from: GossipVerifiedBlock, chain: &BeaconChain, ) -> Result> { - let mut parent = from.parent; - let block = from.block; + let (mut parent, block) = if let Some(parent) = from.parent { + (parent, from.block) + } else { + load_parent(from.block, chain)? + }; let state = cheap_state_advance_to_obtain_committees( - &mut parent.beacon_state, + &mut parent.pre_state, block.slot(), &chain.spec, )?; @@ -749,7 +824,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { pub fn from_signature_verified_components( block: SignedBeaconBlock, block_root: Hash256, - parent: BeaconSnapshot, + parent: PreProcessingSnapshot, chain: &BeaconChain, ) -> Result> { // Reject any block if its parent is not known to fork choice. @@ -771,7 +846,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { } // Reject any block that exceeds our limit on skipped slots. - check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?; + check_block_skip_slots(chain, parent.beacon_block.slot(), &block.message)?; /* * Perform cursory checks to see if the block is even worth processing. @@ -790,20 +865,41 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { let mut confirmation_db_batch = vec![]; // The block must have a higher slot than its parent. 
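The gossip path above avoids loading the parent state in the common case by deriving the proposer-shuffling decision block from fork choice alone: when the parent sits in the same epoch as the new block, the parent's next-epoch attester shuffling id already records the block root at the end of the previous epoch, which is also the proposer decision block for this epoch; when the parent is from an earlier epoch, the parent itself is the last block before the block's epoch, so its own root is the decision block. A small standalone sketch of that branch (plain integers and byte arrays stand in for `Slot`, `Epoch`, `Hash256` and fork choice's `ProtoBlock`; not the crate's API):

    const SLOTS_PER_EPOCH: u64 = 32;

    /// The fields of the fork choice block summary that the selection needs
    /// (illustrative only).
    struct ParentSummary {
        root: [u8; 32],
        slot: u64,
        next_epoch_shuffling_decision_block: [u8; 32],
    }

    /// Mirrors the selection in the diff: reuse the id the parent already
    /// carries when it is in the block's epoch, otherwise the parent itself is
    /// the decision block.
    fn proposer_shuffling_decision_block(parent: &ParentSummary, block_slot: u64) -> [u8; 32] {
        let block_epoch = block_slot / SLOTS_PER_EPOCH;
        if parent.slot / SLOTS_PER_EPOCH == block_epoch {
            parent.next_epoch_shuffling_decision_block
        } else {
            parent.root
        }
    }

    fn main() {
        let parent = ParentSummary {
            root: [0x11; 32],
            slot: 94, // epoch 2
            next_epoch_shuffling_decision_block: [0x22; 32], // root at end_slot(1)
        };
        // Same-epoch child (slot 95, epoch 2): reuse the id the parent carries.
        assert_eq!(proposer_shuffling_decision_block(&parent, 95), [0x22; 32]);
        // Child in the next epoch (slot 96, epoch 3): the parent is the last
        // block before that epoch, so its root is the decision block.
        assert_eq!(proposer_shuffling_decision_block(&parent, 96), [0x11; 32]);
    }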
- if block.slot() <= parent.beacon_state.slot { + if block.slot() <= parent.beacon_block.slot() { return Err(BlockError::BlockIsNotLaterThanParent { block_slot: block.slot(), - state_slot: parent.beacon_state.slot, + parent_slot: parent.beacon_block.slot(), }); } let mut summaries = vec![]; // Transition the parent state to the block slot. - let mut state = parent.beacon_state; + // + // It is important to note that we're using a "pre-state" here, one that has potentially + // been advanced one slot forward from `parent.beacon_block.slot`. + let mut state = parent.pre_state; + + // Perform a sanity check on the pre-state. + let parent_slot = parent.beacon_block.slot(); + if state.slot < parent_slot || state.slot > parent_slot + 1 { + return Err(BeaconChainError::BadPreState { + parent_root: parent.beacon_block_root, + parent_slot, + block_root, + block_slot: block.slot(), + state_slot: state.slot, + } + .into()); + } + let distance = block.slot().as_u64().saturating_sub(state.slot.as_u64()); - for i in 0..distance { - let state_root = if i == 0 { + for _ in 0..distance { + let state_root = if parent.beacon_block.slot() == state.slot { + // If it happens that `pre_state` has *not* already been advanced forward a single + // slot, then there is no need to compute the state root for this + // `per_slot_processing` call since that state root is already stored in the parent + // block. parent.beacon_block.state_root() } else { // This is a new state we've reached, so stage it for storage in the DB. @@ -851,6 +947,24 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { expose_participation_metrics(&summaries); + // If the block is sufficiently recent, notify the validator monitor. + if let Some(slot) = chain.slot_clock.now() { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + if block.slot().epoch(T::EthSpec::slots_per_epoch()) + + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 + >= epoch + { + let validator_monitor = chain.validator_monitor.read(); + // Update the summaries in a separate loop to `per_slot_processing`. This protects + // the `validator_monitor` lock from being bounced or held for a long time whilst + // performing `per_slot_processing`. + for (i, summary) in summaries.iter().enumerate() { + let epoch = state.current_epoch() - Epoch::from(summaries.len() - i); + validator_monitor.process_validator_statuses(epoch, &summary.statuses); + } + } + } + metrics::stop_timer(catchup_timer); /* @@ -941,14 +1055,14 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { /// `import_max_skip_slots` value. fn check_block_skip_slots( chain: &BeaconChain, - parent: &BeaconBlock, + parent_slot: Slot, block: &BeaconBlock, ) -> Result<(), BlockError> { // Reject any block that exceeds our limit on skipped slots. if let Some(max_skip_slots) = chain.config.import_max_skip_slots { - if block.slot > parent.slot + max_skip_slots { + if block.slot > parent_slot + max_skip_slots { return Err(BlockError::TooManySkippedSlots { - parent_slot: parent.slot, + parent_slot, block_slot: block.slot, }); } @@ -1071,6 +1185,24 @@ pub fn get_block_root(block: &SignedBeaconBlock) -> Hash256 { block_root } +/// Verify the parent of `block` is known, returning some information about the parent block from +/// fork choice. 
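The catch-up loop above skips the state-root computation only on its first iteration, and only when the pre-state still sits at the parent block's slot, because that is the one state whose root the parent block already commits to; once the timer has advanced the pre-state, or after the first `per_slot_processing` call, a fresh root must be hashed and staged for storage. A tiny standalone sketch of that decision (plain integers for slots, a byte array for the root; illustrative only):

    /// Decide whether the catch-up loop can reuse the state root committed by
    /// the parent block instead of hashing the state.
    fn state_root_for_slot(
        parent_block_slot: u64,
        parent_state_root: [u8; 32],
        state_slot: u64,
    ) -> Option<[u8; 32]> {
        if parent_block_slot == state_slot {
            // The pre-state has not been advanced yet: its root is exactly the
            // one stored in the parent block, so no tree-hashing is needed.
            Some(parent_state_root)
        } else {
            // The state was already advanced (or this is a later iteration):
            // a fresh state root must be computed and staged for storage.
            None
        }
    }

    fn main() {
        let parent_root = [0x33; 32];
        // Pre-state still sits at the parent's slot: reuse the committed root.
        assert_eq!(state_root_for_slot(10, parent_root, 10), Some(parent_root));
        // Pre-state was advanced by the timer: the root must be computed.
        assert_eq!(state_root_for_slot(10, parent_root, 11), None);
    }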
+#[allow(clippy::type_complexity)] +fn verify_parent_block_is_known( + chain: &BeaconChain, + block: SignedBeaconBlock, +) -> Result<(ProtoBlock, SignedBeaconBlock), BlockError> { + if let Some(proto_block) = chain + .fork_choice + .read() + .get_block(&block.message.parent_root) + { + Ok((proto_block, block)) + } else { + Err(BlockError::ParentUnknown(Box::new(block))) + } +} + /// Load the parent snapshot (block and state) of the given `block`. /// /// Returns `Err(BlockError::ParentUnknown)` if the parent is not found, or if an error occurs @@ -1079,7 +1211,13 @@ pub fn get_block_root(block: &SignedBeaconBlock) -> Hash256 { fn load_parent( block: SignedBeaconBlock, chain: &BeaconChain, -) -> Result<(BeaconSnapshot, SignedBeaconBlock), BlockError> { +) -> Result< + ( + PreProcessingSnapshot, + SignedBeaconBlock, + ), + BlockError, +> { // Reject any block if its parent is not known to fork choice. // // A block that is not in fork choice is either: @@ -1105,7 +1243,7 @@ fn load_parent( .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .and_then(|mut snapshot_cache| snapshot_cache.try_remove(block.parent_root())) { - Ok((snapshot, block)) + Ok((snapshot.into_pre_state(), block)) } else { // Load the blocks parent block from the database, returning invalid if that block is not // found. @@ -1136,11 +1274,10 @@ fn load_parent( })?; Ok(( - BeaconSnapshot { + PreProcessingSnapshot { beacon_block: parent_block, beacon_block_root: root, - beacon_state: parent_state, - beacon_state_root: parent_state_root, + pre_state: parent_state, }, block, )) @@ -1151,12 +1288,12 @@ fn load_parent( result } -/// Performs a cheap (time-efficient) state advancement so the committees for `slot` can be -/// obtained from `state`. +/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for +/// `slot` can be obtained from `state`. /// /// The state advancement is "cheap" since it does not generate state roots. As a result, the -/// returned state might be holistically invalid but the committees will be correct (since they do -/// not rely upon state roots). +/// returned state might be holistically invalid but the committees/proposers will be correct (since +/// they do not rely upon state roots). /// /// If the given `state` can already serve the `slot`, the committees will be built on the `state` /// and `Cow::Borrowed(state)` will be returned. 
Otherwise, the state will be cloned, cheaply @@ -1176,7 +1313,7 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>( } else if state.slot > block_slot { Err(BlockError::BlockIsNotLaterThanParent { block_slot, - state_slot: state.slot, + parent_slot: state.slot, }) } else { let mut state = state.clone_with(CloneConfig::committee_caches_only()); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 8b188549045..d63c28c0182 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -328,7 +328,6 @@ where let genesis = BeaconSnapshot { beacon_block_root, beacon_block, - beacon_state_root, beacon_state, }; @@ -468,14 +467,9 @@ where let mut canonical_head = BeaconSnapshot { beacon_block_root: head_block_root, beacon_block: head_block, - beacon_state_root: head_state_root, beacon_state: head_state, }; - if canonical_head.beacon_block.state_root() != canonical_head.beacon_state_root { - return Err("beacon_block.state_root != beacon_state".to_string()); - } - canonical_head .beacon_state .build_all_caches(&self.spec) @@ -560,6 +554,7 @@ where canonical_head, )), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), + beacon_proposer_cache: <_>::default(), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), disabled_forks: self.disabled_forks, shutdown_sender: self @@ -599,7 +594,7 @@ where info!( log, "Beacon chain initialized"; - "head_state" => format!("{}", head.beacon_state_root), + "head_state" => format!("{}", head.beacon_state_root()), "head_block" => format!("{}", head.beacon_block_root), "head_slot" => format!("{}", head.beacon_block.slot()), ); diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9af9167f752..e7297767e36 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -69,9 +69,11 @@ pub enum BeaconChainError { /// Returned when an internal check fails, indicating corrupt data. InvariantViolated(String), SszTypesError(SszTypesError), + NoProposerForSlot(Slot), CanonicalHeadLockTimeout, AttestationCacheLockTimeout, ValidatorPubkeyCacheLockTimeout, + SnapshotCacheLockTimeout, IncorrectStateForAttestation(RelativeEpochError), InvalidValidatorPubkeyBytes(bls::Error), ValidatorPubkeyCacheIncomplete(usize), @@ -96,6 +98,13 @@ pub enum BeaconChainError { head_slot: Slot, request_slot: Slot, }, + BadPreState { + parent_root: Hash256, + parent_slot: Slot, + block_root: Hash256, + block_slot: Slot, + state_slot: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index bdeee8e9743..448e60fb17f 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,6 +2,7 @@ pub mod attestation_verification; mod beacon_chain; mod beacon_fork_choice_store; +mod beacon_proposer_cache; mod beacon_snapshot; mod block_verification; pub mod builder; @@ -21,6 +22,7 @@ mod persisted_beacon_chain; mod persisted_fork_choice; mod shuffling_cache; mod snapshot_cache; +pub mod state_advance_timer; pub mod test_utils; mod timeout_rw_lock; pub mod validator_monitor; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index f70a11dcb03..624c5821f66 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -424,6 +424,54 @@ lazy_static! 
{ /* * Validator Monitor Metrics (per-epoch summaries) */ + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_attester_hit", + "Incremented if the validator is flagged as a previous epoch attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_attester_miss", + "Incremented if the validator is not flagged as a previous epoch attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_head_attester_hit", + "Incremented if the validator is flagged as a previous epoch head attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_head_attester_miss", + "Incremented if the validator is not flagged as a previous epoch head attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_target_attester_hit", + "Incremented if the validator is flagged as a previous epoch target attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS: Result = + try_create_int_counter_vec( + "validator_monitor_prev_epoch_on_chain_target_attester_miss", + "Incremented if the validator is not flagged as a previous epoch target attester \ + during per epoch processing", + &["validator"] + ); + pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE: Result = + try_create_int_gauge_vec( + "validator_monitor_prev_epoch_on_chain_inclusion_distance", + "The attestation inclusion distance calculated during per epoch processing", + &["validator"] + ); pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL: Result = try_create_int_gauge_vec( "validator_monitor_prev_epoch_attestations_total", @@ -575,15 +623,28 @@ lazy_static! { */ pub static ref BEACON_BLOCK_IMPORTED_SLOT_START_DELAY_TIME: Result = try_create_histogram( "beacon_block_imported_slot_start_delay_time", - "Duration between the start of the blocks slot and the current time.", + "Duration between the start of the blocks slot and the current time when it was imported.", ); + pub static ref BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME: Result = try_create_histogram( + "beacon_block_head_slot_start_delay_time", + "Duration between the start of the blocks slot and the current time when it was as head.", + ); + + /* + * General block metrics + */ + pub static ref GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS: Result = + try_create_int_gauge( + "gossip_beacon_block_skipped_slots", + "For each gossip blocks, the number of skip slots between it and its parent" + ); } /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, /// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`. 
pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { if let Ok(head) = beacon_chain.head() { - scrape_head_state::(&head.beacon_state, head.beacon_state_root) + scrape_head_state::(&head.beacon_state, head.beacon_state_root()) } if let Some(slot) = beacon_chain.slot_clock.now() { diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index b76adf05e64..5a287daf0f1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,6 +1,6 @@ use crate::metrics; use lru::LruCache; -use types::{beacon_state::CommitteeCache, Epoch, Hash256, ShufflingId}; +use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; /// The size of the LRU cache that stores committee caches for quicker verification. /// @@ -14,7 +14,7 @@ const CACHE_SIZE: usize = 16; /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. pub struct ShufflingCache { - cache: LruCache, + cache: LruCache, } impl ShufflingCache { @@ -24,7 +24,7 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &ShufflingId) -> Option<&CommitteeCache> { + pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> { let opt = self.cache.get(key); if opt.is_some() { @@ -36,11 +36,11 @@ impl ShufflingCache { opt } - pub fn contains(&self, key: &ShufflingId) -> bool { + pub fn contains(&self, key: &AttestationShufflingId) -> bool { self.cache.contains(key) } - pub fn insert(&mut self, key: ShufflingId, committee_cache: &CommitteeCache) { + pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) { if !self.cache.contains(&key) { self.cache.put(key, committee_cache.clone()); } @@ -49,8 +49,8 @@ impl ShufflingCache { /// Contains the shuffling IDs for a beacon block. pub struct BlockShufflingIds { - pub current: ShufflingId, - pub next: ShufflingId, + pub current: AttestationShufflingId, + pub next: AttestationShufflingId, pub block_root: Hash256, } @@ -58,13 +58,16 @@ impl BlockShufflingIds { /// Returns the shuffling ID for the given epoch. /// /// Returns `None` if `epoch` is prior to `self.current.shuffling_epoch`. - pub fn id_for_epoch(&self, epoch: Epoch) -> Option { + pub fn id_for_epoch(&self, epoch: Epoch) -> Option { if epoch == self.current.shuffling_epoch { Some(self.current.clone()) } else if epoch == self.next.shuffling_epoch { Some(self.next.clone()) } else if epoch > self.next.shuffling_epoch { - Some(ShufflingId::from_components(epoch, self.block_root)) + Some(AttestationShufflingId::from_components( + epoch, + self.block_root, + )) } else { None } diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index 8168ffd174d..b1531d6b781 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -1,10 +1,93 @@ use crate::BeaconSnapshot; use std::cmp; -use types::{beacon_state::CloneConfig, Epoch, EthSpec, Hash256}; +use types::{ + beacon_state::CloneConfig, BeaconState, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; /// The default size of the cache. pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; +/// This snapshot is to be used for verifying a child of `self.beacon_block`. +pub struct PreProcessingSnapshot { + /// This state is equivalent to the `self.beacon_block.state_root()` state that has been + /// advanced forward one slot using `per_slot_processing`. 
This state is "primed and ready" for + /// the application of another block. + pub pre_state: BeaconState, + pub beacon_block: SignedBeaconBlock, + pub beacon_block_root: Hash256, +} + +impl From> for PreProcessingSnapshot { + fn from(snapshot: BeaconSnapshot) -> Self { + Self { + pre_state: snapshot.beacon_state, + beacon_block: snapshot.beacon_block, + beacon_block_root: snapshot.beacon_block_root, + } + } +} + +impl CacheItem { + pub fn new_without_pre_state(snapshot: BeaconSnapshot) -> Self { + Self { + beacon_block: snapshot.beacon_block, + beacon_block_root: snapshot.beacon_block_root, + beacon_state: snapshot.beacon_state, + pre_state: None, + } + } + + fn clone_to_snapshot_with(&self, clone_config: CloneConfig) -> BeaconSnapshot { + BeaconSnapshot { + beacon_state: self.beacon_state.clone_with(clone_config), + beacon_block: self.beacon_block.clone(), + beacon_block_root: self.beacon_block_root, + } + } + + pub fn into_pre_state(self) -> PreProcessingSnapshot { + PreProcessingSnapshot { + beacon_block: self.beacon_block, + beacon_block_root: self.beacon_block_root, + pre_state: self.pre_state.unwrap_or(self.beacon_state), + } + } +} + +impl Into> for CacheItem { + fn into(self) -> BeaconSnapshot { + BeaconSnapshot { + beacon_state: self.beacon_state, + beacon_block: self.beacon_block, + beacon_block_root: self.beacon_block_root, + } + } +} + +pub enum StateAdvance { + /// The cache does not contain the supplied block root. + BlockNotFound, + /// The cache contains the supplied block root but the state has already been advanced. + AlreadyAdvanced, + /// The cache contains the supplied block root and the state has not yet been advanced. + State { + state: Box>, + state_root: Hash256, + block_slot: Slot, + }, +} + +/// The item stored in the `SnapshotCache`. +pub struct CacheItem { + beacon_block: SignedBeaconBlock, + beacon_block_root: Hash256, + /// This state is equivalent to `self.beacon_block.state_root()`. + beacon_state: BeaconState, + /// This state is equivalent to `self.beacon_state` that has had `per_slot_processing` applied + /// to it. This state assists in optimizing block processing. + pre_state: Option>, +} + /// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing. /// /// ## Cache Queuing @@ -20,7 +103,7 @@ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; pub struct SnapshotCache { max_len: usize, head_block_root: Hash256, - snapshots: Vec>, + snapshots: Vec>, } impl SnapshotCache { @@ -31,15 +114,22 @@ impl SnapshotCache { Self { max_len: cmp::max(max_len, 1), head_block_root: head.beacon_block_root, - snapshots: vec![head], + snapshots: vec![CacheItem::new_without_pre_state(head)], } } /// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see /// struct-level documentation for more info). - pub fn insert(&mut self, snapshot: BeaconSnapshot) { + pub fn insert(&mut self, snapshot: BeaconSnapshot, pre_state: Option>) { + let item = CacheItem { + beacon_block: snapshot.beacon_block, + beacon_block_root: snapshot.beacon_block_root, + beacon_state: snapshot.beacon_state, + pre_state, + }; + if self.snapshots.len() < self.max_len { - self.snapshots.push(snapshot); + self.snapshots.push(item); } else { let insert_at = self .snapshots @@ -56,13 +146,13 @@ impl SnapshotCache { .map(|(i, _slot)| i); if let Some(i) = insert_at { - self.snapshots[i] = snapshot; + self.snapshots[i] = item; } } } /// If there is a snapshot with `block_root`, remove and return it. 
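On the consumer side, block verification pulls the parent out of the snapshot cache and calls `into_pre_state`, which prefers the timer-advanced `pre_state` and falls back to the regular post-block state. That fallback is what lets the catch-up loop in `block_verification.rs` often skip one `per_slot_processing` call at the leading edge of block processing. A toy model of the preference (strings stand in for `BeaconState`; illustrative only):

    /// Toy version of the cache item: the canonical post-block state plus an
    /// optional state that the timer has already advanced one slot.
    struct ToyCacheItem {
        beacon_state: &'static str,
        pre_state: Option<&'static str>,
    }

    impl ToyCacheItem {
        /// Prefer the advanced pre-state when it exists, otherwise fall back to
        /// the post-block state.
        fn into_pre_state(self) -> &'static str {
            self.pre_state.unwrap_or(self.beacon_state)
        }
    }

    fn main() {
        let fresh = ToyCacheItem { beacon_state: "state@slot10", pre_state: None };
        let advanced = ToyCacheItem { beacon_state: "state@slot10", pre_state: Some("state@slot11") };
        assert_eq!(fresh.into_pre_state(), "state@slot10");
        assert_eq!(advanced.into_pre_state(), "state@slot11");
    }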
- pub fn try_remove(&mut self, block_root: Hash256) -> Option> { + pub fn try_remove(&mut self, block_root: Hash256) -> Option> { self.snapshots .iter() .position(|snapshot| snapshot.beacon_block_root == block_root) @@ -78,7 +168,40 @@ impl SnapshotCache { self.snapshots .iter() .find(|snapshot| snapshot.beacon_block_root == block_root) - .map(|snapshot| snapshot.clone_with(clone_config)) + .map(|snapshot| snapshot.clone_to_snapshot_with(clone_config)) + } + + pub fn get_for_state_advance(&mut self, block_root: Hash256) -> StateAdvance { + if let Some(snapshot) = self + .snapshots + .iter_mut() + .find(|snapshot| snapshot.beacon_block_root == block_root) + { + if snapshot.pre_state.is_some() { + StateAdvance::AlreadyAdvanced + } else { + let cloned = snapshot + .beacon_state + .clone_with(CloneConfig::committee_caches_only()); + + StateAdvance::State { + state: Box::new(std::mem::replace(&mut snapshot.beacon_state, cloned)), + state_root: snapshot.beacon_block.state_root(), + block_slot: snapshot.beacon_block.slot(), + } + } + } else { + StateAdvance::BlockNotFound + } + } + + pub fn update_pre_state(&mut self, block_root: Hash256, state: BeaconState) -> Option<()> { + self.snapshots + .iter_mut() + .find(|snapshot| snapshot.beacon_block_root == block_root) + .map(|snapshot| { + snapshot.pre_state = Some(state); + }) } /// Removes all snapshots from the queue that are less than or equal to the finalized epoch. @@ -115,7 +238,6 @@ mod test { BeaconSnapshot { beacon_state, - beacon_state_root: Hash256::from_low_u64_be(i), beacon_block: SignedBeaconBlock { message: BeaconBlock::empty(&spec), signature: generate_deterministic_keypair(0) @@ -143,7 +265,7 @@ mod test { // Each snapshot should be one slot into an epoch, with each snapshot one epoch apart. snapshot.beacon_state.slot = Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1); - cache.insert(snapshot); + cache.insert(snapshot, None); assert_eq!( cache.snapshots.len(), @@ -161,7 +283,7 @@ mod test { // 2 2 // 3 3 assert_eq!(cache.snapshots.len(), CACHE_SIZE); - cache.insert(get_snapshot(42)); + cache.insert(get_snapshot(42), None); assert_eq!(cache.snapshots.len(), CACHE_SIZE); assert!( @@ -208,7 +330,7 @@ mod test { // Over-fill the cache so it needs to eject some old values on insert. for i in 0..CACHE_SIZE as u64 { - cache.insert(get_snapshot(u64::max_value() - i)); + cache.insert(get_snapshot(u64::max_value() - i), None); } // Ensure that the new head value was not removed from the cache. diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs new file mode 100644 index 00000000000..260b9d0bc12 --- /dev/null +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -0,0 +1,327 @@ +//! Provides a timer which runs in the tail-end of each slot and maybe advances the state of the +//! head block forward a single slot. +//! +//! This provides an optimization with the following benefits: +//! +//! 1. Removes the burden of a single, mandatory `per_slot_processing` call from the leading-edge of +//! block processing. This helps import blocks faster. +//! 2. Allows the node to learn of the shuffling for the next epoch, before the first block from +//! that epoch has arrived. This helps reduce gossip block propagation times. +//! +//! The downsides to this optimization are: +//! +//! 1. We are required to store an additional `BeaconState` for the head block. This consumes +//! memory. +//! 2. 
There's a possibility that the head block is never built upon, causing wasted CPU cycles. +use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; +use crate::{ + beacon_chain::BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, snapshot_cache::StateAdvance, BeaconChain, + BeaconChainError, BeaconChainTypes, +}; +use slog::{debug, error, warn, Logger}; +use slot_clock::SlotClock; +use state_processing::per_slot_processing; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use types::{EthSpec, Hash256, Slot}; + +/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform +/// the state advancement. +/// +/// This avoids doing unnecessary work whilst the node is syncing or has perhaps been put to sleep +/// for some period of time. +const MAX_ADVANCE_DISTANCE: u64 = 4; + +#[derive(Debug)] +enum Error { + BeaconChain(BeaconChainError), + HeadMissingFromSnapshotCache(Hash256), + MaxDistanceExceeded { current_slot: Slot, head_slot: Slot }, + StateAlreadyAdvanced { block_root: Hash256 }, + BadStateSlot { state_slot: Slot, block_slot: Slot }, +} + +impl From for Error { + fn from(e: BeaconChainError) -> Self { + Self::BeaconChain(e) + } +} + +/// Provides a simple thread-safe lock to be used for task co-ordination. Practically equivalent to +/// `Mutex<()>`. +#[derive(Clone)] +struct Lock(Arc); + +impl Lock { + /// Instantiate an unlocked self. + pub fn new() -> Self { + Self(Arc::new(AtomicBool::new(false))) + } + + /// Lock self, returning `true` if the lock was already set. + pub fn lock(&self) -> bool { + self.0.fetch_or(true, Ordering::SeqCst) + } + + /// Unlock self. + pub fn unlock(&self) { + self.0.store(false, Ordering::SeqCst); + } +} + +/// Spawns the timer described in the module-level documentation. +pub fn spawn_state_advance_timer( + executor: TaskExecutor, + beacon_chain: Arc>, + log: Logger, +) { + executor.spawn( + state_advance_timer(executor.clone(), beacon_chain, log), + "state_advance_timer", + ); +} + +/// Provides the timer described in the module-level documentation. +async fn state_advance_timer( + executor: TaskExecutor, + beacon_chain: Arc>, + log: Logger, +) { + let is_running = Lock::new(); + let slot_clock = &beacon_chain.slot_clock; + let slot_duration = slot_clock.slot_duration(); + + loop { + match beacon_chain.slot_clock.duration_to_next_slot() { + Some(duration) => sleep(duration + (slot_duration / 4) * 3).await, + None => { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + }; + + // Only start spawn the state advance task if the lock was previously free. 
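The guard just below uses the small `Lock` defined earlier in this file: `lock()` is `AtomicBool::fetch_or(true, SeqCst)`, which returns the previous value, so `false` means the flag was clear and the caller now holds it. The other detail worth making concrete is the sleep above, which wakes the task three quarters of the way through the following slot. A quick check of that arithmetic with plain `std::time::Duration` (the 12-second slot is mainnet's; the 2-second offset into the current slot is arbitrary):

    use std::time::Duration;

    fn main() {
        let slot_duration = Duration::from_secs(12);
        // Suppose the loop iteration starts 2s into the current slot.
        let duration_to_next_slot = Duration::from_secs(10);
        // The sleep used by the timer: wait for the next slot boundary, then a
        // further three quarters of a slot.
        let wake_after = duration_to_next_slot + (slot_duration / 4) * 3;
        // The task wakes 9s into the next slot, leaving roughly the final
        // quarter of the slot for the head state to be advanced before the
        // following block is due.
        assert_eq!(wake_after, Duration::from_secs(19));
        assert_eq!(wake_after - duration_to_next_slot, Duration::from_secs(9));
    }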
+ if !is_running.lock() { + let log = log.clone(); + let beacon_chain = beacon_chain.clone(); + let is_running = is_running.clone(); + + executor.spawn_blocking( + move || { + match advance_head(&beacon_chain, &log) { + Ok(()) => (), + Err(Error::BeaconChain(e)) => error!( + log, + "Failed to advance head state"; + "error" => ?e + ), + Err(Error::StateAlreadyAdvanced { block_root }) => debug!( + log, + "State already advanced on slot"; + "block_root" => ?block_root + ), + Err(Error::MaxDistanceExceeded { + current_slot, + head_slot, + }) => debug!( + log, + "Refused to advance head state"; + "head_slot" => head_slot, + "current_slot" => current_slot, + ), + other => warn!( + log, + "Did not advance head state"; + "reason" => ?other + ), + }; + + // Permit this blocking task to spawn again, next time the timer fires. + is_running.unlock(); + }, + "state_advance_blocking", + ); + } else { + warn!( + log, + "State advance routine overloaded"; + "msg" => "system resources may be overloaded" + ) + } + } +} + +/// Reads the `snapshot_cache` from the `beacon_chain` and attempts to take a clone of the +/// `BeaconState` of the head block. If it obtains this clone, the state will be advanced a single +/// slot then placed back in the `snapshot_cache` to be used for block verification. +/// +/// See the module-level documentation for rationale. +fn advance_head( + beacon_chain: &BeaconChain, + log: &Logger, +) -> Result<(), Error> { + let current_slot = beacon_chain.slot()?; + + // These brackets ensure that the `head_slot` value is dropped before we run fork choice and + // potentially invalidate it. + // + // Fork-choice is not run *before* this function to avoid unnecessary calls whilst syncing. + { + let head_slot = beacon_chain.head_info()?.slot; + + // Don't run this when syncing or if lagging too far behind. + if head_slot + MAX_ADVANCE_DISTANCE < current_slot { + return Err(Error::MaxDistanceExceeded { + current_slot, + head_slot, + }); + } + } + + // Run fork choice so we get the latest view of the head. + // + // This is useful since it's quite likely that the last time we ran fork choice was shortly + // after receiving the latest gossip block, but not necessarily after we've received the + // majority of attestations. + beacon_chain.fork_choice()?; + + let head_root = beacon_chain.head_info()?.block_root; + + let (head_slot, head_state_root, mut state) = match beacon_chain + .snapshot_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::SnapshotCacheLockTimeout)? + .get_for_state_advance(head_root) + { + StateAdvance::AlreadyAdvanced => { + return Err(Error::StateAlreadyAdvanced { + block_root: head_root, + }) + } + StateAdvance::BlockNotFound => return Err(Error::HeadMissingFromSnapshotCache(head_root)), + StateAdvance::State { + state, + state_root, + block_slot, + } => (block_slot, state_root, *state), + }; + + let initial_slot = state.slot; + let initial_epoch = state.current_epoch(); + + let state_root = if state.slot == head_slot { + Some(head_state_root) + } else { + // Protect against advancing a state more than a single slot. + // + // Advancing more than one slot without storing the intermediate state would corrupt the + // database. Future works might store temporary, intermediate states inside this function. + return Err(Error::BadStateSlot { + block_slot: head_slot, + state_slot: state.slot, + }); + }; + + // Advance the state a single slot. 
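+    // Note: `state_root` is always `Some` at this point (the early return above
+    // guarantees `state.slot == head_slot`), which lets `per_slot_processing` reuse the
+    // known state root instead of re-hashing the pre-state. The call yields
+    // `Some(summary)` only when this single-slot advance crosses an epoch boundary,
+    // e.g. with 32-slot epochs, advancing slot 31 -> 32 runs epoch processing.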
+ if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec) + .map_err(BeaconChainError::from)? + { + // Only notify the validator monitor for recent blocks. + if state.current_epoch() + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 + >= current_slot.epoch(T::EthSpec::slots_per_epoch()) + { + // Potentially create logs/metrics for locally monitored validators. + beacon_chain + .validator_monitor + .read() + .process_validator_statuses(state.current_epoch(), &summary.statuses); + } + } + + debug!( + log, + "Advanced head state one slot"; + "head_root" => ?head_root, + "state_slot" => state.slot, + "current_slot" => current_slot, + ); + + // If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer + // shuffling for the next epoch into the cache. + if initial_epoch > state.current_epoch() { + debug!( + log, + "Priming proposer cache"; + "head_root" => ?head_root, + "state_epoch" => state.current_epoch(), + "current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()), + ); + beacon_chain + .beacon_proposer_cache + .lock() + .insert( + state.current_epoch(), + head_root, + state + .get_beacon_proposer_indices(&beacon_chain.spec) + .map_err(BeaconChainError::from)?, + state.fork, + ) + .map_err(BeaconChainError::from)?; + } + + let final_slot = state.slot; + + // Insert the advanced state back into the snapshot cache. + beacon_chain + .snapshot_cache + .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::SnapshotCacheLockTimeout)? + .update_pre_state(head_root, state) + .ok_or(Error::HeadMissingFromSnapshotCache(head_root))?; + + let current_slot = beacon_chain.slot()?; + if final_slot <= current_slot { + warn!( + log, + "State advance too slow"; + "head_root" => %head_root, + "advanced_slot" => final_slot, + "current_slot" => current_slot, + "initial_slot" => initial_slot, + "msg" => "system resources may be overloaded", + ); + } + + debug!( + log, + "Completed state advance"; + "head_root" => ?head_root, + "advanced_slot" => final_slot, + "initial_slot" => initial_slot, + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lock() { + let lock = Lock::new(); + assert_eq!(lock.lock(), false); + assert_eq!(lock.lock(), true); + assert_eq!(lock.lock(), true); + lock.unlock(); + assert_eq!(lock.lock(), false); + assert_eq!(lock.lock(), true); + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 753422238fe..afa76360c16 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -15,7 +15,7 @@ use genesis::interop_genesis_state; use parking_lot::Mutex; use rand::rngs::StdRng; use rand::Rng; -use rand_core::SeedableRng; +use rand::SeedableRng; use rayon::prelude::*; use slog::Logger; use slot_clock::TestingSlotClock; diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index ec906ed199f..c4f6f2393c2 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -4,8 +4,9 @@ use crate::metrics; use parking_lot::RwLock; -use slog::{crit, info, Logger}; +use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; +use state_processing::per_epoch_processing::ValidatorStatus; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::io; @@ -325,6 +326,103 @@ impl ValidatorMonitor { } } + pub fn process_validator_statuses(&self, epoch: 
Epoch, summaries: &[ValidatorStatus]) { + for monitored_validator in self.validators.values() { + // We subtract two from the state of the epoch that generated these summaries. + // + // - One to account for it being the previous epoch. + // - One to account for the state advancing an epoch whilst generating the validator + // statuses. + let prev_epoch = epoch - 2; + if let Some(i) = monitored_validator.index { + let i = i as usize; + let id = &monitored_validator.id; + + if let Some(summary) = summaries.get(i) { + if summary.is_previous_epoch_attester { + let lag = summary + .inclusion_info + .map(|i| format!("{} slot(s)", i.delay.saturating_sub(1).to_string())) + .unwrap_or_else(|| "??".to_string()); + + info!( + self.log, + "Previous epoch attestation success"; + "inclusion_lag" => lag, + "matched_target" => summary.is_previous_epoch_target_attester, + "matched_head" => summary.is_previous_epoch_head_attester, + "epoch" => prev_epoch, + "validator" => id, + + ) + } else if summary.is_active_in_previous_epoch + && !summary.is_previous_epoch_attester + { + error!( + self.log, + "Previous epoch attestation missing"; + "epoch" => prev_epoch, + "validator" => id, + ) + } + + if summary.is_previous_epoch_attester { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, + &[id], + ); + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, + &[id], + ); + } + if summary.is_previous_epoch_head_attester { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, + &[id], + ); + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, + &[id], + ); + warn!( + self.log, + "Attested to an incorrect head"; + "epoch" => prev_epoch, + "validator" => id, + ); + } + if summary.is_previous_epoch_target_attester { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, + &[id], + ); + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, + &[id], + ); + warn!( + self.log, + "Attested to an incorrect target"; + "epoch" => prev_epoch, + "validator" => id, + ); + } + if let Some(inclusion_info) = summary.inclusion_info { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE, + &[id], + inclusion_info.delay as i64, + ); + } + } + } + } + } + fn get_validator_id(&self, validator_index: u64) -> Option<&str> { self.indices .get(&validator_index) @@ -945,9 +1043,18 @@ pub fn get_block_delay_ms( seen_timestamp: Duration, block: &BeaconBlock, slot_clock: &S, +) -> Duration { + get_slot_delay_ms::(seen_timestamp, block.slot, slot_clock) +} + +/// Returns the delay between the start of `slot` and `seen_timestamp`. 
+pub fn get_slot_delay_ms( + seen_timestamp: Duration, + slot: Slot, + slot_clock: &S, ) -> Duration { slot_clock - .start_of(block.slot) + .start_of(slot) .and_then(|slot_start| seen_timestamp.checked_sub(slot_start)) .unwrap_or_else(|| Duration::from_secs(0)) } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index a082d08d0de..63182cc9fe8 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1706,7 +1706,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { for checkpoint in &chain_dump { // Check that the tree hash of the stored state is as expected assert_eq!( - checkpoint.beacon_state_root, + checkpoint.beacon_state_root(), checkpoint.beacon_state.tree_hash_root(), "tree hash of stored state is incorrect" ); @@ -1717,7 +1717,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { harness .chain .store - .get_state(&checkpoint.beacon_state_root, None) + .get_state(&checkpoint.beacon_state_root(), None) .expect("no error") .expect("state exists") .slot, diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index d74f953a0f9..9281b1a5e6b 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -133,7 +133,7 @@ fn iterators() { assert_eq!( *state_roots.first().expect("should have some state roots"), - (head.beacon_state_root, head.beacon_state.slot), + (head.beacon_state_root(), head.beacon_state.slot), "first state root and slot should be for the head state" ); } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index c47e74cebc6..200869ec478 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,6 +5,7 @@ use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, slot_clock::{SlotClock, SystemTimeSlotClock}, + state_advance_timer::spawn_state_advance_timer, store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler, }; @@ -481,6 +482,12 @@ where self.start_slasher_service()?; } + if let Some(beacon_chain) = self.beacon_chain.as_ref() { + let state_advance_context = runtime_context.service_context("state_advance".into()); + let log = state_advance_context.log().clone(); + spawn_state_advance_timer(state_advance_context.executor, beacon_chain.clone(), log); + } + Ok(Client { beacon_chain: self.beacon_chain, network_globals: self.network_globals, diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index fc0ef8b4857..d998389a8db 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -893,7 +893,7 @@ impl Behaviour { } // perform gossipsub score updates when necessary - while let Poll::Ready(_) = self.update_gossipsub_scores.poll_tick(cx) { + while self.update_gossipsub_scores.poll_tick(cx).is_ready() { self.peer_manager.update_gossipsub_scores(&self.gossipsub); } diff --git a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs index dde8b6c2fb2..30ace623ced 100644 --- a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs @@ -29,7 +29,7 @@ pub trait EnrExt { /// Extend ENR CombinedPublicKey for libp2p types. 
pub trait CombinedKeyPublicExt { /// Converts the publickey into a peer id, without consuming the key. - fn into_peer_id(&self) -> PeerId; + fn as_peer_id(&self) -> PeerId; } /// Extend ENR CombinedKey for conversion to libp2p keys. @@ -41,7 +41,7 @@ pub trait CombinedKeyExt { impl EnrExt for Enr { /// The libp2p `PeerId` for the record. fn peer_id(&self) -> PeerId { - self.public_key().into_peer_id() + self.public_key().as_peer_id() } /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. @@ -195,7 +195,7 @@ impl CombinedKeyPublicExt for CombinedPublicKey { /// Converts the publickey into a peer id, without consuming the key. /// /// This is only available with the `libp2p` feature flag. - fn into_peer_id(&self) -> PeerId { + fn as_peer_id(&self) -> PeerId { match self { Self::Secp256k1(pk) => { let pk_bytes = pk.to_bytes(); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 4da3f69406b..eeef06d0791 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -972,7 +972,7 @@ impl Stream for PeerManager { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // perform the heartbeat when necessary - while let Poll::Ready(_) = self.heartbeat.poll_tick(cx) { + while self.heartbeat.poll_tick(cx).is_ready() { self.heartbeat(); } diff --git a/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs index 8e096f8164d..07aa6330f36 100644 --- a/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs +++ b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs @@ -240,7 +240,7 @@ impl Future for RPCRateLimiter { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - while let Poll::Ready(_) = self.prune_interval.poll_tick(cx) { + while self.prune_interval.poll_tick(cx).is_ready() { self.prune(); } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index da031877acd..b560e4e3f0d 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -584,13 +584,13 @@ impl AttestationService { /// We don't keep track of a specific validator to random subnet, rather the ratio of active /// validators to random subnets. So when a validator goes offline, we can simply remove the /// allocated amount of random subnets. 
- fn handle_known_validator_expiry(&mut self) -> Result<(), ()> { + fn handle_known_validator_expiry(&mut self) { let spec = &self.beacon_chain.spec; let subnet_count = spec.attestation_subnet_count; let random_subnets_per_validator = spec.random_subnets_per_validator; if self.known_validators.len() as u64 * random_subnets_per_validator >= subnet_count { // have too many validators, ignore - return Ok(()); + return; } let subscribed_subnets = self.random_subnets.keys().cloned().collect::>(); @@ -616,7 +616,6 @@ impl AttestationService { .push_back(AttServiceMessage::EnrRemove(*subnet_id)); self.random_subnets.remove(subnet_id); } - Ok(()) } } diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 2462f2c94e6..4a625e0ec08 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -203,7 +203,11 @@ async fn subscribe_current_slot_wait_for_unsubscribe() { let events = get_events(&mut attestation_service, None, 1).await; assert_matches!( events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] + [ + AttServiceMessage::DiscoverPeers(_), + AttServiceMessage::Subscribe(_any1), + AttServiceMessage::EnrAdd(_any3) + ] ); // If the long lived and short lived subnets are the same, there should be no more events @@ -281,7 +285,11 @@ async fn test_same_subnet_unsubscription() { let events = get_events(&mut attestation_service, None, 1).await; assert_matches!( events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] + [ + AttServiceMessage::DiscoverPeers(_), + AttServiceMessage::Subscribe(_any1), + AttServiceMessage::EnrAdd(_any3) + ] ); let expected = AttServiceMessage::Subscribe(subnet_id1); diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 443aa0265e8..d9bf65b5efe 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -269,6 +269,11 @@ impl Worker { verified_block } Err(BlockError::ParentUnknown(block)) => { + debug!( + self.log, + "Unknown parent for gossip block"; + "root" => %block.canonical_root() + ); self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); return; } diff --git a/beacon_node/network/src/nat.rs b/beacon_node/network/src/nat.rs index 02444240dbb..4067fd2bc63 100644 --- a/beacon_node/network/src/nat.rs +++ b/beacon_node/network/src/nat.rs @@ -52,11 +52,7 @@ pub fn construct_upnp_mappings( // Just use the first IP of the first interface that is not a loopback and not an // ipv6 address. 
if !interface.is_loopback() { - if let IpAddr::V4(_) = interface.ip() { - Some(interface.ip()) - } else { - None - } + interface.ip().is_ipv4().then(|| interface.ip()) } else { None } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 2a68d5b02e7..35af5976a04 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -219,7 +219,7 @@ impl NetworkService { log: network_log, }; - spawn_service(executor, network_service)?; + spawn_service(executor, network_service); Ok((network_globals, network_send)) } @@ -228,44 +228,16 @@ impl NetworkService { fn spawn_service( executor: task_executor::TaskExecutor, mut service: NetworkService, -) -> error::Result<()> { - let mut exit_rx = executor.exit(); +) { let mut shutdown_sender = executor.shutdown_sender(); // spawn on the current executor - executor.spawn_without_exit(async move { + executor.spawn(async move { let mut metric_update_counter = 0; loop { // build the futures to check simultaneously tokio::select! { - // handle network shutdown - _ = (&mut exit_rx) => { - // network thread is terminating - let enrs = service.libp2p.swarm.enr_entries(); - debug!( - service.log, - "Persisting DHT to store"; - "Number of peers" => enrs.len(), - ); - match persist_dht::(service.store.clone(), enrs) { - Err(e) => error!( - service.log, - "Failed to persist DHT on drop"; - "error" => ?e - ), - Ok(_) => info!( - service.log, - "Saved DHT state"; - ), - } - - // attempt to remove port mappings - crate::nat::remove_mappings(service.upnp_mappings.0, service.upnp_mappings.1, &service.log); - - info!(service.log, "Network service shutdown"); - return; - } _ = service.metrics_update.tick() => { // update various network metrics metric_update_counter +=1; @@ -570,8 +542,6 @@ fn spawn_service( metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); } }, "network"); - - Ok(()) } /// Returns a `Sleep` that triggers shortly after the next change in the beacon chain fork version. 
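A brief aside on the `beacon_node/network/src/nat.rs` hunk above: `bool::then` (stable since Rust 1.50) maps `true` to `Some(closure())` and `false` to `None`, which is what lets the old `if let IpAddr::V4(..)` branch collapse into `interface.ip().is_ipv4().then(|| interface.ip())`. A minimal, self-contained sketch of the idiom (illustrative only, not Lighthouse code):

    use std::net::IpAddr;

    /// Keep the first non-loopback IPv4 address, mirroring the `is_ipv4().then(..)` idiom.
    fn first_ipv4(ips: &[IpAddr]) -> Option<IpAddr> {
        ips.iter()
            .find_map(|ip| (!ip.is_loopback() && ip.is_ipv4()).then(|| *ip))
    }

    fn main() {
        let ips: Vec<IpAddr> = vec!["::1".parse().unwrap(), "192.0.2.1".parse().unwrap()];
        assert_eq!(first_ipv4(&ips), Some("192.0.2.1".parse().unwrap()));
    }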
@@ -585,3 +555,31 @@ fn next_fork_delay( tokio::time::sleep_until(tokio::time::Instant::now() + until_fork + delay) }) } + +impl Drop for NetworkService { + fn drop(&mut self) { + // network thread is terminating + let enrs = self.libp2p.swarm.enr_entries(); + debug!( + self.log, + "Persisting DHT to store"; + "Number of peers" => enrs.len(), + ); + match persist_dht::(self.store.clone(), enrs) { + Err(e) => error!( + self.log, + "Failed to persist DHT on drop"; + "error" => ?e + ), + Ok(_) => info!( + self.log, + "Saved DHT state"; + ), + } + + // attempt to remove port mappings + crate::nat::remove_mappings(self.upnp_mappings.0, self.upnp_mappings.1, &self.log); + + info!(self.log, "Network service shutdown"); + } +} diff --git a/common/account_utils/src/validator_definitions.rs b/common/account_utils/src/validator_definitions.rs index a38aebf40df..48643c9e87e 100644 --- a/common/account_utils/src/validator_definitions.rs +++ b/common/account_utils/src/validator_definitions.rs @@ -164,6 +164,12 @@ impl ValidatorDefinitions { }) .collect(); + let known_pubkeys: HashSet = self + .0 + .iter() + .map(|def| def.voting_public_key.clone()) + .collect(); + let mut new_defs = keystore_paths .into_iter() .filter_map(|voting_keystore_path| { @@ -200,7 +206,13 @@ impl ValidatorDefinitions { .filter(|path| path.exists()); let voting_public_key = match keystore.public_key() { - Some(pubkey) => pubkey, + Some(pubkey) => { + if known_pubkeys.contains(&pubkey) { + return None; + } else { + pubkey + } + } None => { error!( log, diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml index 2f1b0980fd3..cca3aa5fc1d 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml @@ -1,9 +1,18 @@ # Lighthouse Team (Sigma Prime) -- enr:-Jq4QN6_FzIYyfJET9hiLcGUsg_EVOwCQ4bwsBwe0S4ElrfXUXufSYLtQAHU9_LuO9uice7EAaLbDlMK8QEhtyg8Oh4BhGV0aDKQtTA_KgAAAAD__________4JpZIJ2NIJpcIQDGh4giXNlY3AyNTZrMaECSHaY_36GdNjF8-CLfMSg-8lB0wce5VRZ96HkT9tSkVeDdWRwgiMo -- enr:-Jq4QMOjjkLYSN7GVAf_zBSS5c_MokSPMZZvmjLUYiuHrPLHInjeBtF1IfskuYlmhglGan2ECmPk89SRXr4FY1jVp5YBhGV0aDKQtTA_KgAAAAD__________4JpZIJ2NIJpcIQi8wB6iXNlY3AyNTZrMaEC0EiXxAB2QKZJuXnUwmf-KqbP9ZP7m9gsRxcYvoK9iTCDdWRwgiMo +- enr:-Jq4QFs9If3eUC8mHx6-BLVw0jRMbyEgXNn6sl7c77bBmji_afJ-0_X7Q4vttQ8SO8CYReudHsGVvgSybh1y96yyL-oChGV0aDKQtTA_KgAAAAD__________4JpZIJ2NIJpcIQ2_YtGiXNlY3AyNTZrMaECSHaY_36GdNjF8-CLfMSg-8lB0wce5VRZ96HkT9tSkVeDdWRwgiMo +- enr:-Jq4QA4kNIdO1FkIHpl5iqEKjJEjCVfp77aFulytCEPvEQOdbTTf6ucNmWSuXjlwvgka86gkpnCTv-V7CfBn4AMBRvIChGV0aDKQtTA_KgAAAAD__________4JpZIJ2NIJpcIQ22Gh-iXNlY3AyNTZrMaEC0EiXxAB2QKZJuXnUwmf-KqbP9ZP7m9gsRxcYvoK9iTCDdWRwgiMo +# EF Team +- enr:-Ku4QHqVeJ8PPICcWk1vSn_XcSkjOkNiTg6Fmii5j6vUQgvzMc9L1goFnLKgXqBJspJjIsB91LTOleFmyWWrFVATGngBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAMRHkWJc2VjcDI1NmsxoQKLVXFOhp2uX6jeT0DvvDpPcU8FWMjQdR4wMuORMhpX24N1ZHCCIyg +- enr:-Ku4QG-2_Md3sZIAUebGYT6g0SMskIml77l6yR-M_JXc-UdNHCmHQeOiMLbylPejyJsdAPsTHJyjJB2sYGDLe0dn8uYBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhBLY-NyJc2VjcDI1NmsxoQORcM6e19T1T9gi7jxEZjk_sjVLGFscUNqAY9obgZaxbIN1ZHCCIyg +- enr:-Ku4QPn5eVhcoF1opaFEvg1b6JNFD2rqVkHQ8HApOKK61OIcIXD127bKWgAtbwI7pnxx6cDyk_nI88TrZKQaGMZj0q0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDayLMaJc2VjcDI1NmsxoQK2sBOLGcUb4AwuYzFuAVCaNHA-dy24UuEKkeFNgCVCsIN1ZHCCIyg +- 
enr:-Ku4QEWzdnVtXc2Q0ZVigfCGggOVB2Vc1ZCPEc6j21NIFLODSJbvNaef1g4PxhPwl_3kax86YPheFUSLXPRs98vvYsoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDZBrP2Jc2VjcDI1NmsxoQM6jr8Rb1ktLEsVcKAPa08wCsKUmvoQ8khiOl_SLozf9IN1ZHCCIyg # Teku team (Consensys) - enr:-KG4QOtcP9X1FbIMOe17QNMKqDxCpm14jcX5tiOE4_TyMrFqbmhPZHK_ZPG2Gxb1GE2xdtodOfx9-cgvNtxnRyHEmC0ghGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQDE8KdiXNlY3AyNTZrMaEDhpehBDbZjM_L9ek699Y7vhUJ-eAdMyQW_Fil522Y0fODdGNwgiMog3VkcIIjKA +- enr:-KG4QDyytgmE4f7AnvW-ZaUOIi9i79qX4JwjRAiXBZCU65wOfBu-3Nb5I7b_Rmg3KCOcZM_C3y5pg7EBU5XGrcLTduQEhGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQ2_DUbiXNlY3AyNTZrMaEDKnz_-ps3UUOfHWVYaskI5kWYO_vtYMGYCQRAR3gHDouDdGNwgiMog3VkcIIjKA # Prysm team (Prysmatic Labs) - enr:-Ku4QImhMc1z8yCiNJ1TyUxdcfNucje3BGwEHzodEZUan8PherEo4sF7pPHPSIB1NNuSg5fZy7qFsjmUKs2ea1Whi0EBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQOVphkDqal4QzPMksc5wnpuC3gvSC8AfbFOnZY_On34wIN1ZHCCIyg - enr:-Ku4QP2xDnEtUXIjzJ_DhlCRN9SN99RYQPJL92TMlSv7U5C1YnYLjwOQHgZIUXw6c-BvRg2Yc2QsZxxoS_pPRVe0yK8Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMeFF5GrS7UZpAH2Ly84aLK-TyvH-dRo0JM1i8yygH50YN1ZHCCJxA - enr:-Ku4QPp9z1W4tAO8Ber_NQierYaOStqhDqQdOPY3bB3jDgkjcbk6YrEnVYIiCBbTxuar3CzS528d2iE7TdJsrL-dEKoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMw5fqqkw2hHC4F5HZZDPsNmPdB1Gi8JPQK7pRc9XHh-oN1ZHCCKvg +# Nimbus team +- enr:-LK4QA8FfhaAjlb_BXsXxSfiysR7R52Nhi9JBt4F8SPssu8hdE1BXQQEtVDC3qStCW60LSO7hEsVHv5zm8_6Vnjhcn0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAN4aBKJc2VjcDI1NmsxoQJerDhsJ-KxZ8sHySMOCmTO6sHM3iCFQ6VMvLTe948MyYN0Y3CCI4yDdWRwgiOM +- enr:-LK4QKWrXTpV9T78hNG6s8AM6IO4XH9kFT91uZtFg1GcsJ6dKovDOr1jtAAFPnS2lvNltkOGA9k29BUN7lFh_sjuc9QBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhANAdd-Jc2VjcDI1NmsxoQLQa6ai7y9PMN5hpLe5HmiJSlYzMuzP7ZhwRiwHvqNXdoN0Y3CCI4yDdWRwgiOM diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 6697af80667..8acc55b055a 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -3,8 +3,8 @@ use std::marker::PhantomData; use proto_array::{Block as ProtoBlock, ProtoArrayForkChoice}; use ssz_derive::{Decode, Encode}; use types::{ - BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, - IndexedAttestation, RelativeEpoch, ShufflingId, Slot, + AttestationShufflingId, BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, + Hash256, IndexedAttestation, RelativeEpoch, Slot, }; use crate::ForkChoiceStore; @@ -247,10 +247,10 @@ where let finalized_block_slot = genesis_block.slot; let finalized_block_state_root = genesis_block.state_root; let current_epoch_shuffling_id = - ShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Current) + AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Current) .map_err(Error::BeaconStateError)?; let next_epoch_shuffling_id = - ShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Next) + AttestationShufflingId::new(genesis_block_root, genesis_state, RelativeEpoch::Next) .map_err(Error::BeaconStateError)?; let proto_array = ProtoArrayForkChoice::new( @@ -543,10 +543,18 @@ where root: block_root, parent_root: Some(block.parent_root), target_root, - current_epoch_shuffling_id: ShufflingId::new(block_root, state, RelativeEpoch::Current) - .map_err(Error::BeaconStateError)?, - 
next_epoch_shuffling_id: ShufflingId::new(block_root, state, RelativeEpoch::Next) - .map_err(Error::BeaconStateError)?, + current_epoch_shuffling_id: AttestationShufflingId::new( + block_root, + state, + RelativeEpoch::Current, + ) + .map_err(Error::BeaconStateError)?, + next_epoch_shuffling_id: AttestationShufflingId::new( + block_root, + state, + RelativeEpoch::Next, + ) + .map_err(Error::BeaconStateError)?, state_root: block.state_root, justified_epoch: state.current_justified_checkpoint.epoch, finalized_epoch: state.finalized_checkpoint.epoch, diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index 9cac0bafb10..688878e1ae5 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -4,7 +4,7 @@ mod votes; use crate::proto_array_fork_choice::{Block, ProtoArrayForkChoice}; use serde_derive::{Deserialize, Serialize}; -use types::{Epoch, Hash256, ShufflingId, Slot}; +use types::{AttestationShufflingId, Epoch, Hash256, Slot}; pub use ffg_updates::*; pub use no_votes::*; @@ -55,7 +55,8 @@ pub struct ForkChoiceTestDefinition { impl ForkChoiceTestDefinition { pub fn run(self) { - let junk_shuffling_id = ShufflingId::from_components(Epoch::new(0), Hash256::zero()); + let junk_shuffling_id = + AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero()); let mut fork_choice = ProtoArrayForkChoice::new( self.finalized_block_slot, Hash256::zero(), @@ -128,11 +129,11 @@ impl ForkChoiceTestDefinition { parent_root: Some(parent_root), state_root: Hash256::zero(), target_root: Hash256::zero(), - current_epoch_shuffling_id: ShufflingId::from_components( + current_epoch_shuffling_id: AttestationShufflingId::from_components( Epoch::new(0), Hash256::zero(), ), - next_epoch_shuffling_id: ShufflingId::from_components( + next_epoch_shuffling_id: AttestationShufflingId::from_components( Epoch::new(0), Hash256::zero(), ), diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 3a1d6ccc4b6..c0d8500bd8a 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -2,7 +2,7 @@ use crate::{error::Error, Block}; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use std::collections::HashMap; -use types::{Epoch, Hash256, ShufflingId, Slot}; +use types::{AttestationShufflingId, Epoch, Hash256, Slot}; #[derive(Clone, PartialEq, Debug, Encode, Decode, Serialize, Deserialize)] pub struct ProtoNode { @@ -18,8 +18,8 @@ pub struct ProtoNode { /// The `target_root` is not necessary for `ProtoArray` either, it also just exists for upstream /// components (namely fork choice attestation verification). 
pub target_root: Hash256, - pub current_epoch_shuffling_id: ShufflingId, - pub next_epoch_shuffling_id: ShufflingId, + pub current_epoch_shuffling_id: AttestationShufflingId, + pub next_epoch_shuffling_id: AttestationShufflingId, pub root: Hash256, pub parent: Option, pub justified_epoch: Epoch, diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index c3bffb9c2d0..3e27867410b 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -4,7 +4,7 @@ use crate::ssz_container::SszContainer; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::collections::HashMap; -use types::{Epoch, Hash256, ShufflingId, Slot}; +use types::{AttestationShufflingId, Epoch, Hash256, Slot}; pub const DEFAULT_PRUNE_THRESHOLD: usize = 256; @@ -25,8 +25,8 @@ pub struct Block { pub parent_root: Option, pub state_root: Hash256, pub target_root: Hash256, - pub current_epoch_shuffling_id: ShufflingId, - pub next_epoch_shuffling_id: ShufflingId, + pub current_epoch_shuffling_id: AttestationShufflingId, + pub next_epoch_shuffling_id: AttestationShufflingId, pub justified_epoch: Epoch, pub finalized_epoch: Epoch, } @@ -72,8 +72,8 @@ impl ProtoArrayForkChoice { justified_epoch: Epoch, finalized_epoch: Epoch, finalized_root: Hash256, - current_epoch_shuffling_id: ShufflingId, - next_epoch_shuffling_id: ShufflingId, + current_epoch_shuffling_id: AttestationShufflingId, + next_epoch_shuffling_id: AttestationShufflingId, ) -> Result { let mut proto_array = ProtoArray { prune_threshold: DEFAULT_PRUNE_THRESHOLD, @@ -349,7 +349,8 @@ mod test_compute_deltas { let finalized_desc = Hash256::from_low_u64_be(2); let not_finalized_desc = Hash256::from_low_u64_be(3); let unknown = Hash256::from_low_u64_be(4); - let junk_shuffling_id = ShufflingId::from_components(Epoch::new(0), Hash256::zero()); + let junk_shuffling_id = + AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero()); let mut fc = ProtoArrayForkChoice::new( genesis_slot, diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index 52aa743a562..6dd0c660bc8 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -156,14 +156,14 @@ where get_pubkey(proposer_index) .ok_or_else(|| Error::ValidatorUnknown(proposer_index as u64))?, spec, - )?, + ), block_header_signature_set( state, &proposer_slashing.signed_header_2, get_pubkey(proposer_index) .ok_or_else(|| Error::ValidatorUnknown(proposer_index as u64))?, spec, - )?, + ), )) } @@ -173,7 +173,7 @@ fn block_header_signature_set<'a, T: EthSpec>( signed_header: &'a SignedBeaconBlockHeader, pubkey: Cow<'a, PublicKey>, spec: &'a ChainSpec, -) -> Result> { +) -> SignatureSet<'a> { let domain = spec.get_domain( signed_header.message.slot.epoch(T::slots_per_epoch()), Domain::BeaconProposer, @@ -183,11 +183,7 @@ fn block_header_signature_set<'a, T: EthSpec>( let message = signed_header.message.signing_root(domain); - Ok(SignatureSet::single_pubkey( - &signed_header.signature, - pubkey, - message, - )) + SignatureSet::single_pubkey(&signed_header.signature, pubkey, message) } /// Returns the signature set for the given `indexed_attestation`. 
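One way to read the `signature_sets.rs` hunk above: once `block_header_signature_set` has no fallible step left, returning `SignatureSet<'a>` directly lets the proposer-slashing path build its pair of sets without `?`. A toy sketch of that refactor pattern, using hypothetical names rather than the Lighthouse API:

    // Before: a `Result` wrapper around a computation that can never fail.
    fn header_digest_fallible(header: &[u8]) -> Result<u64, String> {
        Ok(header.iter().map(|b| *b as u64).sum())
    }

    // After: the return type states the truth, and callers drop a layer of `?`.
    fn header_digest(header: &[u8]) -> u64 {
        header.iter().map(|b| *b as u64).sum()
    }

    // Mirrors returning `Ok((set_1, set_2))` built from two now-infallible helpers.
    fn digest_pair(h1: &[u8], h2: &[u8]) -> (u64, u64) {
        (header_digest(h1), header_digest(h2))
    }

    fn main() {
        assert_eq!(digest_pair(b"a", b"bc"), (97, 98 + 99));
        assert_eq!(header_digest_fallible(b"a"), Ok(97));
    }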
diff --git a/consensus/state_processing/src/per_epoch_processing.rs b/consensus/state_processing/src/per_epoch_processing.rs index 19b87aa57b9..cc1464eef93 100644 --- a/consensus/state_processing/src/per_epoch_processing.rs +++ b/consensus/state_processing/src/per_epoch_processing.rs @@ -18,6 +18,7 @@ pub use validator_statuses::{TotalBalances, ValidatorStatus, ValidatorStatuses}; /// Provides a summary of validator participation during the epoch. pub struct EpochProcessingSummary { pub total_balances: TotalBalances, + pub statuses: Vec, } /// Performs per-epoch processing on some BeaconState. @@ -65,6 +66,7 @@ pub fn per_epoch_processing( Ok(EpochProcessingSummary { total_balances: validator_statuses.total_balances, + statuses: validator_statuses.statuses, }) } diff --git a/consensus/swap_or_not_shuffle/src/compute_shuffled_index.rs b/consensus/swap_or_not_shuffle/src/compute_shuffled_index.rs index e8cd82e5f40..6f863525b22 100644 --- a/consensus/swap_or_not_shuffle/src/compute_shuffled_index.rs +++ b/consensus/swap_or_not_shuffle/src/compute_shuffled_index.rs @@ -35,18 +35,22 @@ pub fn compute_shuffled_index( let mut index = index; for round in 0..shuffle_round_count { let pivot = bytes_to_int64(&hash_with_round(seed, round)[..]) as usize % list_size; - index = do_round(seed, index, pivot, round, list_size)?; + index = do_round(seed, index, pivot, round, list_size); } Some(index) } -fn do_round(seed: &[u8], index: usize, pivot: usize, round: u8, list_size: usize) -> Option { +fn do_round(seed: &[u8], index: usize, pivot: usize, round: u8, list_size: usize) -> usize { let flip = (pivot + (list_size - index)) % list_size; let position = max(index, flip); let source = hash_with_round_and_position(seed, round, position); let byte = source[(position % 256) / 8]; let bit = (byte >> (position % 8)) % 2; - Some(if bit == 1 { flip } else { index }) + if bit == 1 { + flip + } else { + index + } } fn hash_with_round_and_position(seed: &[u8], round: u8, position: usize) -> Hash256 { diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 2eec1f0b8e9..69419dfacdb 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -527,6 +527,24 @@ impl BeaconState { self.compute_proposer_index(&indices, &seed, spec) } + /// Returns the beacon proposer index for each `slot` in `self.current_epoch()`. + /// + /// The returned `Vec` contains one proposer index for each slot. For example, if + /// `state.current_epoch() == 1`, then `vec[0]` refers to slot `32` and `vec[1]` refers to slot + /// `33`. It will always be the case that `vec.len() == SLOTS_PER_EPOCH`. + pub fn get_beacon_proposer_indices(&self, spec: &ChainSpec) -> Result, Error> { + // Not using the cached validator indices since they are shuffled. + let indices = self.get_active_validator_indices(self.current_epoch(), spec)?; + + self.current_epoch() + .slot_iter(T::slots_per_epoch()) + .map(|slot| { + let seed = self.get_beacon_proposer_seed(slot, spec)?; + self.compute_proposer_index(&indices, &seed, spec) + }) + .collect() + } + /// Compute the seed to use for the beacon proposer selection at the given `slot`. 
/// /// Spec v0.12.1 diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index b9b1950ae7b..be86daad56d 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -86,7 +86,7 @@ pub use crate::pending_attestation::PendingAttestation; pub use crate::proposer_slashing::ProposerSlashing; pub use crate::relative_epoch::{Error as RelativeEpochError, RelativeEpoch}; pub use crate::selection_proof::SelectionProof; -pub use crate::shuffling_id::ShufflingId; +pub use crate::shuffling_id::AttestationShufflingId; pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof; pub use crate::signed_beacon_block::{SignedBeaconBlock, SignedBeaconBlockHash}; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; diff --git a/consensus/types/src/shuffling_id.rs b/consensus/types/src/shuffling_id.rs index d54b5fa640f..d2c501083e4 100644 --- a/consensus/types/src/shuffling_id.rs +++ b/consensus/types/src/shuffling_id.rs @@ -15,12 +15,12 @@ use std::hash::Hash; /// /// The struct stores exactly that 2-tuple. #[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Encode, Decode)] -pub struct ShufflingId { +pub struct AttestationShufflingId { pub shuffling_epoch: Epoch, - shuffling_decision_block: Hash256, + pub shuffling_decision_block: Hash256, } -impl ShufflingId { +impl AttestationShufflingId { /// Using the given `state`, return the shuffling id for the shuffling at the given /// `relative_epoch`. /// diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index f36b2b1175d..92c07d4b9c1 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -238,29 +238,29 @@ fn unlock_keystore_via_stdin_password( keystore: &Keystore, keystore_path: &PathBuf, ) -> Result<(ZeroizeString, Keypair), Error> { - eprintln!(""); + eprintln!(); eprintln!( "The {} file does not contain either of the following fields for {:?}:", CONFIG_FILENAME, keystore_path ); - eprintln!(""); + eprintln!(); eprintln!(" - voting_keystore_password"); eprintln!(" - voting_keystore_password_path"); - eprintln!(""); + eprintln!(); eprintln!( "You may exit and update {} or enter a password. \ If you choose to enter a password now then this prompt \ will be raised next time the validator is started.", CONFIG_FILENAME ); - eprintln!(""); + eprintln!(); eprintln!("Enter password (or press Ctrl+c to exit):"); loop { let password = read_password_from_user(USE_STDIN).map_err(Error::UnableToReadPasswordFromUser)?; - eprintln!(""); + eprintln!(); match keystore.decrypt_keypair(password.as_ref()) { Ok(keystore) => break Ok((password, keystore)),