From 860ebd409c9c040c4e6192861a0183530998e276 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Fri, 22 Apr 2022 14:12:37 -0500
Subject: [PATCH 1/4] backing: refactor off of jobs system

---
 node/core/backing/src/lib.rs   | 784 +++++++++++++++++++--------------
 node/core/backing/src/tests.rs |   7 +-
 2 files changed, 456 insertions(+), 335 deletions(-)

diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs
index f7212db27077..d5e972905be4 100644
--- a/node/core/backing/src/lib.rs
+++ b/node/core/backing/src/lib.rs
@@ -20,14 +20,13 @@
 use std::{
 	collections::{HashMap, HashSet},
-	pin::Pin,
 	sync::Arc,
 };
 
 use bitvec::vec::BitVec;
 use futures::{
 	channel::{mpsc, oneshot},
-	Future, FutureExt, SinkExt, StreamExt,
+	FutureExt, SinkExt, StreamExt,
 };
 
 use polkadot_node_primitives::{
@@ -38,7 +37,7 @@ use polkadot_node_subsystem_util::{
 	self as util,
 	metrics::{self, prometheus},
 	request_from_runtime, request_session_index_for_child, request_validator_groups,
-	request_validators, FromJobCommand, JobSender, Validator,
+	request_validators, Validator,
 };
 use polkadot_primitives::v2::{
 	BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId,
@@ -53,7 +52,8 @@ use polkadot_subsystem::{
 		DisputeCoordinatorMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
 		StatementDistributionMessage, ValidationFailed,
 	},
-	overseer, ActivatedLeaf, PerLeafSpan, Stage, SubsystemSender,
+	overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
+	Stage, SubsystemContext, SubsystemError, SubsystemSender,
 };
 use sp_keystore::SyncCryptoStorePtr;
 use statement_table::{
@@ -82,6 +82,8 @@ pub enum Error {
 	Send(Vec<BackedCandidate>),
 	#[error("FetchPoV failed")]
 	FetchPoV,
+	#[error("Failed to spawn background task")]
+	FailedToSpawnBg,
 	#[error("ValidateFromChainState channel closed before receipt")]
 	ValidateFromChainState(#[source] oneshot::Canceled),
 	#[error("StoreAvailableData channel closed before receipt")]
@@ -96,6 +98,8 @@ pub enum Error {
 	BackgroundValidationMpsc(#[from] mpsc::SendError),
 	#[error(transparent)]
 	UtilError(#[from] util::Error),
+	#[error(transparent)]
+	SubsystemError(#[from] SubsystemError),
 }
 
 /// PoV data to validate.
@@ -142,8 +146,283 @@ impl ValidatedCandidateCommand {
 	}
 }
 
+/// The candidate backing subsystem.
+pub struct CandidateBackingSubsystem {
+	keystore: SyncCryptoStorePtr,
+	metrics: Metrics,
+}
+
+impl CandidateBackingSubsystem {
+	/// Create a new instance of the `CandidateBackingSubsystem`.
+	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
+		Self { keystore, metrics }
+	}
+}
+
+impl<Context> overseer::Subsystem<Context, SubsystemError> for CandidateBackingSubsystem
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = async move {
+			run(ctx, self.keystore, self.metrics)
+				.await
+				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
+		}
+		.boxed();
+
+		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
+	}
+}
+
+async fn run<Context>(
+	mut ctx: Context,
+	keystore: SyncCryptoStorePtr,
+	metrics: Metrics,
+) -> Result<(), Error>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
+	let mut jobs = HashMap::new();
+
+	// TODO [now]: refactor to JFYIError and add a `run_until_error` function
+	// which actually does this loop
+	loop {
+		futures::select!(
+			validated_command = background_validation_rx.next().fuse() => {
+				if let Some((relay_parent, command)) = validated_command {
+					handle_validated_candidate_command(
+						&mut ctx,
+						&mut jobs,
+						relay_parent,
+						command,
+					).await?;
+				} else {
+					panic!("background_validation_tx always alive at this point; qed");
+				}
+			}
+			from_overseer = ctx.recv().fuse() => {
+				match from_overseer? {
+					FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => handle_active_leaves_update(
+						&mut ctx,
+						update,
+						&mut jobs,
+						&keystore,
+						&background_validation_tx,
+						&metrics,
+					).await?,
+					FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
+					FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
+					FromOverseer::Communication { msg } => handle_communication(&mut ctx, &mut jobs, msg).await?,
+				}
+			}
+		)
+	}
+}
+
+async fn handle_validated_candidate_command<Context>(
+	ctx: &mut Context,
+	jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+	relay_parent: Hash,
+	command: ValidatedCandidateCommand,
+) -> Result<(), Error>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+	if let Some(job) = jobs.get_mut(&relay_parent) {
+		job.job.handle_validated_candidate_command(&job.span, ctx, command).await?;
+	} else {
+		// simple race condition; can be ignored - this relay-parent
+		// is no longer relevant.
+	}
+
+	Ok(())
+}
+
+async fn handle_communication<Context>(
+	ctx: &mut Context,
+	jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+	message: CandidateBackingMessage,
+) -> Result<(), Error>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+	match message {
+		CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
+			if let Some(job) = jobs.get_mut(&relay_parent) {
+				job.job.handle_second_msg(&job.span, ctx, candidate, pov).await?;
+			}
+		},
+		CandidateBackingMessage::Statement(relay_parent, statement) => {
+			if let Some(job) = jobs.get_mut(&relay_parent) {
+				job.job.handle_statement_message(&job.span, ctx, statement).await?;
+			}
+		},
+		CandidateBackingMessage::GetBackedCandidates(relay_parent, requested_candidates, tx) =>
+			if let Some(job) = jobs.get_mut(&relay_parent) {
+				job.job.handle_get_backed_candidates_message(requested_candidates, tx)?;
+			},
+	}
+
+	Ok(())
+}
+
+async fn handle_active_leaves_update<Context>(
+	ctx: &mut Context,
+	update: ActiveLeavesUpdate,
+	jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+	keystore: &SyncCryptoStorePtr,
+	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
+	metrics: &Metrics,
+) -> Result<(), Error>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+	for deactivated in update.deactivated {
+		jobs.remove(&deactivated);
+	}
+
+	let leaf = match update.activated {
+		None => return Ok(()),
+		Some(a) => a,
+	};
+
+	macro_rules! try_runtime_api {
+		($x: expr) => {
+			match $x {
+				Ok(x) => x,
+				Err(e) => {
+					gum::warn!(
+						target: LOG_TARGET,
+						err = ?e,
+						"Failed to fetch runtime API data for job",
+					);
+
+					// We can't do candidate validation work if we don't have the
+					// requisite runtime API data. But these errors should not take
+					// down the node.
+					return Ok(());
+				}
+			}
+		}
+	}
+
+	let parent = leaf.hash;
+	let span = PerLeafSpan::new(leaf.span, "backing");
+	let _span = span.child("runtime-apis");
+
+	let (validators, groups, session_index, cores) = futures::try_join!(
+		request_validators(parent, ctx.sender()).await,
+		request_validator_groups(parent, ctx.sender()).await,
+		request_session_index_for_child(parent, ctx.sender()).await,
+		request_from_runtime(parent, ctx.sender(), |tx| {
+			RuntimeApiRequest::AvailabilityCores(tx)
+		},)
+		.await,
+	)
+	.map_err(Error::JoinMultiple)?;
+
+	let validators: Vec<_> = try_runtime_api!(validators);
+	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
+	let session_index = try_runtime_api!(session_index);
+	let cores = try_runtime_api!(cores);
+
+	drop(_span);
+	let _span = span.child("validator-construction");
+
+	let signing_context = SigningContext { parent_hash: parent, session_index };
+	let validator =
+		match Validator::construct(&validators, signing_context.clone(), keystore.clone()).await {
+			Ok(v) => Some(v),
+			Err(util::Error::NotAValidator) => None,
+			Err(e) => {
+				gum::warn!(
+					target: LOG_TARGET,
+					err = ?e,
+					"Cannot participate in candidate backing",
+				);
+
+				return Ok(())
+			},
+		};
+
+	drop(_span);
+	let mut assignments_span = span.child("compute-assignments");
+
+	let mut groups = HashMap::new();
+
+	let n_cores = cores.len();
+
+	let mut assignment = None;
+
+	for (idx, core) in cores.into_iter().enumerate() {
+		// Ignore prospective assignments on occupied cores for the time being.
+		if let CoreState::Scheduled(scheduled) = core {
+			let core_index = CoreIndex(idx as _);
+			let group_index = group_rotation_info.group_for_core(core_index, n_cores);
+			if let Some(g) = validator_groups.get(group_index.0 as usize) {
+				if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
+					assignment = Some((scheduled.para_id, scheduled.collator));
+				}
+				groups.insert(scheduled.para_id, g.clone());
+			}
+		}
+	}
+
+	let table_context = TableContext { groups, validators, validator };
+
+	let (assignment, required_collator) = match assignment {
+		None => {
+			assignments_span.add_string_tag("assigned", "false");
+			(None, None)
+		},
+		Some((assignment, required_collator)) => {
+			assignments_span.add_string_tag("assigned", "true");
+			assignments_span.add_para_id(assignment);
+			(Some(assignment), required_collator)
+		},
+	};
+
+	drop(assignments_span);
+	let _span = span.child("wait-for-job");
+
+	let job = CandidateBackingJob {
+		parent,
+		session_index,
+		assignment,
+		required_collator,
+		issued_statements: HashSet::new(),
+		awaiting_validation: HashSet::new(),
+		fallbacks: HashMap::new(),
+		seconded: None,
+		unbacked_candidates: HashMap::new(),
+		backed: HashSet::new(),
+		keystore: keystore.clone(),
+		table: Table::default(),
+		table_context,
+		background_validation_tx: background_validation_tx.clone(),
+		metrics: metrics.clone(),
+		_marker: std::marker::PhantomData,
+	};
+
+	jobs.insert(parent, JobAndSpan { job, span });
+
+	Ok(())
+}
+
+struct JobAndSpan<Context> {
+	job: CandidateBackingJob<Context>,
+	span: PerLeafSpan,
+}
+
 /// Holds all data needed for candidate backing job operation.
-pub struct CandidateBackingJob {
+struct CandidateBackingJob<Context> {
 	/// The hash of the relay parent on top of which this job is doing it's work.
 	parent: Hash,
 	/// The session index this corresponds to.
@@ -168,9 +447,9 @@ pub struct CandidateBackingJob {
 	keystore: SyncCryptoStorePtr,
 	table: Table<TableContext>,
 	table_context: TableContext,
-	background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
-	background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
+	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
 	metrics: Metrics,
+	_marker: std::marker::PhantomData<Context>,
 }
 
 /// In case a backing validator does not provide a PoV, we need to retry with other backing
@@ -297,19 +576,22 @@ fn table_attested_to_backed(
 }
 
 async fn store_available_data(
-	sender: &mut JobSender<impl SubsystemSender>,
+	sender: &mut impl SubsystemSender,
 	n_validators: u32,
 	candidate_hash: CandidateHash,
 	available_data: AvailableData,
 ) -> Result<(), Error> {
 	let (tx, rx) = oneshot::channel();
 	sender
-		.send_message(AvailabilityStoreMessage::StoreAvailableData {
-			candidate_hash,
-			n_validators,
-			available_data,
-			tx,
-		})
+		.send_message(
+			AvailabilityStoreMessage::StoreAvailableData {
+				candidate_hash,
+				n_validators,
+				available_data,
+				tx,
+			}
+			.into(),
+		)
 		.await;
 
 	let _ = rx.await.map_err(Error::StoreAvailableData)?;
@@ -322,7 +604,7 @@ async fn store_available_data(
 // This will compute the erasure root internally and compare it to the expected erasure root.
 // This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
 async fn make_pov_available(
-	sender: &mut JobSender<impl SubsystemSender>,
+	sender: &mut impl SubsystemSender,
 	n_validators: usize,
 	pov: Arc<PoV>,
 	candidate_hash: CandidateHash,
@@ -355,7 +637,7 @@
 }
 
 async fn request_pov(
-	sender: &mut JobSender<impl SubsystemSender>,
+	sender: &mut impl SubsystemSender,
 	relay_parent: Hash,
 	from_validator: ValidatorIndex,
 	candidate_hash: CandidateHash,
@@ -363,13 +645,16 @@ async fn request_pov(
 ) -> Result<Arc<PoV>, Error> {
 	let (tx, rx) = oneshot::channel();
 	sender
-		.send_message(AvailabilityDistributionMessage::FetchPoV {
-			relay_parent,
-			from_validator,
-			candidate_hash,
-			pov_hash,
-			tx,
-		})
+		.send_message(
+			AvailabilityDistributionMessage::FetchPoV {
+				relay_parent,
+				from_validator,
+				candidate_hash,
+				pov_hash,
+				tx,
+			}
+			.into(),
+		)
 		.await;
 
 	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
@@ -377,19 +662,22 @@ async fn request_pov(
 }
 
 async fn request_candidate_validation(
-	sender: &mut JobSender<impl SubsystemSender>,
+	sender: &mut impl SubsystemSender,
 	candidate_receipt: CandidateReceipt,
 	pov: Arc<PoV>,
 ) -> Result<ValidationResult, Error> {
 	let (tx, rx) = oneshot::channel();
 	sender
-		.send_message(CandidateValidationMessage::ValidateFromChainState(
-			candidate_receipt,
-			pov,
-			BACKING_EXECUTION_TIMEOUT,
-			tx,
-		))
+		.send_message(
+			CandidateValidationMessage::ValidateFromChainState(
+				candidate_receipt,
+				pov,
+				BACKING_EXECUTION_TIMEOUT,
+				tx,
+			)
+			.into(),
+		)
 		.await;
 
 	match rx.await {
@@ -403,8 +691,8 @@ type BackgroundValidationResult =
 	Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
 
 struct BackgroundValidationParams<S: SubsystemSender, F> {
-	sender: JobSender<S>,
-	tx_command: mpsc::Sender<ValidatedCandidateCommand>,
+	sender: S,
+	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
 	candidate: CandidateReceipt,
 	relay_parent: Hash,
 	pov: PoVData,
@@ -439,7 +727,10 @@ async fn validate_and_make_available(
 	{
 		Err(Error::FetchPoV) => {
 			tx_command
-				.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash()))
+				.send((
+					relay_parent,
+					ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
+				))
 				.await
 				.map_err(Error::BackgroundValidationMpsc)?;
 			return Ok(())
@@ -511,48 +802,20 @@ async fn validate_and_make_available(
 		},
 	};
 
-	tx_command.send(make_command(res)).await.map_err(Into::into)
+	tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
 }
 
 struct ValidatorIndexOutOfBounds;
 
-impl CandidateBackingJob {
-	/// Run asynchronously.
-	async fn run_loop(
-		mut self,
-		mut sender: JobSender<impl SubsystemSender>,
-		mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
-		span: PerLeafSpan,
-	) -> Result<(), Error> {
-		loop {
-			futures::select! {
-				validated_command = self.background_validation.next() => {
-					let _span = span.child("process-validation-result");
-					if let Some(c) = validated_command {
-						self.handle_validated_candidate_command(&span, &mut sender, c).await?;
-					} else {
-						panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
-					}
-				}
-				to_job = rx_to.next() => match to_job {
-					None => break,
-					Some(msg) => {
-						// we intentionally want spans created in `process_msg` to descend from the
-						// `span ` which is longer-lived than this ephemeral timing span.
-						let _timing_span = span.child("process-message");
-						self.process_msg(&span, &mut sender, msg).await?;
-					}
-				}
-			}
-		}
-
-		Ok(())
-	}
-
+impl<Context> CandidateBackingJob<Context>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
 	async fn handle_validated_candidate_command(
 		&mut self,
 		root_span: &jaeger::Span,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		command: ValidatedCandidateCommand,
 	) -> Result<(), Error> {
 		let candidate_hash = command.candidate_hash();
@@ -575,21 +838,19 @@ impl CandidateBackingJob {
 							commitments,
 						});
 						if let Some(stmt) = self
-							.sign_import_and_distribute_statement(sender, statement, root_span)
+							.sign_import_and_distribute_statement(ctx, statement, root_span)
 							.await?
 						{
-							sender
-								.send_message(CollatorProtocolMessage::Seconded(
-									self.parent,
-									stmt,
-								))
-								.await;
+							ctx.send_message(CollatorProtocolMessage::Seconded(
+								self.parent,
+								stmt,
+							))
+							.await;
 						}
 					}
 				},
 				Err(candidate) => {
-					sender
-						.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate))
+					ctx.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate))
 						.await;
 				},
 			}
@@ -601,7 +862,7 @@ impl CandidateBackingJob {
 				if !self.issued_statements.contains(&candidate_hash) {
 					if res.is_ok() {
 						let statement = Statement::Valid(candidate_hash);
-						self.sign_import_and_distribute_statement(sender, statement, &root_span)
+						self.sign_import_and_distribute_statement(ctx, statement, &root_span)
 							.await?;
 					}
 					self.issued_statements.insert(candidate_hash);
@@ -614,7 +875,7 @@ impl CandidateBackingJob {
 						// Ok, another try:
 						let c_span = span.as_ref().map(|s| s.child("try"));
 						let attesting = attesting.clone();
-						self.kick_off_validation_work(sender, attesting, c_span).await?
+						self.kick_off_validation_work(ctx, attesting, c_span).await?
 					}
 				} else {
 					gum::warn!(
@@ -631,7 +892,7 @@ impl CandidateBackingJob {
 
 	async fn background_validate_and_make_available(
 		&mut self,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		params: BackgroundValidationParams<
 			impl SubsystemSender,
 			impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
@@ -657,9 +918,9 @@ impl CandidateBackingJob {
 				}
 			}
 		};
-		sender
-			.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
-			.await?;
+
+		ctx.spawn("backing-validation", bg.boxed())
+			.map_err(|_| Error::FailedToSpawnBg)?;
 	}
 
 	Ok(())
 }
@@ -670,7 +931,7 @@ impl CandidateBackingJob {
 	async fn validate_and_second(
 		&mut self,
 		parent_span: &jaeger::Span,
 		root_span: &jaeger::Span,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		candidate: &CandidateReceipt,
 		pov: Arc<PoV>,
 	) -> Result<(), Error> {
@@ -680,8 +941,7 @@ impl CandidateBackingJob {
 			.as_ref()
 			.map_or(false, |c| c != &candidate.descriptor().collator)
 		{
-			sender
-				.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate.clone()))
+			ctx.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate.clone()))
 				.await;
 			return Ok(())
 		}
@@ -702,9 +962,9 @@ impl CandidateBackingJob {
 			"Validate and second candidate",
 		);
 
-		let bg_sender = sender.clone();
+		let bg_sender = ctx.sender().clone();
 		self.background_validate_and_make_available(
-			sender,
+			ctx,
 			BackgroundValidationParams {
 				sender: bg_sender,
 				tx_command: self.background_validation_tx.clone(),
@@ -723,14 +983,14 @@ impl CandidateBackingJob {
 
 	async fn sign_import_and_distribute_statement(
 		&mut self,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		statement: Statement,
 		root_span: &jaeger::Span,
 	) -> Result<Option<SignedFullStatement>, Error> {
 		if let Some(signed_statement) = self.sign_statement(statement).await {
-			self.import_statement(sender, &signed_statement, root_span).await?;
+			self.import_statement(ctx, &signed_statement, root_span).await?;
 			let smsg = StatementDistributionMessage::Share(self.parent, signed_statement.clone());
-			sender.send_unbounded_message(smsg);
+			ctx.send_unbounded_message(smsg);
 
 			Ok(Some(signed_statement))
 		} else {
@@ -739,23 +999,22 @@ impl CandidateBackingJob {
 	}
 
 	/// Check if there have happened any new misbehaviors and issue necessary messages.
-	async fn issue_new_misbehaviors(&mut self, sender: &mut JobSender<impl SubsystemSender>) {
+	async fn issue_new_misbehaviors(&mut self, ctx: &mut Context) {
 		// collect the misbehaviors to avoid double mutable self borrow issues
 		let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
 		for (validator_id, report) in misbehaviors {
-			sender
-				.send_message(ProvisionerMessage::ProvisionableData(
-					self.parent,
-					ProvisionableData::MisbehaviorReport(self.parent, validator_id, report),
-				))
-				.await;
+			ctx.send_message(ProvisionerMessage::ProvisionableData(
+				self.parent,
+				ProvisionableData::MisbehaviorReport(self.parent, validator_id, report),
+			))
+			.await;
 		}
 	}
 
 	/// Import a statement into the statement table and return the summary of the import.
 	async fn import_statement(
 		&mut self,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		statement: &SignedFullStatement,
 		root_span: &jaeger::Span,
 	) -> Result<Option<TableSummary>, Error> {
@@ -777,7 +1036,7 @@ impl CandidateBackingJob {
 		};
 
 		if let Err(ValidatorIndexOutOfBounds) = self
-			.dispatch_new_statement_to_dispute_coordinator(sender, candidate_hash, &statement)
+			.dispatch_new_statement_to_dispute_coordinator(ctx, candidate_hash, &statement)
 			.await
 		{
 			gum::warn!(
@@ -817,7 +1076,7 @@ impl CandidateBackingJob {
 					self.parent,
 					ProvisionableData::BackedCandidate(backed.receipt()),
 				);
-				sender.send_message(message).await;
+				ctx.send_message(message).await;
 
 				span.as_ref().map(|s| s.child("backed"));
 				span
@@ -831,7 +1090,7 @@ impl CandidateBackingJob {
 			None
 		};
 
-		self.issue_new_misbehaviors(sender).await;
+		self.issue_new_misbehaviors(ctx).await;
 
 		// It is important that the child span is dropped before its parent span (`unbacked_span`)
 		drop(import_statement_span);
@@ -853,8 +1112,8 @@ impl CandidateBackingJob {
 	/// the networking component responsible for feeding statements to the backing subsystem
 	/// is meant to check the signature and provenance of all statements before submission.
 	async fn dispatch_new_statement_to_dispute_coordinator(
-		&self,
-		sender: &mut JobSender<impl SubsystemSender>,
+		&mut self,
+		ctx: &mut Context,
 		candidate_hash: CandidateHash,
 		statement: &SignedFullStatement,
 	) -> Result<(), ValidatorIndexOutOfBounds> {
@@ -887,100 +1146,107 @@ impl CandidateBackingJob {
 		if let (Some(candidate_receipt), Some(dispute_statement)) =
 			(maybe_candidate_receipt, maybe_signed_dispute_statement)
 		{
-			sender
-				.send_message(DisputeCoordinatorMessage::ImportStatements {
-					candidate_hash,
-					candidate_receipt,
-					session: self.session_index,
-					statements: vec![(dispute_statement, validator_index)],
-					pending_confirmation: None,
-				})
-				.await;
+			ctx.send_message(DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt,
+				session: self.session_index,
+				statements: vec![(dispute_statement, validator_index)],
+				pending_confirmation: None,
+			})
+			.await;
 		}
 
 		Ok(())
 	}
 
-	async fn process_msg(
+	async fn handle_second_msg(
 		&mut self,
 		root_span: &jaeger::Span,
-		sender: &mut JobSender<impl SubsystemSender>,
-		msg: CandidateBackingMessage,
+		ctx: &mut Context,
+		candidate: CandidateReceipt,
+		pov: PoV,
 	) -> Result<(), Error> {
-		match msg {
-			CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
-				let _timer = self.metrics.time_process_second();
-
-				let span = root_span
-					.child("second")
-					.with_stage(jaeger::Stage::CandidateBacking)
-					.with_pov(&pov)
-					.with_candidate(candidate.hash())
-					.with_relay_parent(relay_parent);
-
-				// Sanity check that candidate is from our assignment.
-				if Some(candidate.descriptor().para_id) != self.assignment {
-					gum::debug!(
-						target: LOG_TARGET,
-						our_assignment = ?self.assignment,
-						collation = ?candidate.descriptor().para_id,
-						"Subsystem asked to second for para outside of our assignment",
-					);
+		let _timer = self.metrics.time_process_second();
 
-					return Ok(())
-				}
+		let candidate_hash = candidate.hash();
+		let span = root_span
+			.child("second")
+			.with_stage(jaeger::Stage::CandidateBacking)
+			.with_pov(&pov)
+			.with_candidate(candidate_hash)
+			.with_relay_parent(self.parent);
+
+		// Sanity check that candidate is from our assignment.
+		if Some(candidate.descriptor().para_id) != self.assignment {
+			gum::debug!(
+				target: LOG_TARGET,
+				our_assignment = ?self.assignment,
+				collation = ?candidate.descriptor().para_id,
+				"Subsystem asked to second for para outside of our assignment",
+			);
 
-				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
-				// Seconded statement only if we have not seconded any other candidate and
-				// have not signed a Valid statement for the requested candidate.
-				if self.seconded.is_none() {
-					// This job has not seconded a candidate yet.
-					let candidate_hash = candidate.hash();
+			return Ok(())
+		}
 
-					if !self.issued_statements.contains(&candidate_hash) {
-						let pov = Arc::new(pov);
-						self.validate_and_second(&span, &root_span, sender, &candidate, pov)
-							.await?;
-					}
-				}
-			},
-			CandidateBackingMessage::Statement(_relay_parent, statement) => {
-				let _timer = self.metrics.time_process_statement();
-				let _span = root_span
-					.child("statement")
-					.with_stage(jaeger::Stage::CandidateBacking)
-					.with_candidate(statement.payload().candidate_hash())
-					.with_relay_parent(_relay_parent);
-
-				match self.maybe_validate_and_import(&root_span, sender, statement).await {
-					Err(Error::ValidationFailed(_)) => return Ok(()),
-					Err(e) => return Err(e),
-					Ok(()) => (),
-				}
-			},
-			CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
-				let _timer = self.metrics.time_get_backed_candidates();
-
-				let backed = requested_candidates
-					.into_iter()
-					.filter_map(|hash| {
-						self.table.attested_candidate(&hash, &self.table_context).and_then(
-							|attested| table_attested_to_backed(attested, &self.table_context),
-						)
-					})
-					.collect();
-
-				tx.send(backed).map_err(|data| Error::Send(data))?;
-			},
+		// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
+		// Seconded statement only if we have not seconded any other candidate and
+		// have not signed a Valid statement for the requested candidate.
+		if self.seconded.is_none() {
+			// This job has not seconded a candidate yet.
+
+			if !self.issued_statements.contains(&candidate_hash) {
+				let pov = Arc::new(pov);
+				self.validate_and_second(&span, &root_span, ctx, &candidate, pov).await?;
+			}
 		}
 
 		Ok(())
 	}
 
+	async fn handle_statement_message(
+		&mut self,
+		root_span: &jaeger::Span,
+		ctx: &mut Context,
+		statement: SignedFullStatement,
+	) -> Result<(), Error> {
+		let _timer = self.metrics.time_process_statement();
+		let _span = root_span
+			.child("statement")
+			.with_stage(jaeger::Stage::CandidateBacking)
+			.with_candidate(statement.payload().candidate_hash())
+			.with_relay_parent(self.parent);
+
+		match self.maybe_validate_and_import(&root_span, ctx, statement).await {
+			Err(Error::ValidationFailed(_)) => Ok(()),
+			Err(e) => Err(e),
+			Ok(()) => Ok(()),
+		}
+	}
+
+	fn handle_get_backed_candidates_message(
+		&mut self,
+		requested_candidates: Vec<CandidateHash>,
+		tx: oneshot::Sender<Vec<BackedCandidate>>,
+	) -> Result<(), Error> {
+		let _timer = self.metrics.time_get_backed_candidates();
+
+		let backed = requested_candidates
+			.into_iter()
+			.filter_map(|hash| {
+				self.table
+					.attested_candidate(&hash, &self.table_context)
+					.and_then(|attested| table_attested_to_backed(attested, &self.table_context))
+			})
+			.collect();
+
+		tx.send(backed).map_err(|data| Error::Send(data))?;
+		Ok(())
+	}
+
 	/// Kick off validation work and distribute the result as a signed statement.
 	async fn kick_off_validation_work(
 		&mut self,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		attesting: AttestingData,
 		span: Option<jaeger::Span>,
 	) -> Result<(), Error> {
@@ -1008,14 +1274,14 @@ impl CandidateBackingJob {
 			return Ok(())
 		}
 
-		let bg_sender = sender.clone();
+		let bg_sender = ctx.sender().clone();
 		let pov = PoVData::FetchFromValidator {
 			from_validator: attesting.from_validator,
 			candidate_hash,
 			pov_hash: attesting.pov_hash,
 		};
 		self.background_validate_and_make_available(
-			sender,
+			ctx,
 			BackgroundValidationParams {
 				sender: bg_sender,
 				tx_command: self.background_validation_tx.clone(),
@@ -1034,10 +1300,10 @@ impl CandidateBackingJob {
 	async fn maybe_validate_and_import(
 		&mut self,
 		root_span: &jaeger::Span,
-		sender: &mut JobSender<impl SubsystemSender>,
+		ctx: &mut Context,
 		statement: SignedFullStatement,
 	) -> Result<(), Error> {
-		if let Some(summary) = self.import_statement(sender, &statement, root_span).await? {
+		if let Some(summary) = self.import_statement(ctx, &statement, root_span).await? {
 			if Some(summary.group_id) != self.assignment {
 				return Ok(())
 			}
@@ -1087,12 +1353,12 @@ impl CandidateBackingJob {
 				},
 			};
 
-			self.kick_off_validation_work(sender, attesting, span).await?;
+			self.kick_off_validation_work(ctx, attesting, span).await?;
 		}
 		Ok(())
 	}
 
-	async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
+	async fn sign_statement(&mut self, statement: Statement) -> Option<SignedFullStatement> {
 		let signed = self
 			.table_context
 			.validator
@@ -1159,150 +1425,6 @@ impl CandidateBackingJob {
 	}
 }
 
-impl util::JobTrait for CandidateBackingJob {
-	type ToJob = CandidateBackingMessage;
-	type Error = Error;
-	type RunArgs = SyncCryptoStorePtr;
-	type Metrics = Metrics;
-
-	const NAME: &'static str = "candidate-backing-job";
-
-	fn run<S: SubsystemSender>(
-		leaf: ActivatedLeaf,
-		keystore: SyncCryptoStorePtr,
-		metrics: Metrics,
-		rx_to: mpsc::Receiver<Self::ToJob>,
-		mut sender: JobSender<S>,
-	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
-		let parent = leaf.hash;
-		async move {
-			macro_rules! try_runtime_api {
-				($x: expr) => {
-					match $x {
-						Ok(x) => x,
-						Err(e) => {
-							gum::warn!(
-								target: LOG_TARGET,
-								err = ?e,
-								"Failed to fetch runtime API data for job",
-							);
-
-							// We can't do candidate validation work if we don't have the
-							// requisite runtime API data. But these errors should not take
-							// down the node.
-							return Ok(());
-						}
-					}
-				}
-			}
-
-			let span = PerLeafSpan::new(leaf.span, "backing");
-			let _span = span.child("runtime-apis");
-
-			let (validators, groups, session_index, cores) = futures::try_join!(
-				request_validators(parent, &mut sender).await,
-				request_validator_groups(parent, &mut sender).await,
-				request_session_index_for_child(parent, &mut sender).await,
-				request_from_runtime(parent, &mut sender, |tx| {
-					RuntimeApiRequest::AvailabilityCores(tx)
-				},)
-				.await,
-			)
-			.map_err(Error::JoinMultiple)?;
-
-			let validators = try_runtime_api!(validators);
-			let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
-			let session_index = try_runtime_api!(session_index);
-			let cores = try_runtime_api!(cores);
-
-			drop(_span);
-			let _span = span.child("validator-construction");
-
-			let signing_context = SigningContext { parent_hash: parent, session_index };
-			let validator =
-				match Validator::construct(&validators, signing_context.clone(), keystore.clone())
-					.await
-				{
-					Ok(v) => Some(v),
-					Err(util::Error::NotAValidator) => None,
-					Err(e) => {
-						gum::warn!(
-							target: LOG_TARGET,
-							err = ?e,
-							"Cannot participate in candidate backing",
-						);
-
-						return Ok(())
-					},
-				};
-
-			drop(_span);
-			let mut assignments_span = span.child("compute-assignments");
-
-			let mut groups = HashMap::new();
-
-			let n_cores = cores.len();
-
-			let mut assignment = None;
-
-			for (idx, core) in cores.into_iter().enumerate() {
-				// Ignore prospective assignments on occupied cores for the time being.
-				if let CoreState::Scheduled(scheduled) = core {
-					let core_index = CoreIndex(idx as _);
-					let group_index = group_rotation_info.group_for_core(core_index, n_cores);
-					if let Some(g) = validator_groups.get(group_index.0 as usize) {
-						if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
-							assignment = Some((scheduled.para_id, scheduled.collator));
-						}
-						groups.insert(scheduled.para_id, g.clone());
-					}
-				}
-			}
-
-			let table_context = TableContext { groups, validators, validator };
-
-			let (assignment, required_collator) = match assignment {
-				None => {
-					assignments_span.add_string_tag("assigned", "false");
-					(None, None)
-				},
-				Some((assignment, required_collator)) => {
-					assignments_span.add_string_tag("assigned", "true");
-					assignments_span.add_para_id(assignment);
-					(Some(assignment), required_collator)
-				},
-			};
-
-			drop(assignments_span);
-			let _span = span.child("wait-for-job");
-
-			let (background_tx, background_rx) = mpsc::channel(16);
-			let job = CandidateBackingJob {
-				parent,
-				session_index,
-				assignment,
-				required_collator,
-				issued_statements: HashSet::new(),
-				awaiting_validation: HashSet::new(),
-				fallbacks: HashMap::new(),
-				seconded: None,
-				unbacked_candidates: HashMap::new(),
-				backed: HashSet::new(),
-				keystore,
-				table: Table::default(),
-				table_context,
-				background_validation: background_rx,
-				background_validation_tx: background_tx,
-				metrics,
-			};
-			drop(_span);
-
-			job.run_loop(sender, rx_to, span).await
-		}
-		.boxed()
-	}
-}
-
 #[derive(Clone)]
 struct MetricsInner {
 	signed_statements_total: prometheus::Counter<prometheus::U64>,
@@ -1389,7 +1511,3 @@ impl metrics::Metrics for Metrics {
 		Ok(Metrics(Some(metrics)))
 	}
 }
-
-/// The candidate backing subsystem.
-pub type CandidateBackingSubsystem<Spawner> =
-	polkadot_node_subsystem_util::JobSubsystem<CandidateBackingJob, Spawner>;
diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs
index 5eaee56bcac7..a83906e7478b 100644
--- a/node/core/backing/src/tests.rs
+++ b/node/core/backing/src/tests.rs
@@ -153,8 +153,11 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 
 	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
 
-	let subsystem =
-		CandidateBackingSubsystem::new(pool.clone(), keystore, Metrics(None)).run(context);
+	let subsystem = async move {
+		if let Err(e) = super::run(context, keystore, Metrics(None)).await {
+			panic!("{:?}", e);
+		}
+	};
 
 	let test_fut = test(virtual_overseer);
 

From ba5c1b1dde9831f070603c7f22bd81828ccb6c4a Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Mon, 25 Apr 2022 12:15:07 -0500
Subject: [PATCH 2/4] rename FailedToSpawnBg

---
 node/core/backing/src/lib.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs
index d5e972905be4..a03508fb3994 100644
--- a/node/core/backing/src/lib.rs
+++ b/node/core/backing/src/lib.rs
@@ -83,7 +83,7 @@ pub enum Error {
 	#[error("FetchPoV failed")]
 	FetchPoV,
 	#[error("Failed to spawn background task")]
-	FailedToSpawnBg,
+	FailedToSpawnBackgroundTask,
 	#[error("ValidateFromChainState channel closed before receipt")]
 	ValidateFromChainState(#[source] oneshot::Canceled),
 	#[error("StoreAvailableData channel closed before receipt")]
@@ -920,7 +920,7 @@ impl CandidateBackingJob {
 		};
 
 		ctx.spawn("backing-validation", bg.boxed())
-			.map_err(|_| Error::FailedToSpawnBg)?;
+			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
 	}
 
 	Ok(())

From 50a10eba95cfa3bb8b542cd41c6215d23f42c967 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 27 Apr 2022 16:28:31 -0500
Subject: [PATCH 3/4] refactor: backing uses fatality

---
 Cargo.lock                     |  1 +
 node/core/backing/Cargo.toml   |  1 +
 node/core/backing/src/error.rs | 94 ++++++++++++++++++++++++++++++++++
 node/core/backing/src/lib.rs   | 83 +++++++++++++++---------------
 node/core/backing/src/tests.rs |  2 +-
 5 files changed, 139 insertions(+), 42 deletions(-)
 create mode 100644 node/core/backing/src/error.rs

diff --git a/Cargo.lock b/Cargo.lock
index ff17d1181ac0..7be18973f68c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6700,6 +6700,7 @@ version = "0.9.19"
 dependencies = [
  "assert_matches",
  "bitvec",
+ "fatality",
 "futures 0.3.21",
 "polkadot-erasure-coding",
 "polkadot-node-primitives",

diff --git a/node/core/backing/Cargo.toml b/node/core/backing/Cargo.toml
index 89aaf30ae91c..c4e938c92539 100644
--- a/node/core/backing/Cargo.toml
+++ b/node/core/backing/Cargo.toml
@@ -16,6 +16,7 @@ statement-table = { package = "polkadot-statement-table", path = "../../../state
 bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
 gum = { package = "tracing-gum", path = "../../gum" }
 thiserror = "1.0.30"
+fatality = "0.0.6"
 
 [dev-dependencies]
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }

diff --git a/node/core/backing/src/error.rs b/node/core/backing/src/error.rs
new file mode 100644
index 000000000000..39f5c8b7f3fd
--- /dev/null
+++ b/node/core/backing/src/error.rs
@@ -0,0 +1,94 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
+
+use fatality::Nested;
+use futures::channel::{mpsc, oneshot};
+
+use polkadot_node_subsystem_util::Error as UtilError;
+use polkadot_primitives::v2::BackedCandidate;
+use polkadot_subsystem::{messages::ValidationFailed, SubsystemError};
+
+use crate::LOG_TARGET;
+
+pub type Result<T> = std::result::Result<T, Error>;
+pub type FatalResult<T> = std::result::Result<T, FatalError>;
+
+/// Errors that can occur in candidate backing.
+#[allow(missing_docs)]
+#[fatality::fatality(splitable)]
+pub enum Error {
+	#[error("Candidate is not found")]
+	CandidateNotFound,
+
+	#[error("Signature is invalid")]
+	InvalidSignature,
+
+	#[error("Failed to send candidates {0:?}")]
+	Send(Vec<BackedCandidate>),
+
+	#[error("FetchPoV failed")]
+	FetchPoV,
+
+	#[fatal]
+	#[error("Failed to spawn background task")]
+	FailedToSpawnBackgroundTask,
+
+	#[error("ValidateFromChainState channel closed before receipt")]
+	ValidateFromChainState(#[source] oneshot::Canceled),
+
+	#[error("StoreAvailableData channel closed before receipt")]
+	StoreAvailableData(#[source] oneshot::Canceled),
+
+	#[error("a channel was closed before receipt in try_join!")]
+	JoinMultiple(#[source] oneshot::Canceled),
+
+	#[error("Obtaining erasure chunks failed")]
+	ObtainErasureChunks(#[from] erasure_coding::Error),
+
+	#[error(transparent)]
+	ValidationFailed(#[from] ValidationFailed),
+
+	#[fatal]
+	#[error(transparent)]
+	BackgroundValidationMpsc(#[from] mpsc::SendError),
+
+	#[error(transparent)]
+	UtilError(#[from] UtilError),
+
+	#[error(transparent)]
+	SubsystemError(#[from] SubsystemError),
+}
+
+/// Utility for eating top level errors and log them.
+///
+/// We basically always want to try and continue on error. This utility function is meant to
+/// consume top-level errors by simply logging them
+pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> {
+	match result.into_nested()? {
+		Ok(()) => Ok(()),
+		Err(jfyi) => {
+			jfyi.log();
+			Ok(())
+		},
+	}
+}
+
+impl JfyiError {
+	/// Log a `JfyiError`.
+	pub fn log(self) {
+		gum::debug!(target: LOG_TARGET, error = ?self);
+	}
+}
diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs
index a03508fb3994..77e8997cc789 100644
--- a/node/core/backing/src/lib.rs
+++ b/node/core/backing/src/lib.rs
@@ -29,6 +29,7 @@ use futures::{
 	FutureExt, SinkExt, StreamExt,
 };
 
+use error::{Error, FatalResult};
 use polkadot_node_primitives::{
 	AvailableData, InvalidCandidate, PoV, SignedDisputeStatement, SignedFullStatement, Statement,
 	ValidationResult, BACKING_EXECUTION_TIMEOUT,
@@ -50,7 +51,7 @@ use polkadot_subsystem::{
 		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
 		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
 		DisputeCoordinatorMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
-		StatementDistributionMessage, ValidationFailed,
+		StatementDistributionMessage,
 	},
 	overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
 	Stage, SubsystemContext, SubsystemError, SubsystemSender,
@@ -64,44 +65,14 @@ use statement_table::{
 	},
 	Context as TableContextTrait, Table,
 };
-use thiserror::Error;
+
+mod error;
 
 #[cfg(test)]
 mod tests;
 
 const LOG_TARGET: &str = "parachain::candidate-backing";
 
-/// Errors that can occur in candidate backing.
-#[derive(Debug, Error)]
-pub enum Error {
-	#[error("Candidate is not found")]
-	CandidateNotFound,
-	#[error("Signature is invalid")]
-	InvalidSignature,
-	#[error("Failed to send candidates {0:?}")]
-	Send(Vec<BackedCandidate>),
-	#[error("FetchPoV failed")]
-	FetchPoV,
-	#[error("Failed to spawn background task")]
-	FailedToSpawnBackgroundTask,
-	#[error("ValidateFromChainState channel closed before receipt")]
-	ValidateFromChainState(#[source] oneshot::Canceled),
-	#[error("StoreAvailableData channel closed before receipt")]
-	StoreAvailableData(#[source] oneshot::Canceled),
-	#[error("a channel was closed before receipt in try_join!")]
-	JoinMultiple(#[source] oneshot::Canceled),
-	#[error("Obtaining erasure chunks failed")]
-	ObtainErasureChunks(#[from] erasure_coding::Error),
-	#[error(transparent)]
-	ValidationFailed(#[from] ValidationFailed),
-	#[error(transparent)]
-	BackgroundValidationMpsc(#[from] mpsc::SendError),
-	#[error(transparent)]
-	UtilError(#[from] util::Error),
-	#[error(transparent)]
-	SubsystemError(#[from] SubsystemError),
-}
-
 /// PoV data to validate.
 enum PoVData {
 	/// Already available (from candidate selection).
@@ -180,7 +151,7 @@ async fn run<Context>(
 	mut ctx: Context,
 	keystore: SyncCryptoStorePtr,
 	metrics: Metrics,
-) -> Result<(), Error>
+) -> FatalResult<()>
 where
 	Context: SubsystemContext<Message = CandidateBackingMessage>,
 	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
@@ -188,15 +159,45 @@ where
 	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
 	let mut jobs = HashMap::new();
 
-	// TODO [now]: refactor to JFYIError and add a `run_until_error` function
-	// which actually does this loop
+	loop {
+		let res = run_iteration(
+			&mut ctx,
+			keystore.clone(),
+			&metrics,
+			&mut jobs,
+			background_validation_tx.clone(),
+			&mut background_validation_rx,
+		)
+		.await;
+
+		match res {
+			Ok(()) => break,
+			Err(e) => crate::error::log_error(Err(e))?,
+		}
+	}
+
+	Ok(())
+}
+
+async fn run_iteration<Context>(
+	ctx: &mut Context,
+	keystore: SyncCryptoStorePtr,
+	metrics: &Metrics,
+	jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
+	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
+) -> Result<(), Error>
+where
+	Context: SubsystemContext<Message = CandidateBackingMessage>,
+	Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
 	loop {
 		futures::select!(
 			validated_command = background_validation_rx.next().fuse() => {
 				if let Some((relay_parent, command)) = validated_command {
 					handle_validated_candidate_command(
-						&mut ctx,
-						&mut jobs,
+						&mut *ctx,
+						jobs,
 						relay_parent,
 						command,
 					).await?;
 				} else {
 					panic!("background_validation_tx always alive at this point; qed");
 				}
 			}
@@ -207,16 +208,16 @@ where
 			from_overseer = ctx.recv().fuse() => {
 				match from_overseer? {
 					FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => handle_active_leaves_update(
-						&mut ctx,
+						&mut *ctx,
 						update,
-						&mut jobs,
+						jobs,
 						&keystore,
 						&background_validation_tx,
 						&metrics,
 					).await?,
 					FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
 					FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
-					FromOverseer::Communication { msg } => handle_communication(&mut ctx, &mut jobs, msg).await?,
+					FromOverseer::Communication { msg } => handle_communication(&mut *ctx, jobs, msg).await?,
 				}
 			}
 		)
diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs
index a83906e7478b..dbe65f5256f2 100644
--- a/node/core/backing/src/tests.rs
+++ b/node/core/backing/src/tests.rs
@@ -28,7 +28,7 @@ use polkadot_primitives::v2::{
 	ScheduledCore,
 };
 use polkadot_subsystem::{
-	messages::{CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest},
+	messages::{CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest, ValidationFailed},
 	ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal,
 };
 use sp_application_crypto::AppKey;

From de80b8d8bb8aec57a08b7d03bd060eef8f26f9bf Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 27 Apr 2022 16:32:34 -0500
Subject: [PATCH 4/4] fix service compilation

---
 node/service/src/overseer.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs
index fbdd57586b72..fd07ddfe825d 100644
--- a/node/service/src/overseer.rs
+++ b/node/service/src/overseer.rs
@@ -145,7 +145,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
 		Arc<RuntimeClient>,
 		CandidateValidationSubsystem,
 		PvfCheckerSubsystem,
-		CandidateBackingSubsystem<Spawner>,
+		CandidateBackingSubsystem,
 		StatementDistributionSubsystem,
 		AvailabilityDistributionSubsystem,
 		AvailabilityRecoverySubsystem,
@@ -201,7 +201,6 @@ where
 			Metrics::register(registry)?,
 		))
 		.candidate_backing(CandidateBackingSubsystem::new(
-			spawner.clone(),
 			keystore.clone(),
 			Metrics::register(registry)?,
 		))
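
A note on the error-handling pattern adopted in PATCH 3: `#[fatality::fatality(splitable)]`
splits the annotated enum into a generated `FatalError` enum (the variants marked `#[fatal]`)
and a generated `JfyiError` enum (everything else), and `fatality::Nested::into_nested`
converts a `Result<T, Error>` into `Result<Result<T, JfyiError>, FatalError>`. The sketch
below is a minimal, self-contained illustration of that mechanism, mirroring `log_error`
from error.rs above; the enum and its variants here are hypothetical examples, not taken
from this patch series:

	use fatality::Nested;

	#[fatality::fatality(splitable)]
	enum Error {
		// `#[fatal]` variants land in the generated `FatalError` enum.
		#[fatal]
		#[error("background channel closed")]
		ChannelClosed,
		// Remaining variants land in the generated `JfyiError` enum.
		#[error("candidate not found")]
		CandidateNotFound,
	}

	// Swallow and log non-fatal (JFYI) errors; propagate fatal ones via `?`.
	fn log_error(result: Result<(), Error>) -> Result<(), FatalError> {
		match result.into_nested()? {
			Ok(()) => Ok(()),
			Err(jfyi) => {
				eprintln!("non-fatal: {}", jfyi);
				Ok(())
			},
		}
	}

This is the shape `run` relies on after PATCH 3: `run_iteration` returns the splitable
`Result<(), Error>`, the outer loop in `run` feeds any error through `log_error`, and so
only a `FatalError` tears the subsystem down.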