diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 4abbddb3373..c4abd56e2e5 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -2,14 +2,17 @@ use derivative::Derivative; use slot_clock::SlotClock; use std::sync::Arc; -use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use crate::beacon_chain::{ + BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, +}; use crate::{kzg_utils, BeaconChainError}; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; use types::signed_beacon_block::BlobReconstructionError; use types::{ BeaconBlockRef, BeaconStateError, BlobsSidecar, EthSpec, Hash256, KzgCommitment, - SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot, - Transactions, + SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, + SignedBlobSidecar, Slot, Transactions, }; use types::{Epoch, ExecPayload}; @@ -62,6 +65,52 @@ pub enum BlobError { UnavailableBlobs, /// Blobs provided for a pre-Eip4844 fork. InconsistentFork, + + /// The `blobs_sidecar.message.beacon_block_root` block is unknown. + /// + /// ## Peer scoring + /// + /// The blob points to a block we have not yet imported. The blob cannot be imported + /// into fork choice yet + UnknownHeadBlock { + beacon_block_root: Hash256, + }, + + /// The `BlobSidecar` was gossiped over an incorrect subnet. + InvalidSubnet { + expected: u64, + received: u64, + }, + + /// The sidecar corresponds to a slot older than the finalized head slot. + PastFinalizedSlot { + blob_slot: Slot, + finalized_slot: Slot, + }, + + /// The proposer index specified in the sidecar does not match the locally computed + /// proposer index. 
+ ProposerIndexMismatch { + sidecar: usize, + local: usize, + }, + + ProposerSignatureInvalid, + + /// A sidecar with same slot, beacon_block_root and proposer_index but different blob is received for + /// the same blob index. + RepeatSidecar { + proposer: usize, + slot: Slot, + blob_index: usize, + }, + + /// The proposal_index corresponding to blob.beacon_block_root is not known. + /// + /// ## Peer scoring + /// + /// The block is invalid and the peer is faulty. + UnknownValidator(u64), } impl From for BlobError { @@ -115,6 +164,127 @@ pub fn validate_blob_for_gossip( block_wrapper.into_available_block(block_root, chain) } +pub fn validate_blob_sidecar_for_gossip( + blob_sidecar: SignedBlobSidecar, + subnet: u64, + chain: &BeaconChain, +) -> Result<(), BlobError> { + let blob_slot = blob_sidecar.message.slot; + let blob_index = blob_sidecar.message.index; + let block_root = blob_sidecar.message.block_root; + + // Verify that the blob_sidecar was received on the correct subnet. + if blob_index != subnet { + return Err(BlobError::InvalidSubnet { + expected: blob_index, + received: subnet, + }); + } + + // Verify that the sidecar is not from a future slot. + let latest_permissible_slot = chain + .slot_clock + .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if blob_slot > latest_permissible_slot { + return Err(BlobError::FutureSlot { + message_slot: blob_slot, + latest_permissible_slot, + }); + } + + // TODO(pawan): Verify not from a past slot? 
+ + // Verify that the sidecar slot is greater than the latest finalized slot + let latest_finalized_slot = chain + .head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + if blob_slot <= latest_finalized_slot { + return Err(BlobError::PastFinalizedSlot { + blob_slot, + finalized_slot: latest_finalized_slot, + }); + } + + // TODO(pawan): should we verify locally that the parent root is correct + // or just use whatever the proposer gives us? + let proposer_shuffling_root = blob_sidecar.message.block_parent_root; + + let (proposer_index, fork) = match chain + .beacon_proposer_cache + .lock() + .get_slot::(proposer_shuffling_root, blob_slot) + { + Some(proposer) => (proposer.index, proposer.fork), + None => { + let state = &chain.canonical_head.cached_head().snapshot.beacon_state; + ( + state.get_beacon_proposer_index(blob_slot, &chain.spec)?, + state.fork(), + ) + } + }; + + let blob_proposer_index = blob_sidecar.message.proposer_index; + if proposer_index != blob_proposer_index { + return Err(BlobError::ProposerIndexMismatch { + sidecar: blob_proposer_index, + local: proposer_index, + }); + } + + let signature_is_valid = { + let pubkey_cache = chain + .validator_pubkey_cache + .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout) + .map_err(BlobError::BeaconChainError)?; + + let pubkey = pubkey_cache + .get(proposer_index as usize) + .ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?; + + blob_sidecar.verify_signature( + None, + pubkey, + &fork, + chain.genesis_validators_root, + &chain.spec, + ) + }; + + if !signature_is_valid { + return Err(BlobError::ProposerSignatureInvalid); + } + + // TODO(pawan): kzg validations. + + // TODO(pawan): Check if other blobs for the same proposer index and blob index have been + // received and drop if required. + + // TODO(pawan): potentially add to a seen cache at this point. 
+ + // Verify if the corresponding block for this blob has been received. + // Note: this should be the last gossip check so that we can forward the blob + // over the gossip network even if we haven't received the corresponding block yet + // as all other validations have passed. + let block_opt = chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache? + + if block_opt.is_none() { + return Err(BlobError::UnknownHeadBlock { + beacon_block_root: block_root, + }); + } + + Ok(()) +} + fn verify_data_availability( blob_sidecar: &BlobsSidecar, kzg_commitments: &[KzgCommitment], diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index a9a0bc9c6be..0272d835be2 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,5 +1,4 @@ use crate::metrics; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock}; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::NotifyExecutionLayer; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; @@ -12,7 +11,7 @@ use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload, - Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + Hash256, SignedBeaconBlock, }; use warp::Rejection; @@ -32,51 +31,33 @@ pub async fn publish_block( // Send the block, regardless of whether or not it is valid. The API // specification is very clear that this is the desired behaviour. 
- let wrapped_block: BlockWrapper = - if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { - if let Some(sidecar) = chain.blob_cache.pop(&block_root) { - let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: Arc::new(sidecar), - }; - crate::publish_pubsub_message( - network_tx, - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()), - )?; - block_and_blobs.into() - } else { - //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required - return Err(warp_utils::reject::broadcast_without_import(format!( - "no blob cached for block" - ))); - } - } else { - crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; - block.into() - }; + crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; + + /* TODO: publish all blob sidecars associated with this block */ // Determine the delay after the start of the slot, register it with metrics. 
- let block = wrapped_block.as_block(); let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); - let available_block = match wrapped_block.into_available_block(block_root, &chain) { - Ok(available_block) => available_block, - Err(e) => { - let msg = format!("{:?}", e); - error!( - log, - "Invalid block provided to HTTP API"; - "reason" => &msg - ); - return Err(warp_utils::reject::broadcast_without_import(msg)); - } - }; + /* TODO: check availability of block */ + + // let available_block = match wrapped_block.into_available_block(block_root, &chain) { + // Ok(available_block) => available_block, + // Err(e) => { + // let msg = format!("{:?}", e); + // error!( + // log, + // "Invalid block provided to HTTP API"; + // "reason" => &msg + // ); + // return Err(warp_utils::reject::broadcast_without_import(msg)); + // } + // }; match chain .process_block( block_root, - available_block.clone(), + block.clone(), CountUnrealized::True, NotifyExecutionLayer::Yes, ) @@ -88,14 +69,14 @@ pub async fn publish_block( "Valid block from HTTP API"; "block_delay" => ?delay, "root" => format!("{}", root), - "proposer_index" => available_block.message().proposer_index(), - "slot" => available_block.slot(), + "proposer_index" => block.message().proposer_index(), + "slot" => block.slot(), ); // Notify the validator monitor. 
chain.validator_monitor.read().register_api_block( seen_timestamp, - available_block.message(), + block.message(), root, &chain.slot_clock, ); @@ -117,7 +98,7 @@ pub async fn publish_block( "Block was broadcast too late"; "msg" => "system may be overloaded, block likely to be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => block.slot(), "root" => ?root, ) } else if delay >= delayed_threshold { @@ -126,7 +107,7 @@ pub async fn publish_block( "Block broadcast was delayed"; "msg" => "system may be overloaded, block may be orphaned", "delay_ms" => delay.as_millis(), - "slot" => available_block.slot(), + "slot" => block.slot(), "root" => ?root, ) } @@ -138,7 +119,7 @@ pub async fn publish_block( log, "Block from HTTP API already known"; "block" => ?block_root, - "slot" => available_block.slot(), + "slot" => block.slot(), ); Ok(()) } diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index d3971a7d742..a6e003934cd 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -21,7 +21,7 @@ pub struct GossipCache { /// Timeout for blocks. beacon_block: Option, /// Timeout for blobs. - beacon_block_and_blobs_sidecar: Option, + blob_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -50,7 +50,7 @@ pub struct GossipCacheBuilder { /// Timeout for blocks. beacon_block: Option, /// Timeout for blob sidecars. - beacon_block_and_blobs_sidecar: Option, + blob_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. 
@@ -151,7 +151,7 @@ impl GossipCacheBuilder { let GossipCacheBuilder { default_timeout, beacon_block, - beacon_block_and_blobs_sidecar, + blob_sidecar, aggregates, attestation, voluntary_exit, @@ -167,7 +167,7 @@ impl GossipCacheBuilder { expirations: DelayQueue::default(), topic_msgs: HashMap::default(), beacon_block: beacon_block.or(default_timeout), - beacon_block_and_blobs_sidecar: beacon_block_and_blobs_sidecar.or(default_timeout), + blob_sidecar: blob_sidecar.or(default_timeout), aggregates: aggregates.or(default_timeout), attestation: attestation.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout), @@ -193,7 +193,7 @@ impl GossipCache { pub fn insert(&mut self, topic: GossipTopic, data: Vec) { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, - GossipKind::BeaconBlocksAndBlobsSidecar => self.beacon_block_and_blobs_sidecar, + GossipKind::BlobSidecar(_) => self.blob_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e7f8d89457c..2271a60e1c8 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -234,6 +234,7 @@ impl Network { possible_fork_digests, ctx.chain_spec.attestation_subnet_count, SYNC_COMMITTEE_SUBNET_COUNT, + 4, // TODO(pawan): get this from chainspec ), max_subscribed_topics: 200, max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2 diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 383b78abf24..e0eb3ff50db 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -236,6 +236,7 @@ pub(crate) fn 
create_whitelist_filter( possible_fork_digests: Vec<[u8; 4]>, attestation_subnet_count: u64, sync_committee_subnet_count: u64, + blob_sidecar_subnet_count: u64, ) -> WhitelistSubscriptionFilter { let mut possible_hashes = HashSet::new(); for fork_digest in possible_fork_digests { @@ -255,13 +256,15 @@ pub(crate) fn create_whitelist_filter( add(BlsToExecutionChange); add(LightClientFinalityUpdate); add(LightClientOptimisticUpdate); - add(BeaconBlocksAndBlobsSidecar); for id in 0..attestation_subnet_count { add(Attestation(SubnetId::new(id))); } for id in 0..sync_committee_subnet_count { add(SyncCommitteeMessage(SyncSubnetId::new(id))); } + for id in 0..blob_sidecar_subnet_count { + add(BlobSidecar(id)); + } } WhitelistSubscriptionFilter(possible_hashes) } diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 7951a072438..a820fefccd6 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -11,8 +11,8 @@ use std::sync::Arc; use types::{ Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, - SignedBeaconBlockCapella, SignedBeaconBlockMerge, SignedBlsToExecutionChange, + SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockMerge, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -20,8 +20,8 @@ use types::{ pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. BeaconBlock(Arc>), - /// Gossipsub message providing notification of a new SignedBeaconBlock coupled with a blobs sidecar. 
- BeaconBlockAndBlobsSidecars(SignedBeaconBlockAndBlobsSidecar), + /// Gossipsub message providing notification of a BlobSidecar along with the subnet id where it was received.. + BlobSidecar((u64, Box>)), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -115,9 +115,7 @@ impl PubsubMessage { pub fn kind(&self) -> GossipKind { match self { PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, - PubsubMessage::BeaconBlockAndBlobsSidecars(_) => { - GossipKind::BeaconBlocksAndBlobsSidecar - } + PubsubMessage::BlobSidecar((subnet, _blob_sidecar)) => GossipKind::BlobSidecar(*subnet), PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) @@ -203,15 +201,15 @@ impl PubsubMessage { }; Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block))) } - GossipKind::BeaconBlocksAndBlobsSidecar => { + GossipKind::BlobSidecar(blob_index) => { match fork_context.from_context_bytes(gossip_topic.fork_digest) { Some(ForkName::Eip4844) => { - let block_and_blobs_sidecar = - SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::BeaconBlockAndBlobsSidecars( - block_and_blobs_sidecar, - )) + let blob_sidecar = SignedBlobSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::BlobSidecar(( + *blob_index, + Box::new(blob_sidecar), + ))) } Some( ForkName::Base @@ -293,7 +291,7 @@ impl PubsubMessage { // messages for us. 
match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), - PubsubMessage::BeaconBlockAndBlobsSidecars(data) => data.as_ssz_bytes(), + PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -317,11 +315,10 @@ impl std::fmt::Display for PubsubMessage { block.slot(), block.message().proposer_index() ), - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( + PubsubMessage::BlobSidecar(data) => write!( f, - "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", - block_and_blob.beacon_block.message().slot(), - block_and_blob.blobs_sidecar.blobs.len(), + "BlobSidecar: slot: {}, blobs index: {}", + data.1.message.slot, data.1.message.index, ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index ab7fb722bf8..6183f0b8f36 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -11,9 +11,9 @@ use crate::Subnet; pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; -pub const BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC: &str = "beacon_block_and_blobs_sidecar"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; +pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -23,9 +23,8 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const 
LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; -pub const CORE_TOPICS: [GossipKind; 8] = [ +pub const CORE_TOPICS: [GossipKind; 7] = [ GossipKind::BeaconBlock, - GossipKind::BeaconBlocksAndBlobsSidecar, GossipKind::BeaconAggregateAndProof, GossipKind::VoluntaryExit, GossipKind::ProposerSlashing, @@ -58,10 +57,10 @@ pub struct GossipTopic { pub enum GossipKind { /// Topic for publishing beacon blocks. BeaconBlock, - /// Topic for publishing beacon block coupled with blob sidecars. - BeaconBlocksAndBlobsSidecar, /// Topic for publishing aggregate attestations and proofs. BeaconAggregateAndProof, + /// Topic for publishing BlobSidecars. + BlobSidecar(u64), /// Topic for publishing raw attestations on a particular subnet. #[strum(serialize = "beacon_attestation")] Attestation(SubnetId), @@ -91,6 +90,7 @@ impl std::fmt::Display for GossipKind { GossipKind::SyncCommitteeMessage(subnet_id) => { write!(f, "sync_committee_{}", **subnet_id) } + GossipKind::BlobSidecar(blob_index) => write!(f, "blob_sidecar_{}", blob_index), x => f.write_str(x.as_ref()), } } @@ -151,7 +151,6 @@ impl GossipTopic { let kind = match topic_parts[3] { BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, - BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC => GossipKind::BeaconBlocksAndBlobsSidecar, SIGNED_CONTRIBUTION_AND_PROOF_TOPIC => GossipKind::SignedContributionAndProof, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing, @@ -159,11 +158,8 @@ impl GossipTopic { BLS_TO_EXECUTION_CHANGE_TOPIC => GossipKind::BlsToExecutionChange, LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate, LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate, - topic => match committee_topic_index(topic) { - Some(subnet) => match subnet { - Subnet::Attestation(s) => 
GossipKind::Attestation(s), - Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s), - }, + topic => match subnet_topic_index(topic) { + Some(kind) => kind, None => return Err(format!("Unknown topic: {}", topic)), }, }; @@ -208,7 +204,6 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), - GossipKind::BeaconBlocksAndBlobsSidecar => BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), @@ -218,6 +213,9 @@ impl std::fmt::Display for GossipTopic { GossipKind::SyncCommitteeMessage(index) => { format!("{}{}", SYNC_COMMITTEE_PREFIX_TOPIC, *index) } + GossipKind::BlobSidecar(blob_index) => { + format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) + } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), @@ -249,22 +247,29 @@ pub fn subnet_from_topic_hash(topic_hash: &TopicHash) -> Option { GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id() } -// Determines if a string is an attestation or sync committee topic. -fn committee_topic_index(topic: &str) -> Option { +// Determines if a string is an attestation, sync committee or blob sidecar topic. 
+fn subnet_topic_index(topic: &str) -> Option { if topic.starts_with(BEACON_ATTESTATION_PREFIX) { - return Some(Subnet::Attestation(SubnetId::new( + return Some(GossipKind::Attestation(SubnetId::new( topic .trim_start_matches(BEACON_ATTESTATION_PREFIX) .parse::() .ok()?, ))); } else if topic.starts_with(SYNC_COMMITTEE_PREFIX_TOPIC) { - return Some(Subnet::SyncCommittee(SyncSubnetId::new( + return Some(GossipKind::SyncCommitteeMessage(SyncSubnetId::new( topic .trim_start_matches(SYNC_COMMITTEE_PREFIX_TOPIC) .parse::() .ok()?, ))); + } else if topic.starts_with(BLOB_SIDECAR_PREFIX) { + return Some(GossipKind::BlobSidecar( + topic + .trim_start_matches(BLOB_SIDECAR_PREFIX) + .parse::() + .ok()?, + )); } None } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 25f2830b0c9..de827ca0ccd 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -65,13 +65,13 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, - QueuedUnaggregate, ReadyWork, + spawn_reprocess_scheduler, QueuedAggregate, QueuedBlobSidecar, QueuedLightClientUpdate, + QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; use worker::{Toolbox, Worker}; @@ -117,9 +117,9 @@ const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024; /// before we start dropping them. 
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; -/// The maximum number of queued `SignedBeaconBlockAndBlobsSidecar` objects received on gossip that +/// The maximum number of queued `BlobSidecar` objects received on gossip that /// will be stored before we start dropping them. -const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024; +const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. @@ -149,6 +149,8 @@ const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; /// for reprocessing before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; +const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; @@ -219,7 +221,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; -pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar"; +pub const GOSSIP_BLOB_SIDECAR: &str = "gossip_blob_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -238,6 +240,7 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const UNKNOWN_BLOB_SIDECAR: &str = "unknown_blob_sidecar"; pub const 
UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; @@ -444,20 +447,20 @@ impl WorkEvent { } /// Create a new `Work` event for some blobs sidecar. - pub fn gossip_block_and_blobs_sidecar( + pub fn gossip_blob_sidecar( message_id: MessageId, peer_id: PeerId, - peer_client: Client, - block_and_blobs: SignedBeaconBlockAndBlobsSidecar, + blob_sidecar: Box>, + subnet: u64, seen_timestamp: Duration, ) -> Self { Self { drop_during_sync: false, - work: Work::GossipBlockAndBlobsSidecar { + work: Work::GossipBlobSidecar { message_id, peer_id, - peer_client, - block_and_blobs, + blob_sidecar, + subnet, seen_timestamp, }, } @@ -787,6 +790,22 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::BlobSidecar(QueuedBlobSidecar { + peer_id, + message_id, + blob_sidecar, + subnet, + seen_timestamp, + }) => Self { + drop_during_sync: true, + work: Work::UnknownBlobSidecar { + message_id, + peer_id, + blob_sidecar, + subnet, + seen_timestamp, + }, + }, ReadyWork::LightClientUpdate(QueuedLightClientUpdate { peer_id, message_id, @@ -857,11 +876,18 @@ pub enum Work { block: Arc>, seen_timestamp: Duration, }, - GossipBlockAndBlobsSidecar { + GossipBlobSidecar { message_id: MessageId, peer_id: PeerId, - peer_client: Client, - block_and_blobs: SignedBeaconBlockAndBlobsSidecar, + blob_sidecar: Box>, + subnet: u64, + seen_timestamp: Duration, + }, + UnknownBlobSidecar { + message_id: MessageId, + peer_id: PeerId, + blob_sidecar: Box>, + subnet: u64, seen_timestamp: Duration, }, DelayedImportBlock { @@ -965,7 +991,7 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, - Work::GossipBlockAndBlobsSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, + Work::GossipBlobSidecar { .. } => GOSSIP_BLOB_SIDECAR, Work::DelayedImportBlock { .. 
} => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, @@ -984,6 +1010,7 @@ impl Work { Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::UnknownBlobSidecar { .. } => UNKNOWN_BLOB_SIDECAR, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE, } @@ -1103,6 +1130,8 @@ impl BeaconProcessor { let mut unknown_block_attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); + let mut unknown_blob_sidecar_queue = LifoQueue::new(MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN); + let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN); let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN); @@ -1128,8 +1157,7 @@ impl BeaconProcessor { let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); - let mut gossip_block_and_blobs_sidecar_queue = - FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN); + let mut gossip_blob_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); @@ -1244,7 +1272,7 @@ impl BeaconProcessor { // required to verify some attestations. 
} else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(item, toolbox); - } else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() { + } else if let Some(item) = gossip_blob_sidecar_queue.pop() { self.spawn_worker(item, toolbox); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us @@ -1459,8 +1487,8 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } - Work::GossipBlockAndBlobsSidecar { .. } => { - gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) + Work::GossipBlobSidecar { .. } => { + gossip_blob_sidecar_queue.push(work, work_id, &self.log) } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) @@ -1513,6 +1541,9 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } + Work::UnknownBlobSidecar { .. } => { + unknown_blob_sidecar_queue.push(work) + } Work::GossipBlsToExecutionChange { .. } => { gossip_bls_to_execution_change_queue.push(work, work_id, &self.log) } @@ -1742,25 +1773,39 @@ impl BeaconProcessor { /* * Verification for blobs sidecars received on gossip. 
*/ - Work::GossipBlockAndBlobsSidecar { + Work::GossipBlobSidecar { message_id, peer_id, - peer_client, - block_and_blobs: block_sidecar_pair, + blob_sidecar, + subnet, seen_timestamp, - } => task_spawner.spawn_async(async move { - worker - .process_gossip_block( - message_id, - peer_id, - peer_client, - block_sidecar_pair.into(), - work_reprocessing_tx, - duplicate_cache, - seen_timestamp, - ) - .await + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_blob_sidecar( + message_id, + peer_id, + blob_sidecar, + subnet, + Some(work_reprocessing_tx), + seen_timestamp, + ) + }), + Work::UnknownBlobSidecar { + message_id, + peer_id, + blob_sidecar, + subnet, + seen_timestamp, + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_blob_sidecar( + message_id, + peer_id, + blob_sidecar, + subnet, + None, + seen_timestamp, + ) }), + /* * Import for blocks that we received earlier than their intended slot. */ diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4d0bdc00278..59dfd62e558 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -31,7 +31,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ - Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SignedBlobSidecar, SubnetId, }; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; @@ -47,6 +48,10 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. 
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue blob sidecars for re-processing. +/// TODO: rethink duration +pub const QUEUED_BLOB_SIDECAR_DELAY: Duration = Duration::from_secs(12); + /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); @@ -61,6 +66,10 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// TODO: fix number +/// How many blobs we keep before new ones get dropped. +const MAXIMUM_QUEUED_BLOB_SIDECARS: usize = 16_384; + /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; @@ -81,6 +90,8 @@ pub enum ReprocessQueueMessage { UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// A blob sidecar that references an unknown block. + UnknownBlobSidecar(QueuedBlobSidecar), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), } @@ -91,6 +102,7 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), + BlobSidecar(QueuedBlobSidecar), LightClientUpdate(QueuedLightClientUpdate), } @@ -114,6 +126,16 @@ pub struct QueuedAggregate { pub seen_timestamp: Duration, } +/// A blob sidecar for which the corresponding block was not seen while processing, queued for +/// later. +pub struct QueuedBlobSidecar { + pub peer_id: PeerId, + pub message_id: MessageId, + pub blob_sidecar: Box>, + pub subnet: u64, + pub seen_timestamp: Duration, +} + /// A light client update for which the corresponding parent block was not seen while processing, /// queued for later. 
pub struct QueuedLightClientUpdate { @@ -152,6 +174,8 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), + /// A blob sidecar is ready for re-processing. + ReadyBlobSidecar(QueuedBlobSidecarId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A `DelayQueue` returned an error. @@ -174,6 +198,7 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, + blob_sidecar_delay_queue: DelayQueue, /// Queue to manage scheduled light client updates. lc_updates_delay_queue: DelayQueue, @@ -186,6 +211,8 @@ struct ReprocessQueue { queued_unaggregates: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + queued_blob_sidecars: FnvHashMap, DelayKey)>, + awaiting_blob_sidecars_per_root: HashMap>, /// Queued Light Client Updates. queued_lc_updates: FnvHashMap, DelayKey)>, /// Light Client Updates per parent_root. 
@@ -195,14 +222,19 @@ struct ReprocessQueue { /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, next_lc_update: usize, + next_sidecar: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + blob_sidecar_debounce: TimeLatch, } pub type QueuedLightClientUpdateId = usize; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct QueuedBlobSidecarId(usize); + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { Aggregate(usize), @@ -272,6 +304,21 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.blob_sidecar_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(id))) => { + return Poll::Ready(Some(InboundEvent::ReadyBlobSidecar(id.into_inner()))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError( + e, + "blobs_sidecar_queue", + ))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. 
+ Poll::Ready(None) | Poll::Pending => (), + } + match self.lc_updates_delay_queue.poll_expired(cx) { Poll::Ready(Some(Ok(lc_id))) => { return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( @@ -313,21 +360,26 @@ pub fn spawn_reprocess_scheduler( work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + blob_sidecar_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_lc_updates: FnvHashMap::default(), + queued_blob_sidecars: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_blob_sidecars_per_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), next_attestation: 0, + next_sidecar: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + blob_sidecar_debounce: TimeLatch::default(), }; executor.spawn( @@ -529,6 +581,39 @@ impl ReprocessQueue { self.next_attestation += 1; } + InboundEvent::Msg(UnknownBlobSidecar(queued_blob_sidecar)) => { + if self.blob_sidecar_delay_queue.len() >= MAXIMUM_QUEUED_BLOB_SIDECARS { + if self.blob_sidecar_debounce.elapsed() { + error!( + log, + "Blob sidecar queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOB_SIDECARS, + "msg" => "check system clock" + ); + } + // Drop the blob. + return; + } + + let id = QueuedBlobSidecarId(self.next_sidecar); + + // Register the delay. + let delay_key = self + .blob_sidecar_delay_queue + .insert(id, QUEUED_BLOB_SIDECAR_DELAY); + + // Register this sidecar for the corresponding root. 
+ self.awaiting_blob_sidecars_per_root + .entry(queued_blob_sidecar.blob_sidecar.message.block_root) + .or_default() + .push(id); + + // Store the blob sidecar and its info. + self.queued_blob_sidecars + .insert(self.next_sidecar, (queued_blob_sidecar, delay_key)); + + self.next_sidecar += 1; + } InboundEvent::Msg(UnknownLightClientOptimisticUpdate( queued_light_client_optimistic_update, )) => { @@ -613,6 +698,43 @@ impl ReprocessQueue { } } } + // Unqueue the blob sidecars we have for this root, if any. + // TODO: merge the 2 data structures. + if let Some(queued_ids) = self.awaiting_blob_sidecars_per_root.remove(&block_root) { + for id in queued_ids { + // metrics::inc_counter( + // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, + // ); + + if let Some((work, delay_key)) = self + .queued_blob_sidecars + .remove(&id.0) + .map(|(blobs_sidecar, delay_key)| { + (ReadyWork::BlobSidecar(blobs_sidecar), delay_key) + }) + { + // Remove the delay. + self.blob_sidecar_delay_queue.remove(&delay_key); + + // Send the work. + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled blob sidecar"; + ); + } + } else { + // There is a mismatch between the blob sidecar ids registered for this + // root and the queued blob sidecars. This should never happen. + error!( + log, + "Unknown queued blob sidecar for block root"; + "block_root" => ?block_root, + "id" => ?id, + ); + } + } + } // Unqueue the light client optimistic updates we have for this root, if any. 
if let Some(queued_lc_id) = self .awaiting_lc_updates_per_parent_root @@ -737,6 +859,40 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyBlobSidecar(queued_blobs_sidecar_id) => { + // metrics::inc_counter( + // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, + // ); + + if let Some((root, work)) = self + .queued_blob_sidecars + .remove(&queued_blobs_sidecar_id.0) + .map(|(blobs_sidecar, _delay_key)| { + ( + blobs_sidecar.blob_sidecar.message.block_root, + ReadyWork::BlobSidecar(blobs_sidecar), + ) + }) + { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled attestation"; + ); + } + + if let Some(queued_blob_sidecars) = + self.awaiting_blob_sidecars_per_root.get_mut(&root) + { + if let Some(index) = queued_blob_sidecars + .iter() + .position(|&id| id == queued_blobs_sidecar_id) + { + queued_blob_sidecars.swap_remove(index); + } + } + } + } InboundEvent::ReadyLightClientUpdate(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 5e84cbb5f22..8765dbed920 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,9 +1,11 @@ +use crate::beacon_processor::work_reprocessing_queue::QueuedBlobSidecar; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, + blob_verification::BlobError, light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, @@ -22,7 
+24,7 @@ use store::hot_cold_store::HotColdDBError;
 use tokio::sync::mpsc;
 use types::{
     Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
-    LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof,
+    LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBlobSidecar,
     SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId,
     SyncCommitteeMessage, SyncSubnetId,
 };
@@ -698,6 +700,33 @@ impl<T: BeaconChainTypes> Worker<T> {
         }
     }
 
+    pub fn process_gossip_blob_sidecar(
+        self,
+        _message_id: MessageId,
+        _peer_id: PeerId,
+        _blob_sidecar: Box<SignedBlobSidecar<T::EthSpec>>,
+        _subnet: u64,
+        _reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T::EthSpec>>>,
+        _seen_duration: Duration,
+    ) {
+        // match self.chain.verify_blobs_sidecar_for_gossip(&blob) {
+        //     Ok(verified_sidecar) => {
+        //         // Register with validator monitor
+        //         // Propagate
+        //         // Apply to fork choice
+        //     }
+        //     Err(error) => self.handle_blobs_verification_failure(
+        //         peer_id,
+        //         message_id,
+        //         reprocess_tx,
+        //         error,
+        //         blob,
+        //         seen_timestamp,
+        //     ),
+        // };
+        unimplemented!()
+    }
+
     /// Process the beacon block received from the gossip network and
     /// if it passes gossip propagation criteria, tell the network thread to forward it.
     ///
@@ -2453,6 +2482,88 @@ impl<T: BeaconChainTypes> Worker<T> {
         );
     }
 
+    /// Handle an error whilst verifying a `SignedBlobsSidecar` from the network.
+    fn handle_blobs_verification_failure(
+        &self,
+        peer_id: PeerId,
+        message_id: MessageId,
+        reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T::EthSpec>>>,
+        error: BlobError,
+        blob_sidecar: Box<SignedBlobSidecar<T::EthSpec>>,
+        subnet: u64,
+        seen_timestamp: Duration,
+    ) {
+        // TODO: metrics
+        match &error {
+            BlobError::FutureSlot { ..
} => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::BeaconChainError(_e) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::ProposerSignatureInvalid => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + } + BlobError::RepeatSidecar { + proposer: _, + slot: _, + blob_index: _, + } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + } + BlobError::UnknownHeadBlock { beacon_block_root } => { + debug!( + self.log, + "Blob sidecar for unknown block"; + "peer_id" => %peer_id, + "block" => ?beacon_block_root + ); + if let Some(sender) = reprocess_tx { + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. + self.sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + let msg = ReprocessQueueMessage::UnknownBlobSidecar(QueuedBlobSidecar { + peer_id, + message_id, + blob_sidecar, + subnet, + seen_timestamp, + }); + + if sender.try_send(msg).is_err() { + error!( + self.log, + "Failed to send blob sidecar for re-processing"; + ) + } + } else { + // We shouldn't make any further attempts to process this attestation. + // + // Don't downscore the peer since it's not clear if we requested this head + // block from them or not. + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + + return; + } + BlobError::UnknownValidator(_) => todo!(), + //TODO(pawan): handle all cases + _ => todo!(), + } + } + /// Propagate (accept) if `is_timely == true`, otherwise ignore. 
fn propagate_if_timely(&self, is_timely: bool, message_id: MessageId, peer_id: PeerId) { if is_timely { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 31f2092049f..b712dc6e735 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -250,12 +250,12 @@ impl Router { block, ); } - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs) => { - self.processor.on_block_and_blobs_sidecar_gossip( + PubsubMessage::BlobSidecar(blob_sidecar) => { + self.processor.on_blob_sidecar_gossip( id, peer_id, - self.network_globals.client(&peer_id), - block_and_blobs, + blob_sidecar.1.clone(), + blob_sidecar.0, ); } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d0879babacb..7f241dd8d8b 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -20,8 +20,8 @@ use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, BlobsSidecar, EthSpec, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, - SignedVoluntaryExit, SubnetId, SyncSubnetId, + SignedBeaconBlockAndBlobsSidecar, SignedBlobSidecar, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, }; /// Processes validated messages from the network. 
It relays necessary data to the syncing thread
@@ -359,18 +359,18 @@ impl<T: BeaconChainTypes> Processor<T> {
         ))
     }
 
-    pub fn on_block_and_blobs_sidecar_gossip(
+    pub fn on_blob_sidecar_gossip(
         &mut self,
         message_id: MessageId,
         peer_id: PeerId,
-        peer_client: Client,
-        block_and_blobs: SignedBeaconBlockAndBlobsSidecar<T::EthSpec>,
+        blob_sidecar: Box<SignedBlobSidecar<T::EthSpec>>,
+        subnet: u64,
     ) {
-        self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar(
+        self.send_beacon_processor_work(BeaconWorkEvent::gossip_blob_sidecar(
             message_id,
             peer_id,
-            peer_client,
-            block_and_blobs,
+            blob_sidecar,
+            subnet,
             timestamp_now(),
         ))
     }
diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs
new file mode 100644
index 00000000000..f584c63a535
--- /dev/null
+++ b/consensus/types/src/blob_sidecar.rs
@@ -0,0 +1,51 @@
+use crate::test_utils::TestRandom;
+use crate::{Blob, EthSpec, Hash256, SignedRoot, Slot};
+use derivative::Derivative;
+use kzg::{KzgCommitment, KzgProof};
+use serde_derive::{Deserialize, Serialize};
+use ssz::Encode;
+use ssz_derive::{Decode, Encode};
+use test_random_derive::TestRandom;
+use tree_hash_derive::TreeHash;
+
+#[derive(
+    Debug,
+    Clone,
+    Serialize,
+    Deserialize,
+    Encode,
+    Decode,
+    TreeHash,
+    Default,
+    TestRandom,
+    Derivative,
+    arbitrary::Arbitrary,
+)]
+#[serde(bound = "T: EthSpec")]
+#[arbitrary(bound = "T: EthSpec")]
+#[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
+pub struct BlobSidecar<T: EthSpec> {
+    pub block_root: Hash256,
+    // TODO: fix the type, should fit in u8 as well
+    pub index: u64,
+    pub slot: Slot,
+    pub block_parent_root: Hash256,
+    pub proposer_index: usize,
+    pub blob: Blob<T>,
+    pub kzg_commitment: KzgCommitment,
+    pub kzg_proof: KzgProof,
+}
+
+impl<T: EthSpec> SignedRoot for BlobSidecar<T> {}
+
+impl<T: EthSpec> BlobSidecar<T> {
+    pub fn empty() -> Self {
+        Self::default()
+    }
+
+    #[allow(clippy::integer_arithmetic)]
+    pub fn max_size() -> usize {
+        // Fixed part
+        Self::empty().as_ssz_bytes().len()
+    }
+}
diff --git a/consensus/types/src/lib.rs
b/consensus/types/src/lib.rs index c09b34f87c3..9ae0a3ce7f0 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,7 +99,9 @@ pub mod slot_data; #[cfg(feature = "sqlite")] pub mod sqlite; +pub mod blob_sidecar; pub mod blobs_sidecar; +pub mod signed_blob_sidecar; pub mod signed_block_and_blobs; pub mod transaction; @@ -121,6 +123,7 @@ pub use crate::beacon_block_body::{ pub use crate::beacon_block_header::BeaconBlockHeader; pub use crate::beacon_committee::{BeaconCommittee, OwnedBeaconCommittee}; pub use crate::beacon_state::{BeaconTreeHashCache, Error as BeaconStateError, *}; +pub use crate::blob_sidecar::BlobSidecar; pub use crate::blobs_sidecar::BlobsSidecar; pub use crate::bls_to_execution_change::BlsToExecutionChange; pub use crate::chain_spec::{ChainSpec, Config, Domain}; @@ -177,6 +180,7 @@ pub use crate::signed_beacon_block::{ SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; +pub use crate::signed_blob_sidecar::SignedBlobSidecar; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecarDecode; pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange; diff --git a/consensus/types/src/signed_blob_sidecar.rs b/consensus/types/src/signed_blob_sidecar.rs new file mode 100644 index 00000000000..65ea48ee815 --- /dev/null +++ b/consensus/types/src/signed_blob_sidecar.rs @@ -0,0 +1,43 @@ +use crate::test_utils::TestRandom; +use crate::{BlobSidecar, ChainSpec, EthSpec, Fork, Hash256, PublicKey, Signature, SignedRoot}; +use derivative::Derivative; +use serde_derive::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use test_random_derive::TestRandom; +use tree_hash_derive::TreeHash; + +#[derive( + Debug, + Clone, + Serialize, + Deserialize, + Encode, + Decode, + TreeHash, + TestRandom, + Derivative, + arbitrary::Arbitrary, +)] +#[serde(bound = "T: EthSpec")] +#[arbitrary(bound = "T: 
EthSpec")]
+#[derivative(PartialEq, Hash(bound = "T: EthSpec"))]
+pub struct SignedBlobSidecar<T: EthSpec> {
+    pub message: BlobSidecar<T>,
+    pub signature: Signature,
+}
+
+impl<T: EthSpec> SignedRoot for SignedBlobSidecar<T> {}
+
+impl<T: EthSpec> SignedBlobSidecar<T> {
+    pub fn verify_signature(
+        &self,
+        _object_root_opt: Option<Hash256>,
+        _pubkey: &PublicKey,
+        _fork: &Fork,
+        _genesis_validators_root: Hash256,
+        _spec: &ChainSpec,
+    ) -> bool {
+        // TODO (pawan): fill up logic
+        unimplemented!()
+    }
+}
diff --git a/crypto/kzg/src/kzg_commitment.rs b/crypto/kzg/src/kzg_commitment.rs
index 8d6eefecd86..4d82539b38f 100644
--- a/crypto/kzg/src/kzg_commitment.rs
+++ b/crypto/kzg/src/kzg_commitment.rs
@@ -20,6 +20,12 @@ impl Display for KzgCommitment {
     }
 }
 
+impl Default for KzgCommitment {
+    fn default() -> Self {
+        KzgCommitment([0; KZG_COMMITMENT_BYTES_LEN])
+    }
+}
+
 impl TreeHash for KzgCommitment {
     fn tree_hash_type() -> tree_hash::TreeHashType {
         <[u8; KZG_COMMITMENT_BYTES_LEN] as TreeHash>::tree_hash_type()