diff --git a/Cargo.lock b/Cargo.lock
index 0507d0029ad9..efe4fca99358 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6704,6 +6704,7 @@ version = "0.9.19"
dependencies = [
"assert_matches",
"bitvec",
+ "fatality",
"futures 0.3.21",
"polkadot-erasure-coding",
"polkadot-node-primitives",
diff --git a/node/core/backing/Cargo.toml b/node/core/backing/Cargo.toml
index 89aaf30ae91c..c4e938c92539 100644
--- a/node/core/backing/Cargo.toml
+++ b/node/core/backing/Cargo.toml
@@ -16,6 +16,7 @@ statement-table = { package = "polkadot-statement-table", path = "../../../state
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
gum = { package = "tracing-gum", path = "../../gum" }
thiserror = "1.0.30"
+fatality = "0.0.6"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/node/core/backing/src/error.rs b/node/core/backing/src/error.rs
new file mode 100644
index 000000000000..39f5c8b7f3fd
--- /dev/null
+++ b/node/core/backing/src/error.rs
@@ -0,0 +1,94 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+use fatality::Nested;
+use futures::channel::{mpsc, oneshot};
+
+use polkadot_node_subsystem_util::Error as UtilError;
+use polkadot_primitives::v2::BackedCandidate;
+use polkadot_subsystem::{messages::ValidationFailed, SubsystemError};
+
+use crate::LOG_TARGET;
+
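+/// General result type, using the full `Error` with both fatal and non-fatal variants.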
+pub type Result<T> = std::result::Result<T, Error>;
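+/// Result type for operations that may only fail fatally.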
+pub type FatalResult<T> = std::result::Result<T, FatalError>;
+
+/// Errors that can occur in candidate backing.
+#[allow(missing_docs)]
+#[fatality::fatality(splitable)]
+pub enum Error {
+ #[error("Candidate is not found")]
+ CandidateNotFound,
+
+ #[error("Signature is invalid")]
+ InvalidSignature,
+
+ #[error("Failed to send candidates {0:?}")]
+ Send(Vec<BackedCandidate>),
+
+ #[error("FetchPoV failed")]
+ FetchPoV,
+
+ #[fatal]
+ #[error("Failed to spawn background task")]
+ FailedToSpawnBackgroundTask,
+
+ #[error("ValidateFromChainState channel closed before receipt")]
+ ValidateFromChainState(#[source] oneshot::Canceled),
+
+ #[error("StoreAvailableData channel closed before receipt")]
+ StoreAvailableData(#[source] oneshot::Canceled),
+
+ #[error("a channel was closed before receipt in try_join!")]
+ JoinMultiple(#[source] oneshot::Canceled),
+
+ #[error("Obtaining erasure chunks failed")]
+ ObtainErasureChunks(#[from] erasure_coding::Error),
+
+ #[error(transparent)]
+ ValidationFailed(#[from] ValidationFailed),
+
+ #[fatal]
+ #[error(transparent)]
+ BackgroundValidationMpsc(#[from] mpsc::SendError),
+
+ #[error(transparent)]
+ UtilError(#[from] UtilError),
+
+ #[error(transparent)]
+ SubsystemError(#[from] SubsystemError),
+}
+
+/// Utility for eating top-level errors and logging them.
+///
+/// We basically always want to try and continue on error. This utility function is meant to
+/// consume top-level errors by simply logging them.
+pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> {
+ match result.into_nested()? {
+ Ok(()) => Ok(()),
+ Err(jfyi) => {
+ jfyi.log();
+ Ok(())
+ },
+ }
+}
+
+impl JfyiError {
+ /// Log a `JfyiError`.
+ pub fn log(self) {
+ gum::debug!(target: LOG_TARGET, error = ?self);
+ }
+}
diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs
index e26b99426b03..b459264d7a02 100644
--- a/node/core/backing/src/lib.rs
+++ b/node/core/backing/src/lib.rs
@@ -20,23 +20,23 @@
use std::{
collections::{HashMap, HashSet},
- pin::Pin,
sync::Arc,
};
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
- Future, FutureExt, SinkExt, StreamExt,
+ FutureExt, SinkExt, StreamExt,
};
+use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedDisputeStatement, SignedFullStatement, Statement,
ValidationResult, BACKING_EXECUTION_TIMEOUT,
};
use polkadot_node_subsystem_util::{
self as util, request_from_runtime, request_session_index_for_child, request_validator_groups,
- request_validators, FromJobCommand, JobSender, Validator,
+ request_validators, Validator,
};
use polkadot_primitives::v2::{
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId,
@@ -49,9 +49,10 @@ use polkadot_subsystem::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
DisputeCoordinatorMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
- StatementDistributionMessage, ValidationFailed,
+ StatementDistributionMessage,
},
- overseer, ActivatedLeaf, PerLeafSpan, Stage, SubsystemSender,
+ overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem,
+ Stage, SubsystemContext, SubsystemError, SubsystemSender,
};
use sp_keystore::SyncCryptoStorePtr;
use statement_table::{
@@ -62,7 +63,8 @@ use statement_table::{
},
Context as TableContextTrait, Table,
};
-use thiserror::Error;
+
+mod error;
mod metrics;
use self::metrics::Metrics;
@@ -72,33 +74,6 @@ mod tests;
const LOG_TARGET: &str = "parachain::candidate-backing";
-/// Errors that can occur in candidate backing.
-#[derive(Debug, Error)]
-pub enum Error {
- #[error("Candidate is not found")]
- CandidateNotFound,
- #[error("Signature is invalid")]
- InvalidSignature,
- #[error("Failed to send candidates {0:?}")]
- Send(Vec<BackedCandidate>),
- #[error("FetchPoV failed")]
- FetchPoV,
- #[error("ValidateFromChainState channel closed before receipt")]
- ValidateFromChainState(#[source] oneshot::Canceled),
- #[error("StoreAvailableData channel closed before receipt")]
- StoreAvailableData(#[source] oneshot::Canceled),
- #[error("a channel was closed before receipt in try_join!")]
- JoinMultiple(#[source] oneshot::Canceled),
- #[error("Obtaining erasure chunks failed")]
- ObtainErasureChunks(#[from] erasure_coding::Error),
- #[error(transparent)]
- ValidationFailed(#[from] ValidationFailed),
- #[error(transparent)]
- BackgroundValidationMpsc(#[from] mpsc::SendError),
- #[error(transparent)]
- UtilError(#[from] util::Error),
-}
-
/// PoV data to validate.
enum PoVData {
/// Already available (from candidate selection).
@@ -143,8 +118,313 @@ impl ValidatedCandidateCommand {
}
}
+/// The candidate backing subsystem.
+pub struct CandidateBackingSubsystem {
+ keystore: SyncCryptoStorePtr,
+ metrics: Metrics,
+}
+
+impl CandidateBackingSubsystem {
+ /// Create a new instance of the `CandidateBackingSubsystem`.
+ pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
+ Self { keystore, metrics }
+ }
+}
+
+impl<Context> overseer::Subsystem<Context, SubsystemError> for CandidateBackingSubsystem
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+ fn start(self, ctx: Context) -> SpawnedSubsystem {
+ let future = async move {
+ run(ctx, self.keystore, self.metrics)
+ .await
+ .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
+ }
+ .boxed();
+
+ SpawnedSubsystem { name: "candidate-backing-subsystem", future }
+ }
+}
+
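+/// Run the main loop of the subsystem: dispatch overseer messages and background validation
+/// results to the per-relay-parent jobs until a fatal error occurs or we are told to conclude.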
+async fn run<Context>(
+ mut ctx: Context,
+ keystore: SyncCryptoStorePtr,
+ metrics: Metrics,
+) -> FatalResult<()>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
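+ // Channel used by background validation tasks to report their results,
+ // tagged with the relay parent they belong to.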
+ let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
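+ // One backing job per active leaf, created and removed on `ActiveLeaves` updates.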
+ let mut jobs = HashMap::new();
+
+ loop {
+ let res = run_iteration(
+ &mut ctx,
+ keystore.clone(),
+ &metrics,
+ &mut jobs,
+ background_validation_tx.clone(),
+ &mut background_validation_rx,
+ )
+ .await;
+
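+ // `Ok` means we were asked to conclude; non-fatal errors are logged and the
+ // loop restarts, while fatal errors are propagated via `?`.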
+ match res {
+ Ok(()) => break,
+ Err(e) => crate::error::log_error(Err(e))?,
+ }
+ }
+
+ Ok(())
+}
+
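+/// Process background validation results and overseer messages until `Conclude` is received
+/// or an error occurs.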
+async fn run_iteration<Context>(
+ ctx: &mut Context,
+ keystore: SyncCryptoStorePtr,
+ metrics: &Metrics,
+ jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+ background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
+ background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
+) -> Result<(), Error>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+ loop {
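+ // Wait for whichever arrives first: a background validation result or a message
+ // from the overseer.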
+ futures::select!(
+ validated_command = background_validation_rx.next().fuse() => {
+ if let Some((relay_parent, command)) = validated_command {
+ handle_validated_candidate_command(
+ &mut *ctx,
+ jobs,
+ relay_parent,
+ command,
+ ).await?;
+ } else {
+ panic!("background_validation_tx always alive at this point; qed");
+ }
+ }
+ from_overseer = ctx.recv().fuse() => {
+ match from_overseer? {
+ FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => handle_active_leaves_update(
+ &mut *ctx,
+ update,
+ jobs,
+ &keystore,
+ &background_validation_tx,
+ &metrics,
+ ).await?,
+ FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
+ FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
+ FromOverseer::Communication { msg } => handle_communication(&mut *ctx, jobs, msg).await?,
+ }
+ }
+ )
+ }
+}
+
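+/// Forward a background validation result to the job of its relay parent, if that leaf is
+/// still active.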
+async fn handle_validated_candidate_command<Context>(
+ ctx: &mut Context,
+ jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+ relay_parent: Hash,
+ command: ValidatedCandidateCommand,
+) -> Result<(), Error>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+ if let Some(job) = jobs.get_mut(&relay_parent) {
+ job.job.handle_validated_candidate_command(&job.span, ctx, command).await?;
+ } else {
+ // simple race condition; can be ignored - this relay-parent
+ // is no longer relevant.
+ }
+
+ Ok(())
+}
+
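+/// Dispatch a `CandidateBackingMessage` to the job of the relay parent it concerns;
+/// messages for unknown relay parents are ignored.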
+async fn handle_communication<Context>(
+ ctx: &mut Context,
+ jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+ message: CandidateBackingMessage,
+) -> Result<(), Error>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+ match message {
+ CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
+ if let Some(job) = jobs.get_mut(&relay_parent) {
+ job.job.handle_second_msg(&job.span, ctx, candidate, pov).await?;
+ }
+ },
+ CandidateBackingMessage::Statement(relay_parent, statement) => {
+ if let Some(job) = jobs.get_mut(&relay_parent) {
+ job.job.handle_statement_message(&job.span, ctx, statement).await?;
+ }
+ },
+ CandidateBackingMessage::GetBackedCandidates(relay_parent, requested_candidates, tx) =>
+ if let Some(job) = jobs.get_mut(&relay_parent) {
+ job.job.handle_get_backed_candidates_message(requested_candidates, tx)?;
+ },
+ }
+
+ Ok(())
+}
+
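+/// Remove jobs for deactivated leaves and create a new backing job for the activated leaf,
+/// if any.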
+async fn handle_active_leaves_update<Context>(
+ ctx: &mut Context,
+ update: ActiveLeavesUpdate,
+ jobs: &mut HashMap<Hash, JobAndSpan<Context>>,
+ keystore: &SyncCryptoStorePtr,
+ background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
+ metrics: &Metrics,
+) -> Result<(), Error>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
+ for deactivated in update.deactivated {
+ jobs.remove(&deactivated);
+ }
+
+ let leaf = match update.activated {
+ None => return Ok(()),
+ Some(a) => a,
+ };
+
+ macro_rules! try_runtime_api {
+ ($x: expr) => {
+ match $x {
+ Ok(x) => x,
+ Err(e) => {
+ gum::warn!(
+ target: LOG_TARGET,
+ err = ?e,
+ "Failed to fetch runtime API data for job",
+ );
+
+ // We can't do candidate validation work if we don't have the
+ // requisite runtime API data. But these errors should not take
+ // down the node.
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ let parent = leaf.hash;
+ let span = PerLeafSpan::new(leaf.span, "backing");
+ let _span = span.child("runtime-apis");
+
+ let (validators, groups, session_index, cores) = futures::try_join!(
+ request_validators(parent, ctx.sender()).await,
+ request_validator_groups(parent, ctx.sender()).await,
+ request_session_index_for_child(parent, ctx.sender()).await,
+ request_from_runtime(parent, ctx.sender(), |tx| {
+ RuntimeApiRequest::AvailabilityCores(tx)
+ },)
+ .await,
+ )
+ .map_err(Error::JoinMultiple)?;
+
+ let validators: Vec<_> = try_runtime_api!(validators);
+ let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
+ let session_index = try_runtime_api!(session_index);
+ let cores = try_runtime_api!(cores);
+
+ drop(_span);
+ let _span = span.child("validator-construction");
+
+ let signing_context = SigningContext { parent_hash: parent, session_index };
+ let validator =
+ match Validator::construct(&validators, signing_context.clone(), keystore.clone()).await {
+ Ok(v) => Some(v),
+ Err(util::Error::NotAValidator) => None,
+ Err(e) => {
+ gum::warn!(
+ target: LOG_TARGET,
+ err = ?e,
+ "Cannot participate in candidate backing",
+ );
+
+ return Ok(())
+ },
+ };
+
+ drop(_span);
+ let mut assignments_span = span.child("compute-assignments");
+
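+ // Determine the validator group for each scheduled para and whether our validator is
+ // assigned to one of them.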
+ let mut groups = HashMap::new();
+
+ let n_cores = cores.len();
+
+ let mut assignment = None;
+
+ for (idx, core) in cores.into_iter().enumerate() {
+ // Ignore prospective assignments on occupied cores for the time being.
+ if let CoreState::Scheduled(scheduled) = core {
+ let core_index = CoreIndex(idx as _);
+ let group_index = group_rotation_info.group_for_core(core_index, n_cores);
+ if let Some(g) = validator_groups.get(group_index.0 as usize) {
+ if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
+ assignment = Some((scheduled.para_id, scheduled.collator));
+ }
+ groups.insert(scheduled.para_id, g.clone());
+ }
+ }
+ }
+
+ let table_context = TableContext { groups, validators, validator };
+
+ let (assignment, required_collator) = match assignment {
+ None => {
+ assignments_span.add_string_tag("assigned", "false");
+ (None, None)
+ },
+ Some((assignment, required_collator)) => {
+ assignments_span.add_string_tag("assigned", "true");
+ assignments_span.add_para_id(assignment);
+ (Some(assignment), required_collator)
+ },
+ };
+
+ drop(assignments_span);
+ let _span = span.child("wait-for-job");
+
+ let job = CandidateBackingJob {
+ parent,
+ session_index,
+ assignment,
+ required_collator,
+ issued_statements: HashSet::new(),
+ awaiting_validation: HashSet::new(),
+ fallbacks: HashMap::new(),
+ seconded: None,
+ unbacked_candidates: HashMap::new(),
+ backed: HashSet::new(),
+ keystore: keystore.clone(),
+ table: Table::default(),
+ table_context,
+ background_validation_tx: background_validation_tx.clone(),
+ metrics: metrics.clone(),
+ _marker: std::marker::PhantomData,
+ };
+
+ jobs.insert(parent, JobAndSpan { job, span });
+
+ Ok(())
+}
+
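+/// A backing job for a single relay parent, together with the leaf's tracing span.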
+struct JobAndSpan<Context> {
+ job: CandidateBackingJob<Context>,
+ span: PerLeafSpan,
+}
+
/// Holds all data needed for candidate backing job operation.
-pub struct CandidateBackingJob {
+struct CandidateBackingJob<Context> {
/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
/// The session index this corresponds to.
@@ -169,9 +449,9 @@ pub struct CandidateBackingJob {
keystore: SyncCryptoStorePtr,
table: Table<TableContext>,
table_context: TableContext,
- background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
- background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
+ background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
metrics: Metrics,
+ _marker: std::marker::PhantomData<Context>,
}
/// In case a backing validator does not provide a PoV, we need to retry with other backing
@@ -298,19 +578,22 @@ fn table_attested_to_backed(
}
async fn store_available_data(
- sender: &mut JobSender<impl SubsystemSender>,
+ sender: &mut impl SubsystemSender,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
sender
- .send_message(AvailabilityStoreMessage::StoreAvailableData {
- candidate_hash,
- n_validators,
- available_data,
- tx,
- })
+ .send_message(
+ AvailabilityStoreMessage::StoreAvailableData {
+ candidate_hash,
+ n_validators,
+ available_data,
+ tx,
+ }
+ .into(),
+ )
.await;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
@@ -323,7 +606,7 @@ async fn store_available_data(
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
async fn make_pov_available(
- sender: &mut JobSender<impl SubsystemSender>,
+ sender: &mut impl SubsystemSender,
n_validators: usize,
pov: Arc<PoV>,
candidate_hash: CandidateHash,
@@ -356,7 +639,7 @@ async fn make_pov_available(
}
async fn request_pov(
- sender: &mut JobSender<impl SubsystemSender>,
+ sender: &mut impl SubsystemSender,
relay_parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
@@ -364,13 +647,16 @@ async fn request_pov(
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
sender
- .send_message(AvailabilityDistributionMessage::FetchPoV {
- relay_parent,
- from_validator,
- candidate_hash,
- pov_hash,
- tx,
- })
+ .send_message(
+ AvailabilityDistributionMessage::FetchPoV {
+ relay_parent,
+ from_validator,
+ candidate_hash,
+ pov_hash,
+ tx,
+ }
+ .into(),
+ )
.await;
let pov = rx.await.map_err(|_| Error::FetchPoV)?;
@@ -378,19 +664,22 @@ async fn request_pov(
}
async fn request_candidate_validation(
- sender: &mut JobSender<impl SubsystemSender>,
+ sender: &mut impl SubsystemSender,
candidate_receipt: CandidateReceipt,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
sender
- .send_message(CandidateValidationMessage::ValidateFromChainState(
- candidate_receipt,
- pov,
- BACKING_EXECUTION_TIMEOUT,
- tx,
- ))
+ .send_message(
+ CandidateValidationMessage::ValidateFromChainState(
+ candidate_receipt,
+ pov,
+ BACKING_EXECUTION_TIMEOUT,
+ tx,
+ )
+ .into(),
+ )
.await;
match rx.await {
@@ -404,8 +693,8 @@ type BackgroundValidationResult =
Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
struct BackgroundValidationParams<S: overseer::SubsystemSender<AllMessages>, F> {
- sender: JobSender<S>,
- tx_command: mpsc::Sender<ValidatedCandidateCommand>,
+ sender: S,
+ tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
candidate: CandidateReceipt,
relay_parent: Hash,
pov: PoVData,
@@ -440,7 +729,10 @@ async fn validate_and_make_available(
{
Err(Error::FetchPoV) => {
tx_command
- .send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash()))
+ .send((
+ relay_parent,
+ ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
+ ))
.await
.map_err(Error::BackgroundValidationMpsc)?;
return Ok(())
@@ -512,48 +804,20 @@ async fn validate_and_make_available(
},
};
- tx_command.send(make_command(res)).await.map_err(Into::into)
+ tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
}
struct ValidatorIndexOutOfBounds;
-impl CandidateBackingJob {
- /// Run asynchronously.
- async fn run_loop(
- mut self,
- mut sender: JobSender<impl SubsystemSender>,
- mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
- span: PerLeafSpan,
- ) -> Result<(), Error> {
- loop {
- futures::select! {
- validated_command = self.background_validation.next() => {
- let _span = span.child("process-validation-result");
- if let Some(c) = validated_command {
- self.handle_validated_candidate_command(&span, &mut sender, c).await?;
- } else {
- panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
- }
- }
- to_job = rx_to.next() => match to_job {
- None => break,
- Some(msg) => {
- // we intentionally want spans created in `process_msg` to descend from the
- // `span ` which is longer-lived than this ephemeral timing span.
- let _timing_span = span.child("process-message");
- self.process_msg(&span, &mut sender, msg).await?;
- }
- }
- }
- }
-
- Ok(())
- }
-
+impl<Context> CandidateBackingJob<Context>
+where
+ Context: SubsystemContext<Message = CandidateBackingMessage>,
+ Context: overseer::SubsystemContext<Message = CandidateBackingMessage>,
+{
async fn handle_validated_candidate_command(
&mut self,
root_span: &jaeger::Span,
- sender: &mut JobSender<impl SubsystemSender>,
+ ctx: &mut Context,
command: ValidatedCandidateCommand,
) -> Result<(), Error> {
let candidate_hash = command.candidate_hash();
@@ -576,21 +840,19 @@ impl CandidateBackingJob {
commitments,
});
if let Some(stmt) = self
- .sign_import_and_distribute_statement(sender, statement, root_span)
+ .sign_import_and_distribute_statement(ctx, statement, root_span)
.await?
{
- sender
- .send_message(CollatorProtocolMessage::Seconded(
- self.parent,
- stmt,
- ))
- .await;
+ ctx.send_message(CollatorProtocolMessage::Seconded(
+ self.parent,
+ stmt,
+ ))
+ .await;
}
}
},
Err(candidate) => {
- sender
- .send_message(CollatorProtocolMessage::Invalid(self.parent, candidate))
+ ctx.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate))
.await;
},
}
@@ -602,7 +864,7 @@ impl CandidateBackingJob {
if !self.issued_statements.contains(&candidate_hash) {
if res.is_ok() {
let statement = Statement::Valid(candidate_hash);
- self.sign_import_and_distribute_statement(sender, statement, &root_span)
+ self.sign_import_and_distribute_statement(ctx, statement, &root_span)
.await?;
}
self.issued_statements.insert(candidate_hash);
@@ -615,7 +877,7 @@ impl CandidateBackingJob {
// Ok, another try:
let c_span = span.as_ref().map(|s| s.child("try"));
let attesting = attesting.clone();
- self.kick_off_validation_work(sender, attesting, c_span).await?
+ self.kick_off_validation_work(ctx, attesting, c_span).await?
}
} else {
gum::warn!(
@@ -632,7 +894,7 @@ impl CandidateBackingJob {
async fn background_validate_and_make_available(
&mut self,
- sender: &mut JobSender<impl SubsystemSender>,
+ ctx: &mut Context,
params: BackgroundValidationParams<
impl SubsystemSender,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
@@ -658,9 +920,9 @@ impl CandidateBackingJob {
}
}
};
- sender
- .send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
- .await?;
+
+ ctx.spawn("backing-validation", bg.boxed())
+ .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
}
Ok(())
@@ -671,7 +933,7 @@ impl CandidateBackingJob {
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
- sender: &mut JobSender<impl SubsystemSender>,
+ ctx: &mut Context,
candidate: &CandidateReceipt,
pov: Arc<PoV>,
) -> Result<(), Error> {
@@ -681,8 +943,7 @@ impl CandidateBackingJob {
.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
- sender
- .send_message(CollatorProtocolMessage::Invalid(self.parent, candidate.clone()))
+ ctx.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate.clone()))
.await;
return Ok(())
}
@@ -703,9 +964,9 @@ impl CandidateBackingJob {
"Validate and second candidate",
);
- let bg_sender = sender.clone();
+ let bg_sender = ctx.sender().clone();
self.background_validate_and_make_available(
- sender,
+ ctx,
BackgroundValidationParams {
sender: bg_sender,
tx_command: self.background_validation_tx.clone(),
@@ -724,14 +985,14 @@ impl CandidateBackingJob {
async fn sign_import_and_distribute_statement(
&mut self,
- sender: &mut JobSender<impl SubsystemSender>,
+ ctx: &mut Context,
statement: Statement,
root_span: &jaeger::Span,
) -> Result