Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Update statement-distribution to fetch PVD
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Jun 7, 2022
1 parent 39c194e commit 4a762d7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 18 deletions.
12 changes: 10 additions & 2 deletions node/network/statement-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::PeerId;
use polkadot_node_subsystem::SubsystemError;
use polkadot_node_subsystem::{RuntimeApiError, SubsystemError};
use polkadot_node_subsystem_util::runtime;
use polkadot_primitives::v2::{CandidateHash, Hash};
use polkadot_primitives::v2::{CandidateHash, Hash, Id as ParaId};

use futures::channel::oneshot;

use crate::LOG_TARGET;

Expand Down Expand Up @@ -56,6 +58,12 @@ pub enum Error {
#[error("Error while accessing runtime information")]
Runtime(#[from] runtime::Error),

#[error("RuntimeAPISubsystem channel closed before receipt")]
RuntimeApiUnavailable(#[source] oneshot::Canceled),

#[error("Fetching persisted validation data for para {0:?}, {1:?}")]
FetchPersistedValidationData(ParaId, RuntimeApiError),

#[error("Relay parent could not be found in active heads")]
NoSuchHead(Hash),

Expand Down
103 changes: 87 additions & 16 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use polkadot_node_network_protocol::{
v1::{self as protocol_v1, StatementMetadata},
IfDisconnected, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement};
use polkadot_node_primitives::{
SignedFullStatement, Statement, StatementWithPVD, UncheckedSignedFullStatement,
};
use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS};

use polkadot_node_subsystem::{
Expand All @@ -43,12 +45,12 @@ use polkadot_node_subsystem::{
StatementDistributionMessage,
},
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
SubsystemError,
StatementDistributionSenderTrait, SubsystemError,
};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
SignedStatement, SigningContext, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
ValidatorSignature,
Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, SignedStatement, SigningContext,
UncheckedSignedStatement, ValidatorId, ValidatorIndex, ValidatorSignature,
};

use futures::{
Expand Down Expand Up @@ -656,7 +658,11 @@ enum DeniedStatement {

struct ActiveHeadData {
/// All candidates we are aware of for this head, keyed by hash.
candidates: HashSet<CandidateHash>,
/// We also store para id for querying persisted validation data
/// from the Runtime.
candidates: HashMap<CandidateHash, ParaId>,
/// Persisted validation data cache.
cached_validation_data: HashMap<ParaId, PersistedValidationData>,
/// Stored statements for circulation to peers.
///
/// These are iterable in insertion order, and `Seconded` statements are always
Expand All @@ -682,6 +688,7 @@ impl ActiveHeadData {
) -> Self {
ActiveHeadData {
candidates: Default::default(),
cached_validation_data: Default::default(),
statements: Default::default(),
waiting_large_statements: Default::default(),
validators,
Expand All @@ -691,6 +698,42 @@ impl ActiveHeadData {
}
}

async fn get_persisted_validation_data<Sender>(
&mut self,
sender: &mut Sender,
relay_parent: Hash,
candidate_hash: CandidateHash,
) -> Result<Option<PersistedValidationData>>
where
Sender: StatementDistributionSenderTrait,
{
let para_id = match self.candidates.get(&candidate_hash) {
Some(para_id) => *para_id,
None => return Ok(None),
};

if let Entry::Vacant(entry) = self.cached_validation_data.entry(para_id) {
let persisted_validation_data =
polkadot_node_subsystem_util::request_persisted_validation_data(
relay_parent,
para_id,
OccupiedCoreAssumption::Free,
sender,
)
.await
.await
.map_err(Error::RuntimeApiUnavailable)?
.map_err(|err| Error::FetchPersistedValidationData(para_id, err))?;

match persisted_validation_data {
Some(pvd) => entry.insert(pvd),
None => return Ok(None),
};
}

Ok(self.cached_validation_data.get(&para_id).cloned())
}

/// Note the given statement.
///
/// If it was not already known and can be accepted, returns `NotedStatement::Fresh`,
Expand All @@ -713,8 +756,11 @@ impl ActiveHeadData {
signature: statement.signature().clone(),
};

match comparator.compact {
CompactStatement::Seconded(h) => {
match statement.payload() {
Statement::Seconded(candidate_receipt) => {
let h = candidate_receipt.hash();
let para_id = candidate_receipt.descriptor.para_id;

let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
if *seconded_so_far >= VC_THRESHOLD {
gum::trace!(
Expand All @@ -726,7 +772,7 @@ impl ActiveHeadData {
return NotedStatement::NotUseful
}

self.candidates.insert(h);
self.candidates.insert(h, para_id);
if let Some(old) = self.statements.insert(comparator.clone(), statement) {
gum::trace!(
target: LOG_TARGET,
Expand All @@ -753,8 +799,8 @@ impl ActiveHeadData {
NotedStatement::Fresh(key_value.into())
}
},
CompactStatement::Valid(h) => {
if !self.candidates.contains(&h) {
Statement::Valid(h) => {
if !self.candidates.contains_key(&h) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
Expand Down Expand Up @@ -829,7 +875,7 @@ impl ActiveHeadData {
}
},
CompactStatement::Valid(h) => {
if !self.candidates.contains(&h) {
if !self.candidates.contains_key(&h) {
gum::trace!(
target: LOG_TARGET,
?validator_index,
Expand Down Expand Up @@ -1503,6 +1549,22 @@ async fn handle_incoming_message<'a, Context>(
}
};

// TODO [https://github.com/paritytech/polkadot/issues/5055]
//
// For `Seconded` statements `None` or `Err` means we couldn't fetch the PVD, which
// means the statement shouldn't be accepted.
//
// In case of `Valid` we should have it cached prior, therefore this performs
// no Runtime API calls and always returns `Ok(Some(_))`.
let persisted_validation_data = if let Ok(Some(persisted_validation_data)) = active_head
.get_persisted_validation_data(ctx.sender(), relay_parent, candidate_hash)
.await
{
persisted_validation_data
} else {
return None
};

// Fetch from the network only after signature and usefulness checks are completed.
let is_large_statement = message.is_large_statement();
let statement =
Expand Down Expand Up @@ -1561,17 +1623,26 @@ async fn handle_incoming_message<'a, Context>(
unreachable!("checked in `is_useful_or_unknown` above; qed");
},
NotedStatement::Fresh(statement) => {
// Extend the payload with persisted validation data required by the backing
// subsystem.
let statement_with_pvd = statement
.statement
.clone()
.convert_to_superpayload_with(|statement| match statement {
Statement::Seconded(receipt) =>
StatementWithPVD::Seconded(receipt, persisted_validation_data),
Statement::Valid(candidate_hash) => StatementWithPVD::Valid(candidate_hash),
})
.expect("payload was checked with conversion from compact; qed");

report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await;

let mut _span = handle_incoming_span.child("notify-backing");

// When we receive a new message from a peer, we forward it to the
// candidate backing subsystem.
ctx.send_message(CandidateBackingMessage::Statement(
relay_parent,
statement.statement.clone(),
))
.await;
ctx.send_message(CandidateBackingMessage::Statement(relay_parent, statement_with_pvd))
.await;

Some((relay_parent, statement))
},
Expand Down

0 comments on commit 4a762d7

Please sign in to comment.