diff --git a/Cargo.lock b/Cargo.lock index 8583a3b1db51..90fd96800ba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4488,8 +4488,6 @@ dependencies = [ "bitvec", "derive_more 0.99.9", "futures 0.3.5", - "futures-timer 3.0.2", - "log 0.4.8", "polkadot-erasure-coding", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -4502,7 +4500,6 @@ dependencies = [ "sp-blockchain", "sp-core", "sp-keyring", - "streamunordered", ] [[package]] @@ -4547,11 +4544,19 @@ name = "polkadot-node-subsystem" version = "0.1.0" dependencies = [ "async-trait", + "derive_more 0.99.9", "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "pin-project", "polkadot-node-primitives", "polkadot-primitives", "polkadot-statement-table", + "sc-keystore", "sc-network", + "sp-core", + "streamunordered", ] [[package]] diff --git a/node/core/backing/Cargo.toml b/node/core/backing/Cargo.toml index b25055293add..720b8af418d2 100644 --- a/node/core/backing/Cargo.toml +++ b/node/core/backing/Cargo.toml @@ -6,20 +6,16 @@ edition = "2018" [dependencies] futures = "0.3.5" -log = "0.4.8" sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" } - polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" } statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" } -futures-timer = "3.0.2" -streamunordered = "0.5.1" derive_more = "0.99.9" bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 281147847e19..e0309ec8428e 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -16,45 +16,43 @@ //! Implements a `CandidateBackingSubsystem`. 
-#![recursion_limit="256"] - use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; -use std::time::Duration; use bitvec::vec::BitVec; -use log; use futures::{ - select, FutureExt, SinkExt, StreamExt, - channel::{oneshot, mpsc}, - future::{self, Either}, - task::{Spawn, SpawnError, SpawnExt}, + channel::{mpsc, oneshot}, + task::{Spawn, SpawnError}, + Future, FutureExt, SinkExt, StreamExt, }; -use futures_timer::Delay; -use streamunordered::{StreamUnordered, StreamYield}; -use primitives::Pair; use keystore::KeyStorePtr; use polkadot_primitives::v1::{ - CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId, + CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId, ValidatorIndex, SigningContext, PoV, OmittedValidationData, CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt, CandidateCommitments, }; use polkadot_node_primitives::{ - FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, - ValidationOutputs, + FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, + ValidationOutputs, ValidationResult, }; use polkadot_subsystem::{ - FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, -}; -use polkadot_subsystem::messages::{ - AllMessages, CandidateBackingMessage, CandidateSelectionMessage, SchedulerRoster, - RuntimeApiMessage, RuntimeApiRequest, CandidateValidationMessage, ValidationFailed, - StatementDistributionMessage, NewBackedCandidate, ProvisionerMessage, ProvisionableData, - PoVDistributionMessage, AvailabilityStoreMessage, + Subsystem, SubsystemContext, SpawnedSubsystem, + messages::{ + AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, + CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, + ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed, + }, + util::{ + self, + request_signing_context, + request_validator_groups, + request_validators, + Validator, + }, }; use statement_table::{ generic::AttestedCandidate as TableAttestedCandidate, @@ -68,9 +66,7 @@ use statement_table::{ #[derive(Debug, derive_more::From)] enum Error { - NotInValidatorSet, CandidateNotFound, - JobNotFound(Hash), InvalidSignature, #[from] Erasure(erasure_coding::Error), @@ -82,6 +78,8 @@ enum Error { Mpsc(mpsc::SendError), #[from] Spawn(SpawnError), + #[from] + UtilError(util::Error), } /// Holds all data needed for candidate backing job operation. @@ -92,7 +90,6 @@ struct CandidateBackingJob { rx_to: mpsc::Receiver, /// Outbound message channel sending part. tx_from: mpsc::Sender, - /// The `ParaId`s assigned to this validator. assignment: ParaId, /// We issued `Valid` or `Invalid` statements on about these candidates. @@ -101,7 +98,6 @@ struct CandidateBackingJob { seconded: Option, /// We have already reported misbehaviors for these validators. 
reported_misbehavior_for: HashSet, - table: Table, table_context: TableContext, } @@ -113,7 +109,7 @@ const fn group_quorum(n_validators: usize) -> usize { #[derive(Default)] struct TableContext { signing_context: SigningContext, - key: Option, + validator: Option, groups: HashMap>, validators: Vec, } @@ -142,30 +138,40 @@ impl TableContextTrait for TableContext { } } -impl TableContext { - fn local_id(&self) -> Option { - self.key.as_ref().map(|k| k.public()) +/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`. +pub enum ToJob { + /// A `CandidateBackingMessage`. + CandidateBacking(CandidateBackingMessage), + /// Stop working. + Stop, +} + +impl TryFrom for ToJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)), + _ => Err(()), + } } +} - fn local_index(&self) -> Option { - self.local_id().and_then(|id| - self.validators - .iter() - .enumerate() - .find(|(_, k)| k == &&id) - .map(|(i, _)| i as ValidatorIndex) - ) +impl From for ToJob { + fn from(msg: CandidateBackingMessage) -> Self { + Self::CandidateBacking(msg) } } -const CHANNEL_CAPACITY: usize = 64; +impl util::ToJobTrait for ToJob { + const STOP: Self = ToJob::Stop; -/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`. -enum ToJob { - /// A `CandidateBackingMessage`. - CandidateBacking(CandidateBackingMessage), - /// Stop working. - Stop, + fn relay_parent(&self) -> Option { + match self { + Self::CandidateBacking(cb) => cb.relay_parent(), + Self::Stop => None, + } + } } /// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`. @@ -193,6 +199,23 @@ impl From for AllMessages { } } +impl TryFrom for FromJob { + type Error = &'static str; + + fn try_from(f: AllMessages) -> Result { + match f { + AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)), + AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)), + AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)), + AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)), + AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)), + AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)), + AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)), + _ => Err("can't convert this AllMessages variant to FromJob"), + } + } +} + // It looks like it's not possible to do an `impl From` given the current state of // the code. So this does the necessary conversion. fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement { @@ -209,19 +232,9 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement } } -// finds the first key we are capable of signing with out of the given set of validators, -// if any. -fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option { - let keystore = keystore.read(); - validators.iter() - .find_map(|v| { - keystore.key_pair::(&v).ok() - }) -} - impl CandidateBackingJob { /// Run asynchronously. 
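// An illustrative, self-contained sketch (std only) of the `TryFrom`-based
// narrowing that `ToJob` implements above: the overseer's umbrella enum is
// converted into the job-specific enum where possible, and everything else is
// rejected. `Hash` and the message payloads here are simplified stand-ins,
// not the real types from this patch.
use std::convert::TryFrom;

type Hash = [u8; 32];

enum AllMessages {
    CandidateBacking(CandidateBackingMessage),
    SomethingElse,
}

enum CandidateBackingMessage {
    Statement(Hash, &'static str),
}

#[allow(dead_code)]
enum ToJob {
    CandidateBacking(CandidateBackingMessage),
    Stop,
}

impl TryFrom<AllMessages> for ToJob {
    type Error = ();

    fn try_from(msg: AllMessages) -> Result<Self, ()> {
        match msg {
            AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
            _ => Err(()),
        }
    }
}

fn main() {
    let stmt = AllMessages::CandidateBacking(CandidateBackingMessage::Statement([0; 32], "valid"));
    // only messages this job cares about survive the conversion
    assert!(ToJob::try_from(stmt).is_ok());
    assert!(ToJob::try_from(AllMessages::SomethingElse).is_err());
}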
- async fn run(mut self) -> Result<(), Error> { + async fn run_loop(mut self) -> Result<(), Error> { while let Some(msg) = self.rx_to.next().await { match msg { ToJob::CandidateBacking(msg) => { @@ -328,9 +341,7 @@ impl CandidateBackingJob { None => continue, }; - let mut validator_indices = BitVec::with_capacity( - group.len() - ); + let mut validator_indices = BitVec::with_capacity(group.len()); validator_indices.resize(group.len(), false); @@ -371,7 +382,7 @@ impl CandidateBackingJob { if let Ok(report) = MisbehaviorReport::try_from(f) { let message = ProvisionerMessage::ProvisionableData( - ProvisionableData::MisbehaviorReport(self.parent, report) + ProvisionableData::MisbehaviorReport(self.parent, report), ); reports.push(message); @@ -513,18 +524,7 @@ impl CandidateBackingJob { } fn sign_statement(&self, statement: Statement) -> Option { - let local_index = self.table_context.local_index()?; - - let signing_key = self.table_context.key.as_ref()?; - - let signed_statement = SignedFullStatement::sign( - statement, - &self.table_context.signing_context, - local_index, - signing_key, - ); - - Some(signed_statement) + Some(self.table_context.validator.as_ref()?.sign(statement)) } fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> { @@ -657,329 +657,133 @@ impl CandidateBackingJob { } } -struct JobHandle { - abort_handle: future::AbortHandle, - to_job: mpsc::Sender, - finished: oneshot::Receiver<()>, - su_handle: usize, -} +impl util::JobTrait for CandidateBackingJob { + type ToJob = ToJob; + type FromJob = FromJob; + type Error = Error; + type RunArgs = KeyStorePtr; -impl JobHandle { - async fn stop(mut self) { - let _ = self.to_job.send(ToJob::Stop).await; - let stop_timer = Delay::new(Duration::from_secs(1)); - - match future::select(stop_timer, self.finished).await { - Either::Left((_, _)) => { - }, - Either::Right((_, _)) => { - self.abort_handle.abort(); - }, - } - } + const NAME: &'static str = "CandidateBackingJob"; - async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> { - Ok(self.to_job.send(msg).await?) 
- } -} - -struct Jobs { - spawner: S, - running: HashMap, - outgoing_msgs: StreamUnordered>, -} - -async fn run_job( - parent: Hash, - keystore: KeyStorePtr, - rx_to: mpsc::Receiver, - mut tx_from: mpsc::Sender, -) -> Result<(), Error> { - let (validators, roster) = futures::try_join!( - request_validators(parent, &mut tx_from).await?, - request_validator_groups(parent, &mut tx_from).await?, - )?; - - let key = signing_key(&validators[..], &keystore).ok_or(Error::NotInValidatorSet)?; - let mut groups = HashMap::new(); - - for assignment in roster.scheduled { - if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) { - groups.insert( - assignment.para_id, - g.clone(), - ); - } - } - - let mut assignment = Default::default(); - - if let Some(idx) = validators.iter().position(|k| *k == key.public()) { - let idx = idx as u32; - for (para_id, group) in groups.iter() { - if group.contains(&idx) { - assignment = *para_id; - break; + fn run( + parent: Hash, + keystore: KeyStorePtr, + rx_to: mpsc::Receiver, + mut tx_from: mpsc::Sender, + ) -> Pin> + Send>> { + async move { + let (validators, roster, signing_context) = futures::try_join!( + request_validators(parent, &mut tx_from).await?, + request_validator_groups(parent, &mut tx_from).await?, + request_signing_context(parent, &mut tx_from).await?, + )?; + + let validator = Validator::construct(&validators, signing_context, keystore.clone())?; + + let mut groups = HashMap::new(); + + for assignment in roster.scheduled { + if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) { + groups.insert(assignment.para_id, g.clone()); + } } - } - } - - let signing_context = request_signing_context(parent, &mut tx_from).await?.await?; - - let table_context = TableContext { - signing_context, - key: Some(key), - groups, - validators, - }; - - let job = CandidateBackingJob { - parent, - rx_to, - tx_from, - assignment, - issued_statements: HashSet::new(), - seconded: None, - reported_misbehavior_for: HashSet::new(), - table: Table::default(), - table_context, - }; - - job.run().await -} - -/// Request a validator set from the `RuntimeApi`. -async fn request_validators( - parent: Hash, - s: &mut mpsc::Sender, -) -> Result>, Error> { - let (tx, rx) = oneshot::channel(); - s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( - parent, - RuntimeApiRequest::Validators(tx), - ) - )).await?; + let mut assignment = Default::default(); - Ok(rx) -} - -/// Request the scheduler roster from `RuntimeApi`. -async fn request_validator_groups( - parent: Hash, - s: &mut mpsc::Sender, -) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - - s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( - parent, - RuntimeApiRequest::ValidatorGroups(tx), - ) - )).await?; - - Ok(rx) -} - -/// Request a `SigningContext` from the `RuntimeApi`. 
-async fn request_signing_context( - parent: Hash, - s: &mut mpsc::Sender, -) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - - s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( - parent, - RuntimeApiRequest::SigningContext(tx), - ) - )).await?; - - Ok(rx) -} - -impl Jobs { - fn new(spawner: S) -> Self { - Self { - spawner, - running: HashMap::default(), - outgoing_msgs: StreamUnordered::new(), - } - } - - fn spawn_job(&mut self, parent_hash: Hash, keystore: KeyStorePtr) -> Result<(), Error> { - let (to_job_tx, to_job_rx) = mpsc::channel(CHANNEL_CAPACITY); - let (from_job_tx, from_job_rx) = mpsc::channel(CHANNEL_CAPACITY); - - let (future, abort_handle) = future::abortable(async move { - if let Err(e) = run_job(parent_hash, keystore, to_job_rx, from_job_tx).await { - log::error!( - "CandidateBackingJob({}) finished with an error {:?}", - parent_hash, - e, - ); + if let Some(idx) = validators.iter().position(|k| *k == validator.id()) { + let idx = idx as u32; + for (para_id, group) in groups.iter() { + if group.contains(&idx) { + assignment = *para_id; + break; + } + } } - }); - - let (finished_tx, finished) = oneshot::channel(); - - let future = async move { - let _ = future.await; - let _ = finished_tx.send(()); - }; - self.spawner.spawn(future)?; - - let su_handle = self.outgoing_msgs.push(from_job_rx); - - let handle = JobHandle { - abort_handle, - to_job: to_job_tx, - finished, - su_handle, - }; - - self.running.insert(parent_hash, handle); - Ok(()) - } + let table_context = TableContext { + groups, + validators, + signing_context: validator.signing_context().clone(), + validator: Some(validator), + }; - async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> { - match self.running.remove(&parent_hash) { - Some(handle) => { - Pin::new(&mut self.outgoing_msgs).remove(handle.su_handle); - handle.stop().await; - Ok(()) - } - None => Err(Error::JobNotFound(parent_hash)) - } - } + let job = CandidateBackingJob { + parent, + rx_to, + tx_from, + assignment, + issued_statements: HashSet::new(), + seconded: None, + reported_misbehavior_for: HashSet::new(), + table: Table::default(), + table_context, + }; - async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) -> Result<(), Error> { - if let Some(job) = self.running.get_mut(&parent_hash) { - job.send_msg(msg).await?; + job.run_loop().await } - Ok(()) - } - - async fn next(&mut self) -> Option { - self.outgoing_msgs.next().await.and_then(|(e, _)| match e { - StreamYield::Item(e) => Some(e), - _ => None, - }) + .boxed() } } +/// Manager type for the CandidateBackingSubsystem +type Manager = util::JobManager; + /// An implementation of the Candidate Backing subsystem. -pub struct CandidateBackingSubsystem { - spawner: S, - keystore: KeyStorePtr, - _context: std::marker::PhantomData, +pub struct CandidateBackingSubsystem { + manager: Manager, } -impl CandidateBackingSubsystem - where - S: Spawn + Clone, - Context: SubsystemContext, +impl CandidateBackingSubsystem +where + Spawner: Clone + Spawn + Send + Unpin, + Context: SubsystemContext, + ToJob: From<::Message>, { /// Creates a new `CandidateBackingSubsystem`. 
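// A minimal sketch of the shape `JobTrait::run` takes above: trait methods
// cannot be `async fn`, so the job builds its future in an `async move` block
// and boxes it with `FutureExt::boxed`. Assumes `futures` 0.3.x (as pinned by
// this PR); `DemoJob` and its seed argument are illustrative only.
use futures::{executor, FutureExt};
use std::{future::Future, pin::Pin};

trait Job {
    fn run(seed: u64) -> Pin<Box<dyn Future<Output = Result<u64, ()>> + Send>>;
}

struct DemoJob;

impl Job for DemoJob {
    fn run(seed: u64) -> Pin<Box<dyn Future<Output = Result<u64, ()>> + Send>> {
        async move {
            // a real job would await runtime requests and run its message loop here
            Ok(seed * 2)
        }
        .boxed()
    }
}

fn main() {
    assert_eq!(executor::block_on(DemoJob::run(21)), Ok(42));
}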
- pub fn new(keystore: KeyStorePtr, spawner: S) -> Self { - Self { - spawner, - keystore, - _context: std::marker::PhantomData, + pub fn new(spawner: Spawner, keystore: KeyStorePtr) -> Self { + CandidateBackingSubsystem { + manager: util::JobManager::new(spawner, keystore) } } - async fn run( - mut ctx: Context, - keystore: KeyStorePtr, - spawner: S, - ) { - let mut jobs = Jobs::new(spawner.clone()); - - loop { - select! { - incoming = ctx.recv().fuse() => { - match incoming { - Ok(msg) => match msg { - FromOverseer::Signal(OverseerSignal::StartWork(hash)) => { - if let Err(e) = jobs.spawn_job(hash, keystore.clone()) { - log::error!("Failed to spawn a job: {:?}", e); - break; - } - } - FromOverseer::Signal(OverseerSignal::StopWork(hash)) => { - if let Err(e) = jobs.stop_job(hash).await { - log::error!("Failed to spawn a job: {:?}", e); - break; - } - } - FromOverseer::Communication { msg } => { - match msg { - CandidateBackingMessage::Second(hash, _, _) | - CandidateBackingMessage::Statement(hash, _) | - CandidateBackingMessage::GetBackedCandidates(hash, _) => { - let res = jobs.send_msg( - hash.clone(), - ToJob::CandidateBacking(msg), - ).await; - - if let Err(e) = res { - log::error!( - "Failed to send a message to a job: {:?}", - e, - ); - - break; - } - } - _ => (), - } - } - _ => (), - }, - Err(_) => break, - } - } - outgoing = jobs.next().fuse() => { - match outgoing { - Some(msg) => { - let _ = ctx.send_message(msg.into()).await; - } - None => break, - } - } - complete => break, - } - } + /// Run this subsystem + pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) { + >::run(ctx, keystore, spawner).await } } -impl Subsystem for CandidateBackingSubsystem - where - S: Spawn + Send + Clone + 'static, - Context: SubsystemContext, +impl Subsystem for CandidateBackingSubsystem +where + Spawner: Spawn + Send + Clone + Unpin + 'static, + Context: SubsystemContext, + ::Message: Into, { fn start(self, ctx: Context) -> SpawnedSubsystem { - let keystore = self.keystore.clone(); - let spawner = self.spawner.clone(); - - SpawnedSubsystem(Box::pin(async move { - Self::run(ctx, keystore, spawner).await; - })) + self.manager.start(ctx) } } + + #[cfg(test)] mod tests { use super::*; - use futures::{Future, executor::{self, ThreadPool}}; - use std::collections::HashMap; - use std::sync::Arc; - use sp_keyring::Sr25519Keyring; + use assert_matches::assert_matches; + use futures::{ + executor::{self, ThreadPool}, + future, Future, + }; use polkadot_primitives::v1::{ - AssignmentKind, CollatorId, CoreAssignment, BlockData, CoreIndex, GroupIndex, ValidityAttestation, - CandidateCommitments, LocalValidationData, GlobalValidationSchedule, HeadData, + AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex, + LocalValidationData, GlobalValidationSchedule, GroupIndex, HeadData, + ValidatorPair, ValidityAttestation, }; - use assert_matches::assert_matches; + use polkadot_subsystem::{ + messages::{RuntimeApiRequest, SchedulerRoster}, + FromOverseer, OverseerSignal, + }; + use sp_keyring::Sr25519Keyring; + use std::collections::HashMap; fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { val_ids.iter().map(|v| v.public().into()).collect() @@ -1545,7 +1349,6 @@ mod tests { ).unwrap(); } ); - }); } @@ -1626,8 +1429,6 @@ mod tests { virtual_overseer.send(FromOverseer::Communication{ msg: second }).await; - let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); - assert_matches!( virtual_overseer.recv().await, 
AllMessages::CandidateValidation( diff --git a/node/subsystem/Cargo.toml b/node/subsystem/Cargo.toml index 43712319cb71..188e7cbfa764 100644 --- a/node/subsystem/Cargo.toml +++ b/node/subsystem/Cargo.toml @@ -6,9 +6,17 @@ edition = "2018" description = "Subsystem traits and message definitions" [dependencies] +async-trait = "0.1" +derive_more = "0.99.9" +futures = "0.3.5" +futures-timer = "3.0.2" +keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } +log = "0.4.8" +parity-scale-codec = "1.3.0" +pin-project = "0.4.22" +polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } -polkadot-node-primitives = { path = "../primitives" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures = "0.3.5" -async-trait = "0.1" +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +streamunordered = "0.5.1" diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index e374eb9cfcdd..b6c3a79ef3d9 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -20,6 +20,8 @@ //! that communicate via message-passing. They are coordinated by an overseer, provided by a //! separate crate. +#![warn(missing_docs)] + use std::pin::Pin; use futures::prelude::*; @@ -32,6 +34,7 @@ use async_trait::async_trait; use crate::messages::AllMessages; pub mod messages; +pub mod util; /// Signals sent by an overseer to a subsystem. #[derive(PartialEq, Clone, Debug)] @@ -56,6 +59,7 @@ pub enum FromOverseer { /// Some other `Subsystem`'s message. Communication { + /// Contained message msg: M, }, } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 10c861f1410c..d3c630cb56f0 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -51,6 +51,15 @@ pub enum CandidateSelectionMessage { Invalid(Hash, CandidateReceipt), } +impl CandidateSelectionMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::Invalid(hash, _) => Some(*hash), + } + } +} + /// Messages received by the Candidate Backing subsystem. #[derive(Debug)] pub enum CandidateBackingMessage { @@ -65,6 +74,18 @@ pub enum CandidateBackingMessage { Statement(Hash, SignedFullStatement), } + +impl CandidateBackingMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::GetBackedCandidates(hash, _) => Some(*hash), + Self::Second(hash, _, _) => Some(*hash), + Self::Statement(hash, _) => Some(*hash), + } + } +} + /// Blanket error for validation failing. #[derive(Debug)] pub struct ValidationFailed; @@ -102,6 +123,16 @@ pub enum CandidateValidationMessage { ), } +impl CandidateValidationMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::ValidateFromChainState(_, _, _) => None, + Self::ValidateFromExhaustive(_, _, _, _, _) => None, + } + } +} + /// Events from network. #[derive(Debug, Clone)] pub enum NetworkBridgeEvent { @@ -134,6 +165,17 @@ pub enum NetworkBridgeMessage { SendMessage(Vec, ProtocolId, Vec), } +impl NetworkBridgeMessage { + /// If the current variant contains the relay parent hash, return it. 
+ pub fn relay_parent(&self) -> Option { + match self { + Self::RegisterEventProducer(_, _) => None, + Self::ReportPeer(_, _) => None, + Self::SendMessage(_, _, _) => None, + } + } +} + /// Availability Distribution Message. #[derive(Debug)] pub enum AvailabilityDistributionMessage { @@ -147,6 +189,17 @@ pub enum AvailabilityDistributionMessage { NetworkBridgeUpdate(NetworkBridgeEvent), } +impl AvailabilityDistributionMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::DistributeChunk(hash, _) => Some(*hash), + Self::FetchChunk(hash, _) => Some(*hash), + Self::NetworkBridgeUpdate(_) => None, + } + } +} + /// Bitfield distribution message. #[derive(Debug)] pub enum BitfieldDistributionMessage { @@ -157,6 +210,16 @@ pub enum BitfieldDistributionMessage { NetworkBridgeUpdate(NetworkBridgeEvent), } +impl BitfieldDistributionMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::DistributeBitfield(hash, _) => Some(*hash), + Self::NetworkBridgeUpdate(_) => None, + } + } +} + /// Availability store subsystem message. #[derive(Debug)] pub enum AvailabilityStoreMessage { @@ -170,6 +233,17 @@ pub enum AvailabilityStoreMessage { StoreChunk(Hash, ValidatorIndex, ErasureChunk), } +impl AvailabilityStoreMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::QueryPoV(hash, _) => Some(*hash), + Self::QueryChunk(hash, _, _) => Some(*hash), + Self::StoreChunk(hash, _, _) => Some(*hash), + } + } +} + /// The information on scheduler assignments that some somesystems may be querying. #[derive(Debug, Clone)] pub struct SchedulerRoster { @@ -207,6 +281,15 @@ pub enum RuntimeApiMessage { Request(Hash, RuntimeApiRequest), } +impl RuntimeApiMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::Request(hash, _) => Some(*hash), + } + } +} + /// Statement distribution message. #[derive(Debug)] pub enum StatementDistributionMessage { @@ -217,6 +300,16 @@ pub enum StatementDistributionMessage { NetworkBridgeUpdate(NetworkBridgeEvent), } +impl StatementDistributionMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::Share(hash, _) => Some(*hash), + Self::NetworkBridgeUpdate(_) => None, + } + } +} + /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. #[derive(Debug)] pub enum ProvisionableData { @@ -253,6 +346,17 @@ pub enum ProvisionerMessage { ProvisionableData(ProvisionableData), } +impl ProvisionerMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + match self { + Self::RequestBlockAuthorshipData(hash, _) => Some(*hash), + Self::RequestInherentData(hash, _) => Some(*hash), + Self::ProvisionableData(_) => None, + } + } +} + /// Message to the PoV Distribution Subsystem. #[derive(Debug)] pub enum PoVDistributionMessage { @@ -268,6 +372,17 @@ pub enum PoVDistributionMessage { NetworkBridgeUpdate(NetworkBridgeEvent), } +impl PoVDistributionMessage { + /// If the current variant contains the relay parent hash, return it. 
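// A std-only sketch of why each message type above gains a `relay_parent`
// accessor: the job manager keys its jobs by relay-parent hash and uses the
// accessor to pick the recipient. `Hash`, `DemoMessage`, and the mailbox
// `Vec`s are simplified stand-ins for the real types.
use std::collections::HashMap;

type Hash = u64;

enum DemoMessage {
    Anchored(Hash, &'static str), // carries its relay parent
    Unanchored(&'static str),     // e.g. a network bridge update
}

impl DemoMessage {
    /// If the current variant contains the relay parent hash, return it.
    fn relay_parent(&self) -> Option<Hash> {
        match self {
            Self::Anchored(hash, _) => Some(*hash),
            Self::Unanchored(_) => None,
        }
    }
}

fn main() {
    let mut jobs: HashMap<Hash, Vec<DemoMessage>> = HashMap::new();

    for msg in vec![
        DemoMessage::Anchored(7, "statement"),
        DemoMessage::Unanchored("peer connected"),
    ] {
        match msg.relay_parent() {
            // anchored messages are forwarded to the job for that relay parent
            Some(hash) => jobs.entry(hash).or_default().push(msg),
            // unanchored messages would go to `handle_unanchored_msg` instead
            None => {}
        }
    }

    assert_eq!(jobs[&7].len(), 1);
    assert_eq!(jobs.len(), 1);
}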
+ pub fn relay_parent(&self) -> Option { + match self { + Self::FetchPoV(hash, _, _) => Some(*hash), + Self::DistributePoV(hash, _, _) => Some(*hash), + Self::NetworkBridgeUpdate(_) => None, + } + } +} + /// A message type tying together all message types that are used across Subsystems. #[derive(Debug)] pub enum AllMessages { diff --git a/node/subsystem/src/util.rs b/node/subsystem/src/util.rs new file mode 100644 index 000000000000..1b89bfb053a6 --- /dev/null +++ b/node/subsystem/src/util.rs @@ -0,0 +1,613 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Utility module for subsystems +//! +//! Many subsystems have common interests such as canceling a bunch of spawned jobs, +//! or determining what their validator ID is. These common interests are factored into +//! this module. + +use crate::{ + messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster}, + FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, +}; +use futures::{ + channel::{mpsc, oneshot}, + future::Either, + prelude::*, + select, + stream::Stream, + task::{self, Spawn, SpawnError, SpawnExt}, +}; +use futures_timer::Delay; +use keystore::KeyStorePtr; +use parity_scale_codec::Encode; +use pin_project::{pin_project, pinned_drop}; +use polkadot_primitives::v1::{ + EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext, + ValidatorId, ValidatorIndex, ValidatorPair, +}; +use sp_core::Pair; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, + marker::Unpin, + pin::Pin, + time::Duration, +}; +use streamunordered::{StreamUnordered, StreamYield}; + +/// Duration a job will wait after sending a stop signal before hard-aborting. +pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); +/// Capacity of channels to and from individual jobs +pub const JOB_CHANNEL_CAPACITY: usize = 64; + +/// Utility errors +#[derive(Debug, derive_more::From)] +pub enum Error { + /// Attempted to send or receive on a oneshot channel which had been canceled + #[from] + Oneshot(oneshot::Canceled), + /// Attempted to send on a MPSC channel which has been canceled + #[from] + Mpsc(mpsc::SendError), + /// Attempted to spawn a new task, and failed + #[from] + Spawn(SpawnError), + /// Attempted to convert from an AllMessages to a FromJob, and failed. + SenderConversion(String), + /// The local node is not a validator. + NotAValidator, + /// The desired job is not present in the jobs list. + JobNotFound(Hash), +} + +/// Request some data from the `RuntimeApi`. 
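// A compact sketch of the request/response idiom `request_from_runtime` is
// built on: embed a oneshot sender in the request, ship it over an mpsc
// channel, and hand the oneshot receiver back to the caller. Assumes `futures`
// 0.3.x; `Request` and the echo "runtime" below are illustrative stand-ins.
use futures::{
    channel::{mpsc, oneshot},
    executor, SinkExt, StreamExt,
};

struct Request {
    payload: u32,
    reply: oneshot::Sender<u32>,
}

async fn request(sender: &mut mpsc::Sender<Request>, payload: u32) -> oneshot::Receiver<u32> {
    let (reply, receiver) = oneshot::channel();
    sender
        .send(Request { payload, reply })
        .await
        .expect("demo receiver is alive");
    receiver
}

fn main() {
    executor::block_on(async {
        let (mut tx, mut rx) = mpsc::channel(4);

        // stand-in for the runtime-API subsystem: answer a single request
        let server = async move {
            let req = rx.next().await.expect("one request is sent");
            let _ = req.reply.send(req.payload + 1);
        };

        let client = async move {
            let receiver = request(&mut tx, 41).await;
            assert_eq!(receiver.await.unwrap(), 42);
        };

        futures::join!(server, client);
    });
}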
+pub async fn request_from_runtime( + parent: Hash, + sender: &mut mpsc::Sender, + request_builder: RequestBuilder, +) -> Result, Error> +where + RequestBuilder: FnOnce(oneshot::Sender) -> RuntimeApiRequest, + FromJob: TryFrom, + >::Error: std::fmt::Debug, +{ + let (tx, rx) = oneshot::channel(); + + sender + .send( + AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))) + .try_into() + .map_err(|err| Error::SenderConversion(format!("{:?}", err)))?, + ) + .await?; + + Ok(rx) +} + +/// Request a validator set from the `RuntimeApi`. +pub async fn request_validators( + parent: Hash, + s: &mut mpsc::Sender, +) -> Result>, Error> +where + FromJob: TryFrom, + >::Error: std::fmt::Debug, +{ + request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await +} + +/// Request the scheduler roster from `RuntimeApi`. +pub async fn request_validator_groups( + parent: Hash, + s: &mut mpsc::Sender, +) -> Result, Error> +where + FromJob: TryFrom, + >::Error: std::fmt::Debug, +{ + request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await +} + +/// Request a `SigningContext` from the `RuntimeApi`. +pub async fn request_signing_context( + parent: Hash, + s: &mut mpsc::Sender, +) -> Result, Error> +where + FromJob: TryFrom, + >::Error: std::fmt::Debug, +{ + request_from_runtime(parent, s, |tx| RuntimeApiRequest::SigningContext(tx)).await +} + +/// Request `HeadData` for some `ParaId` from `RuntimeApi`. +pub async fn request_head_data( + parent: Hash, + s: &mut mpsc::Sender, + id: ParaId, +) -> Result, Error> +where + FromJob: TryFrom, + >::Error: std::fmt::Debug, +{ + request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await +} + +/// From the given set of validators, find the first key we can sign with, if any. +pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option { + let keystore = keystore.read(); + validators + .iter() + .find_map(|v| keystore.key_pair::(&v).ok()) +} + +/// Local validator information +/// +/// It can be created if the local node is a validator in the context of a particular +/// relay chain block. +pub struct Validator { + signing_context: SigningContext, + key: ValidatorPair, + index: ValidatorIndex, +} + +impl Validator { + /// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block. + pub async fn new( + parent: Hash, + keystore: KeyStorePtr, + mut sender: mpsc::Sender, + ) -> Result + where + FromJob: TryFrom, + >::Error: std::fmt::Debug, + { + // Note: request_validators and request_signing_context do not and cannot run concurrently: they both + // have a mutable handle to the same sender. + // However, each of them returns a oneshot::Receiver, and those are resolved concurrently. + let (validators, signing_context) = futures::try_join!( + request_validators(parent, &mut sender).await?, + request_signing_context(parent, &mut sender).await?, + )?; + + Self::construct(&validators, signing_context, keystore) + } + + /// Construct a validator instance without performing runtime fetches. + /// + /// This can be useful if external code also needs the same data. 
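// A sketch of the concurrency note in `Validator::new` above: requests are
// sent one after another (each needs `&mut` on the same sender), but the
// returned oneshot receivers are then resolved concurrently with `try_join!`.
// Assumes `futures` 0.3.x; the channel payloads are illustrative.
use futures::{channel::oneshot, executor, try_join};

fn main() {
    executor::block_on(async {
        let (tx_a, rx_a) = oneshot::channel::<u32>();
        let (tx_b, rx_b) = oneshot::channel::<&'static str>();

        // stand-in for the runtime answering both requests, in either order
        tx_b.send("signing-context").unwrap();
        tx_a.send(3).unwrap();

        // both receivers are awaited together; the first failure (a canceled
        // channel) would end the whole join early
        let (n_validators, signing_context) = try_join!(rx_a, rx_b).unwrap();
        assert_eq!(n_validators, 3);
        assert_eq!(signing_context, "signing-context");
    });
}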
+	pub fn construct(
+		validators: &[ValidatorId],
+		signing_context: SigningContext,
+		keystore: KeyStorePtr,
+	) -> Result<Self, Error> {
+		let key = signing_key(validators, &keystore).ok_or(Error::NotAValidator)?;
+		let index = validators
+			.iter()
+			.enumerate()
+			.find(|(_, k)| k == &&key.public())
+			.map(|(idx, _)| idx as ValidatorIndex)
+			.expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed");
+
+		Ok(Validator {
+			signing_context,
+			key,
+			index,
+		})
+	}
+
+	/// Get this validator's id.
+	pub fn id(&self) -> ValidatorId {
+		self.key.public()
+	}
+
+	/// Get this validator's local index.
+	pub fn index(&self) -> ValidatorIndex {
+		self.index
+	}
+
+	/// Get the current signing context.
+	pub fn signing_context(&self) -> &SigningContext {
+		&self.signing_context
+	}
+
+	/// Sign a payload with this validator
+	pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
+		&self,
+		payload: Payload,
+	) -> Signed<Payload, RealPayload> {
+		Signed::sign(payload, &self.signing_context, self.index, &self.key)
+	}
+
+	/// Validate the payload with this validator
+	///
+	/// Validation can only succeed if `signed.validator_index() == self.index()`.
+	/// Normally, this will always be the case for a properly operating program,
+	/// but it's double-checked here anyway.
+	pub fn check_payload<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
+		&self,
+		signed: Signed<Payload, RealPayload>,
+	) -> Result<(), ()> {
+		if signed.validator_index() != self.index {
+			return Err(());
+		}
+		signed.check_signature(&self.signing_context, &self.id())
+	}
+}
+
+/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job.
+///
+/// Normally, this will be some subset of `AllMessages`, and a `Stop` variant.
+pub trait ToJobTrait: TryFrom<AllMessages> {
+	/// The `Stop` variant of the ToJob enum.
+	const STOP: Self;
+
+	/// If the message variant contains its relay parent, return it here
+	fn relay_parent(&self) -> Option<Hash>;
+}
+
+/// A JobHandle manages a particular job for a subsystem.
+pub struct JobHandle<ToJob> {
+	abort_handle: future::AbortHandle,
+	to_job: mpsc::Sender<ToJob>,
+	finished: oneshot::Receiver<()>,
+	outgoing_msgs_handle: usize,
+}
+
+impl<ToJob> JobHandle<ToJob> {
+	/// Send a message to the job.
+	pub async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
+		self.to_job.send(msg).await.map_err(Into::into)
+	}
+
+	/// Abort the job without waiting for a graceful shutdown
+	pub fn abort(self) {
+		self.abort_handle.abort();
+	}
+}
+
+impl<ToJob: ToJobTrait> JobHandle<ToJob> {
+	/// Stop this job gracefully.
+	///
+	/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
+	pub async fn stop(mut self) {
+		// we don't actually care if the message couldn't be sent
+		let _ = self.to_job.send(ToJob::STOP).await;
+		let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
+
+		match future::select(stop_timer, self.finished).await {
+			Either::Left((_, _)) => {}
+			Either::Right((_, _)) => {
+				self.abort_handle.abort();
+			}
+		}
+	}
+}
+
+/// This trait governs jobs.
+///
+/// Jobs are instantiated and killed automatically on appropriate overseer messages.
+/// Other messages are passed along to and from the job via the overseer to other
+/// subsystems.
+pub trait JobTrait: Unpin {
+	/// Message type to the job. Typically a subset of AllMessages.
+	type ToJob: 'static + ToJobTrait + Send;
+	/// Message type from the job. Typically a subset of AllMessages.
+	type FromJob: 'static + Into<AllMessages> + Send;
+	/// Job runtime error.
+	type Error: std::fmt::Debug;
+	/// Extra arguments this job needs to run properly.
+ /// + /// If no extra information is needed, it is perfectly acceptable to set it to `()`. + type RunArgs: 'static + Send; + + /// Name of the job, i.e. `CandidateBackingJob` + const NAME: &'static str; + + /// Run a job for the parent block indicated + fn run( + parent: Hash, + run_args: Self::RunArgs, + rx_to: mpsc::Receiver, + tx_from: mpsc::Sender, + ) -> Pin> + Send>>; + + /// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job + /// + /// By default, this is implemented with a NOP function. However, if + /// ToJob occasionally has messages which do not correspond to a particular + /// parent relay hash, then this function will be spawned as a one-off + /// task to handle those messages. + // TODO: the API here is likely not precisely what we want; figure it out more + // once we're implementing a subsystem which actually needs this feature. + // In particular, we're quite likely to want this to return a future instead of + // interrupting the active thread for the duration of the handler. + fn handle_unanchored_msg(_msg: Self::ToJob) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Jobs manager for a subsystem +/// +/// - Spawns new jobs for a given relay-parent on demand. +/// - Closes old jobs for a given relay-parent on demand. +/// - Dispatches messages to the appropriate job for a given relay-parent. +/// - When dropped, aborts all remaining jobs. +/// - implements `Stream`, collecting all messages from subordinate jobs. +#[pin_project(PinnedDrop)] +pub struct Jobs { + spawner: Spawner, + running: HashMap>, + #[pin] + outgoing_msgs: StreamUnordered>, + job: std::marker::PhantomData, +} + +impl Jobs { + /// Create a new Jobs manager which handles spawning appropriate jobs. + pub fn new(spawner: Spawner) -> Self { + Self { + spawner, + running: HashMap::new(), + outgoing_msgs: StreamUnordered::new(), + job: std::marker::PhantomData, + } + } + + /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. + fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> { + let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); + let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); + let (finished_tx, finished) = oneshot::channel(); + + let (future, abort_handle) = future::abortable(async move { + if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await { + log::error!( + "{}({}) finished with an error {:?}", + Job::NAME, + parent_hash, + e, + ); + } + }); + + // discard output + let future = async move { + let _ = future.await; + let _ = finished_tx.send(()); + }; + self.spawner.spawn(future)?; + + // this handle lets us remove the appropriate receiver from self.outgoing_msgs + // when it's time to stop the job. + let outgoing_msgs_handle = self.outgoing_msgs.push(from_job_rx); + + let handle = JobHandle { + abort_handle, + to_job: to_job_tx, + finished, + outgoing_msgs_handle, + }; + + self.running.insert(parent_hash, handle); + + Ok(()) + } + + /// Stop the job associated with this `parent_hash`. + pub async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> { + match self.running.remove(&parent_hash) { + Some(handle) => { + Pin::new(&mut self.outgoing_msgs).remove(handle.outgoing_msgs_handle); + handle.stop().await; + Ok(()) + } + None => Err(Error::JobNotFound(parent_hash)), + } + } + + /// Send a message to the appropriate job for this `parent_hash`. 
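// A condensed sketch of the job lifecycle managed above: the
// `future::abortable` + "finished" oneshot wiring of `spawn_job`, plus racing
// a grace timer against the finished signal as `JobHandle::stop` does (the
// real stop also sends `ToJob::STOP` first). Assumes `futures` 0.3.x and
// `futures-timer` 3.0.x, both used by this PR; the 50ms timeout stands in for
// `JOB_GRACEFUL_STOP_DURATION`.
use futures::{
    channel::oneshot,
    executor,
    future::{self, Either},
    join,
};
use futures_timer::Delay;
use std::time::Duration;

fn main() {
    executor::block_on(async {
        // spawn_job: make the job abortable, and report completion out-of-band
        let (job, abort_handle) = future::abortable(async {
            // a stand-in job that would run forever
            let (_tx, rx) = oneshot::channel::<()>();
            let _ = rx.await;
        });
        let (finished_tx, finished) = oneshot::channel();
        // in the real code this wrapper is handed to the spawner; here it just
        // runs concurrently with the stop logic below
        let wrapper = async move {
            let _ = job.await; // discard the job's output
            let _ = finished_tx.send(());
        };

        // race a grace timer against the finished signal; the stand-in job
        // never finishes on its own, so the timer wins and the job is aborted,
        // which in turn fires the finished signal
        let stopper = async move {
            let stop_timer = Delay::new(Duration::from_millis(50));
            match future::select(stop_timer, finished).await {
                Either::Left((_, finished)) => {
                    abort_handle.abort();
                    let _ = finished.await; // wait for the wrapper to confirm
                }
                Either::Right((_, _)) => {}
            }
        };

        join!(wrapper, stopper);
    });
}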
+ async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> { + match self.running.get_mut(&parent_hash) { + Some(job) => job.send_msg(msg).await?, + None => return Err(Error::JobNotFound(parent_hash)), + } + Ok(()) + } +} + +// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles; +// we just abort them all. Still better than letting them dangle. +#[pinned_drop] +impl PinnedDrop for Jobs { + fn drop(self: Pin<&mut Self>) { + for job_handle in self.running.values() { + job_handle.abort_handle.abort(); + } + } +} + +impl Stream for Jobs +where + Spawner: Spawn, + Job: JobTrait, +{ + type Item = Job::FromJob; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll> { + // pin-project the outgoing messages + self.project() + .outgoing_msgs + .poll_next(cx) + .map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield { + StreamYield::Item(msg) => Some(msg), + StreamYield::Finished(_) => None, + })) + } +} + +/// A basic implementation of a subsystem. +/// +/// This struct is responsible for handling message traffic between +/// this subsystem and the overseer. It spawns and kills jobs on the +/// appropriate Overseer messages, and dispatches standard traffic to +/// the appropriate job the rest of the time. +pub struct JobManager { + spawner: Spawner, + run_args: Job::RunArgs, + context: std::marker::PhantomData, + job: std::marker::PhantomData, +} + +impl JobManager +where + Spawner: Spawn + Clone + Send + Unpin, + Context: SubsystemContext, + Job: JobTrait, + Job::RunArgs: Clone, + Job::ToJob: TryFrom + TryFrom<::Message> + Sync, +{ + /// Creates a new `Subsystem`. + pub fn new(spawner: Spawner, run_args: Job::RunArgs) -> Self { + Self { + spawner, + run_args, + context: std::marker::PhantomData, + job: std::marker::PhantomData, + } + } + + /// Run this subsystem + /// + /// Conceptually, this is very simple: it just loops forever. + /// + /// - On incoming overseer messages, it starts or stops jobs as appropriate. + /// - On other incoming messages, if they can be converted into Job::ToJob and + /// include a hash, then they're forwarded to the appropriate individual job. + /// - On outgoing messages from the jobs, it forwards them to the overseer. + pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner) { + let mut jobs = Jobs::new(spawner.clone()); + + loop { + select! { + incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args).await { break }, + outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx).await { break }, + complete => break, + } + } + } + + // handle an incoming message. return true if we should break afterwards. + async fn handle_incoming( + incoming: SubsystemResult>, + jobs: &mut Jobs, + run_args: &Job::RunArgs, + ) -> bool { + use crate::FromOverseer::{Communication, Signal}; + use crate::OverseerSignal::{Conclude, StartWork, StopWork}; + + match incoming { + Ok(Signal(StartWork(hash))) => { + if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { + log::error!("Failed to spawn a job: {:?}", e); + return true; + } + } + Ok(Signal(StopWork(hash))) => { + if let Err(e) = jobs.stop_job(hash).await { + log::error!("Failed to stop a job: {:?}", e); + return true; + } + } + Ok(Signal(Conclude)) => { + // Breaking the loop ends fn run, which drops `jobs`, which immediately drops all ongoing work. + // We can afford to wait a little while to shut them all down properly before doing that. 
+ // + // Forwarding the stream to a drain means we wait until all of the items in the stream + // have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`. + use futures::stream::StreamExt; + use futures::stream::FuturesUnordered; + + let unordered = jobs.running + .drain() + .map(|(_, handle)| handle.stop()) + .collect::>(); + // now wait for all the futures to complete; collect a vector of their results + // this is strictly less efficient than draining them into oblivion, but this compiles, and that doesn't + // https://github.com/paritytech/polkadot/pull/1376#pullrequestreview-446488645 + let _ = async move { unordered.collect::>() }.await; + + return true; + } + Ok(Communication { msg }) => { + if let Ok(to_job) = ::try_from(msg) { + match to_job.relay_parent() { + Some(hash) => { + if let Err(err) = jobs.send_msg(hash, to_job).await { + log::error!("Failed to send a message to a job: {:?}", err); + return true; + } + } + None => { + if let Err(err) = Job::handle_unanchored_msg(to_job) { + log::error!("Failed to handle unhashed message: {:?}", err); + return true; + } + } + } + } + } + Err(err) => { + log::error!("error receiving message from subsystem context: {:?}", err); + return true; + } + } + false + } + + // handle an outgoing message. return true if we should break afterwards. + async fn handle_outgoing(outgoing: Option, ctx: &mut Context) -> bool { + match outgoing { + Some(msg) => { + // discard errors when sending the message upstream + let _ = ctx.send_message(msg.into()).await; + } + None => return true, + } + false + } +} + +impl Subsystem for JobManager +where + Spawner: Spawn + Send + Clone + Unpin + 'static, + Context: SubsystemContext, + ::Message: Into, + Job: JobTrait + Send, + Job::RunArgs: Clone + Sync, + Job::ToJob: TryFrom + Sync, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + let spawner = self.spawner.clone(); + let run_args = self.run_args.clone(); + + SpawnedSubsystem(Box::pin(async move { + Self::run(ctx, run_args, spawner).await; + })) + } +}
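// A capstone sketch of the `JobManager::run` control flow: one `select!` loop
// multiplexes overseer traffic against the jobs' outgoing messages until both
// sides are exhausted. Assumes `futures` 0.3.x; the two mpsc channels stand in
// for the overseer connection and the `Jobs` stream, and the real loop breaks
// out of `handle_incoming`/`handle_outgoing` rather than draining to `complete`.
use futures::{channel::mpsc, executor, select, SinkExt, StreamExt};

fn main() {
    executor::block_on(async {
        let (mut overseer_tx, mut overseer_rx) = mpsc::channel::<&'static str>(4);
        let (mut jobs_tx, mut jobs_rx) = mpsc::channel::<&'static str>(4);

        overseer_tx.send("start-work").await.unwrap();
        jobs_tx.send("job-output").await.unwrap();
        // dropping the senders terminates both streams, ending the loop below
        drop(overseer_tx);
        drop(jobs_tx);

        let mut log = Vec::new();
        loop {
            select! {
                incoming = overseer_rx.next() => if let Some(msg) = incoming {
                    log.push(("overseer", msg)); // start/stop jobs or forward to one
                },
                outgoing = jobs_rx.next() => if let Some(msg) = outgoing {
                    log.push(("job", msg)); // forward job output to the overseer
                },
                complete => break,
            }
        }

        assert!(log.contains(&("overseer", "start-work")));
        assert!(log.contains(&("job", "job-output")));
    });
}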