diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index f504904100765..8ed9c1ee50378 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -220,7 +220,7 @@ pub fn new_full(mut config: Configuration) -> Result let slot_duration = sc_consensus_aura::slot_duration(&*client)?; let raw_slot_duration = slot_duration.slot_duration(); - let aura = sc_consensus_aura::start_aura::( + let aura = sc_consensus_aura::start_aura::( StartAuraParams { slot_duration, client: client.clone(), @@ -243,6 +243,7 @@ pub fn new_full(mut config: Configuration) -> Result keystore: keystore_container.sync_keystore(), can_author_with, sync_oracle: network.clone(), + justification_sync_link: network.clone(), block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), telemetry: telemetry.as_ref().map(|x| x.handle()), }, diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 42020e6668e42..a9ac2ac8065f9 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -308,6 +308,7 @@ pub fn new_full_base( env: proposer, block_import, sync_oracle: network.clone(), + justification_sync_link: network.clone(), create_inherent_data_providers: move |parent, ()| { let client_clone = client_clone.clone(); async move { diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 623096cd5c640..702e4dc0bf1bd 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -109,7 +109,7 @@ fn slot_author(slot: Slot, authorities: &[AuthorityId

]) -> Option<&A } /// Parameters of [`start_aura`]. -pub struct StartAuraParams { +pub struct StartAuraParams { /// The duration of a slot. pub slot_duration: SlotDuration, /// The client to interact with the chain. @@ -122,8 +122,10 @@ pub struct StartAuraParams { pub proposer_factory: PF, /// The sync oracle that can give us the current sync status. pub sync_oracle: SO, + /// Hook into the sync module to control the justification sync process. + pub justification_sync_link: L, /// Something that can create the inherent data providers. - pub create_inherent_data_providers: IDP, + pub create_inherent_data_providers: CIDP, /// Should we force the authoring of blocks? pub force_authoring: bool, /// The backoff strategy when we miss slots. @@ -143,7 +145,7 @@ pub struct StartAuraParams { } /// Start the aura worker. The returned future should be run in a futures executor. -pub fn start_aura( +pub fn start_aura( StartAuraParams { slot_duration, client, @@ -151,6 +153,7 @@ pub fn start_aura( block_import, proposer_factory, sync_oracle, + justification_sync_link, create_inherent_data_providers, force_authoring, backoff_authoring_blocks, @@ -158,31 +161,33 @@ pub fn start_aura( can_author_with, block_proposal_slot_portion, telemetry, - }: StartAuraParams, + }: StartAuraParams, ) -> Result, sp_consensus::Error> where + P: Pair + Send + Sync, + P::Public: AppPublic + Hash + Member + Encode + Decode, + P::Signature: TryFrom> + Hash + Member + Encode + Decode, B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + HeaderBackend + Send + Sync, C::Api: AuraApi>, SC: SelectChain, + I: BlockImport> + Send + Sync + 'static, PF: Environment + Send + Sync + 'static, PF::Proposer: Proposer>, - P: Pair + Send + Sync, - P::Public: AppPublic + Hash + Member + Encode + Decode, - P::Signature: TryFrom> + Hash + Member + Encode + Decode, - I: BlockImport> + Send + Sync + 'static, - Error: std::error::Error + Send + From + 'static, SO: SyncOracle + Send + Sync + Clone, - CAW: CanAuthorWith + Send, + L: sp_consensus::JustificationSyncLink, + CIDP: CreateInherentDataProviders + Send, + CIDP::InherentDataProviders: InherentDataProviderExt + Send, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, - IDP: CreateInherentDataProviders + Send, - IDP::InherentDataProviders: InherentDataProviderExt + Send, + CAW: CanAuthorWith + Send, + Error: std::error::Error + Send + From + 'static, { - let worker = build_aura_worker::(BuildAuraWorkerParams { + let worker = build_aura_worker::(BuildAuraWorkerParams { client: client.clone(), block_import, proposer_factory, keystore, sync_oracle: sync_oracle.clone(), + justification_sync_link, force_authoring, backoff_authoring_blocks, telemetry, @@ -200,7 +205,7 @@ pub fn start_aura( } /// Parameters of [`build_aura_worker`]. -pub struct BuildAuraWorkerParams { +pub struct BuildAuraWorkerParams { /// The client to interact with the chain. pub client: Arc, /// The block import. @@ -209,6 +214,8 @@ pub struct BuildAuraWorkerParams { pub proposer_factory: PF, /// The sync oracle that can give us the current sync status. pub sync_oracle: SO, + /// Hook into the sync module to control the justification sync process. + pub justification_sync_link: L, /// Should we force the authoring of blocks? pub force_authoring: bool, /// The backoff strategy when we miss slots. @@ -228,18 +235,19 @@ pub struct BuildAuraWorkerParams { /// Build the aura worker. /// /// The caller is responsible for running this worker, otherwise it will do nothing. -pub fn build_aura_worker( +pub fn build_aura_worker( BuildAuraWorkerParams { client, block_import, proposer_factory, sync_oracle, + justification_sync_link, backoff_authoring_blocks, keystore, block_proposal_slot_portion, telemetry, force_authoring, - }: BuildAuraWorkerParams, + }: BuildAuraWorkerParams, ) -> impl sc_consensus_slots::SlotWorker>::Proof> where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + HeaderBackend + Send + Sync, @@ -252,6 +260,7 @@ pub fn build_aura_worker( I: BlockImport> + Send + Sync + 'static, Error: std::error::Error + Send + From + 'static, SO: SyncOracle + Send + Sync + Clone, + L: sp_consensus::JustificationSyncLink, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, { AuraWorker { @@ -260,6 +269,7 @@ pub fn build_aura_worker( env: proposer_factory, keystore, sync_oracle, + justification_sync_link, force_authoring, backoff_authoring_blocks, telemetry, @@ -268,12 +278,13 @@ pub fn build_aura_worker( } } -struct AuraWorker { +struct AuraWorker { client: Arc, block_import: I, env: E, keystore: SyncCryptoStorePtr, sync_oracle: SO, + justification_sync_link: L, force_authoring: bool, backoff_authoring_blocks: Option, block_proposal_slot_portion: SlotProportion, @@ -281,8 +292,8 @@ struct AuraWorker { _key_type: PhantomData

, } -impl sc_consensus_slots::SimpleSlotWorker - for AuraWorker +impl sc_consensus_slots::SimpleSlotWorker + for AuraWorker where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + HeaderBackend + Sync, @@ -294,11 +305,13 @@ where P::Public: AppPublic + Public + Member + Encode + Decode + Hash, P::Signature: TryFrom> + Member + Encode + Decode + Hash + Debug, SO: SyncOracle + Send + Clone, + L: sp_consensus::JustificationSyncLink, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, Error: std::error::Error + Send + From + 'static, { type BlockImport = I; type SyncOracle = SO; + type JustificationSyncLink = L; type CreateProposer = Pin> + Send + 'static >>; @@ -425,6 +438,10 @@ where &mut self.sync_oracle } + fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink { + &mut self.justification_sync_link + } + fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer { Box::pin(self.env.init(block).map_err(|e| { sp_consensus::Error::ClientImport(format!("{:?}", e)).into() @@ -725,13 +742,14 @@ mod tests { let slot_duration = slot_duration(&*client).expect("slot duration available"); - aura_futures.push(start_aura::(StartAuraParams { + aura_futures.push(start_aura::(StartAuraParams { slot_duration, block_import: client.clone(), select_chain, client, proposer_factory: environ, sync_oracle: DummyOracle, + justification_sync_link: (), create_inherent_data_providers: |_, _| async { let timestamp = TimestampInherentDataProvider::from_system_time(); let slot = InherentDataProvider::from_timestamp_and_duration( @@ -804,6 +822,7 @@ mod tests { env: environ, keystore: keystore.into(), sync_oracle: DummyOracle.clone(), + justification_sync_link: (), force_authoring: false, backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()), telemetry: None, @@ -853,6 +872,7 @@ mod tests { env: environ, keystore: keystore.into(), sync_oracle: DummyOracle.clone(), + justification_sync_link: (), force_authoring: false, backoff_authoring_blocks: Option::<()>::None, telemetry: None, @@ -871,7 +891,7 @@ mod tests { duration: Duration::from_millis(1000), chain_head: head, block_size_limit: None, - }, + } )).unwrap(); // The returned block should be imported and we should be able to get its header by now. diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 0b02bbbe14106..409999ef1fdca 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -363,7 +363,7 @@ impl std::ops::Deref for Config { } /// Parameters for BABE. -pub struct BabeParams { +pub struct BabeParams { /// The keystore that manages the keys of the node. pub keystore: SyncCryptoStorePtr, @@ -384,8 +384,11 @@ pub struct BabeParams { /// A sync oracle pub sync_oracle: SO, + /// Hook into the sync module to control the justification sync process. + pub justification_sync_link: L, + /// Something that can create the inherent data providers. - pub create_inherent_data_providers: IDP, + pub create_inherent_data_providers: CIDP, /// Force authoring of blocks even if we are offline pub force_authoring: bool, @@ -411,13 +414,14 @@ pub struct BabeParams { } /// Start the babe worker. -pub fn start_babe(BabeParams { +pub fn start_babe(BabeParams { keystore, client, select_chain, env, block_import, sync_oracle, + justification_sync_link, create_inherent_data_providers, force_authoring, backoff_authoring_blocks, @@ -425,26 +429,35 @@ pub fn start_babe(BabeParams { can_author_with, block_proposal_slot_portion, telemetry, -}: BabeParams) -> Result< +}: BabeParams) -> Result< BabeWorker, sp_consensus::Error, > where B: BlockT, - C: ProvideRuntimeApi + ProvideCache + ProvideUncles + BlockchainEvents - + HeaderBackend + HeaderMetadata - + Send + Sync + 'static, + C: ProvideRuntimeApi + + ProvideCache + + ProvideUncles + + BlockchainEvents + + HeaderBackend + + HeaderMetadata + + Send + + Sync + + 'static, C::Api: BabeApi, SC: SelectChain + 'static, E: Environment + Send + Sync + 'static, E::Proposer: Proposer>, - I: BlockImport> + Send - + Sync + 'static, - Error: std::error::Error + Send + From + From + 'static, + I: BlockImport> + + Send + + Sync + + 'static, SO: SyncOracle + Send + Sync + Clone + 'static, - CAW: CanAuthorWith + Send + Sync + 'static, + L: sp_consensus::JustificationSyncLink + 'static, + CIDP: CreateInherentDataProviders + Send + Sync + 'static, + CIDP::InherentDataProviders: InherentDataProviderExt + Send, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, - IDP: CreateInherentDataProviders + Send + Sync + 'static, - IDP::InherentDataProviders: InherentDataProviderExt + Send, + CAW: CanAuthorWith + Send + Sync + 'static, + Error: std::error::Error + Send + From + From + 'static, { const HANDLE_BUFFER_SIZE: usize = 1024; @@ -456,6 +469,7 @@ pub fn start_babe(BabeParams { block_import, env, sync_oracle: sync_oracle.clone(), + justification_sync_link, force_authoring, backoff_authoring_blocks, keystore, @@ -600,11 +614,12 @@ type SlotNotificationSinks = Arc< Mutex::Hash, NumberFor, Epoch>)>>> >; -struct BabeSlotWorker { +struct BabeSlotWorker { client: Arc, block_import: I, env: E, sync_oracle: SO, + justification_sync_link: L, force_authoring: bool, backoff_authoring_blocks: Option, keystore: SyncCryptoStorePtr, @@ -615,8 +630,8 @@ struct BabeSlotWorker { telemetry: Option, } -impl sc_consensus_slots::SimpleSlotWorker - for BabeSlotWorker +impl sc_consensus_slots::SimpleSlotWorker + for BabeSlotWorker where B: BlockT, C: ProvideRuntimeApi + @@ -628,12 +643,14 @@ where E::Proposer: Proposer>, I: BlockImport> + Send + Sync + 'static, SO: SyncOracle + Send + Clone, + L: sp_consensus::JustificationSyncLink, BS: BackoffAuthoringBlocksStrategy>, Error: std::error::Error + Send + From + From + 'static, { type EpochData = ViableEpochDescriptor, Epoch>; type Claim = (PreDigest, AuthorityId); type SyncOracle = SO; + type JustificationSyncLink = L; type CreateProposer = Pin> + Send + 'static >>; @@ -798,6 +815,10 @@ where &mut self.sync_oracle } + fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink { + &mut self.justification_sync_link + } + fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer { Box::pin(self.env.init(block).map_err(|e| { sp_consensus::Error::ClientImport(format!("{:?}", e)) diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index d042f25399ee4..467de9683c689 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -390,9 +390,7 @@ fn rejects_empty_block() { }) } -fn run_one_test( - mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static, -) { +fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static) { sp_tracing::try_init_simple(); let mutator = Arc::new(mutator) as Mutator; @@ -473,6 +471,7 @@ fn run_one_test( babe_link: data.link.clone(), keystore, can_author_with: sp_consensus::AlwaysCanAuthor, + justification_sync_link: (), block_proposal_slot_portion: SlotProportion::new(0.5), telemetry: None, }).expect("Starts babe")); diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 17cdae48cdb67..6688c14b6375d 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -527,20 +527,21 @@ pub fn import_queue( /// /// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted /// for blocks being built. This can encode authorship information, or just be a graffiti. -pub fn start_mining_worker( +pub fn start_mining_worker( block_import: BoxBlockImport>, client: Arc, select_chain: S, algorithm: Algorithm, mut env: E, mut sync_oracle: SO, + justification_sync_link: L, pre_runtime: Option>, create_inherent_data_providers: CIDP, timeout: Duration, build_time: Duration, can_author_with: CAW, ) -> ( - Arc>::Proof>>>, + Arc>::Proof>>>, impl Future, ) where Block: BlockT, @@ -552,14 +553,16 @@ pub fn start_mining_worker( E::Error: std::fmt::Debug, E::Proposer: Proposer>, SO: SyncOracle + Clone + Send + Sync + 'static, - CAW: CanAuthorWith + Clone + Send + 'static, + L: sp_consensus::JustificationSyncLink, CIDP: CreateInherentDataProviders, + CAW: CanAuthorWith + Clone + Send + 'static, { let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); - let worker = Arc::new(Mutex::new(MiningWorker:: { + let worker = Arc::new(Mutex::new(MiningWorker { build: None, algorithm: algorithm.clone(), block_import, + justification_sync_link, })); let worker_ret = worker.clone(); diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs index 18844e51ce418..e5d76592b7fd1 100644 --- a/client/consensus/pow/src/worker.rs +++ b/client/consensus/pow/src/worker.rs @@ -18,8 +18,12 @@ use std::{pin::Pin, time::Duration, collections::HashMap, borrow::Cow}; use sc_client_api::ImportNotifications; -use sp_runtime::{DigestItem, traits::Block as BlockT, generic::BlockId}; use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport}; +use sp_runtime::{ + generic::BlockId, + traits::{Block as BlockT, Header as HeaderT}, + DigestItem, +}; use futures::{prelude::*, task::{Context, Poll}}; use futures_timer::Delay; use log::*; @@ -57,18 +61,22 @@ pub struct MiningWorker< Block: BlockT, Algorithm: PowAlgorithm, C: sp_api::ProvideRuntimeApi, - Proof + L: sp_consensus::JustificationSyncLink, + Proof, > { pub(crate) build: Option>, pub(crate) algorithm: Algorithm, pub(crate) block_import: BoxBlockImport>, + pub(crate) justification_sync_link: L, } -impl MiningWorker where +impl MiningWorker +where Block: BlockT, C: sp_api::ProvideRuntimeApi, Algorithm: PowAlgorithm, Algorithm::Difficulty: 'static + Send, + L: sp_consensus::JustificationSyncLink, sp_api::TransactionFor: Send + 'static, { /// Get the current best hash. `None` if the worker has just started or the client is doing @@ -139,8 +147,11 @@ impl MiningWorker where Box::new(intermediate) as Box<_>, ); + let header = import_block.post_header(); match self.block_import.import_block(import_block, HashMap::default()).await { - Ok(_) => { + Ok(res) => { + res.handle_justification(&header.hash(), *header.number(), &mut self.justification_sync_link); + info!( target: "pow", "✅ Successfully mined block on top of: {}", diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index cc879f769e47f..188aa52881a78 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -39,7 +39,9 @@ use futures_timer::Delay; use log::{debug, error, info, warn}; use sp_api::{ProvideRuntimeApi, ApiRef}; use sp_arithmetic::traits::BaseArithmetic; -use sp_consensus::{BlockImport, Proposer, SyncOracle, SelectChain, CanAuthorWith, SlotData}; +use sp_consensus::{ + BlockImport, CanAuthorWith, JustificationSyncLink, Proposer, SelectChain, SlotData, SyncOracle, +}; use sp_consensus_slots::Slot; use sp_inherents::CreateInherentDataProviders; use sp_runtime::{ @@ -92,6 +94,10 @@ pub trait SimpleSlotWorker { /// A handle to a `SyncOracle`. type SyncOracle: SyncOracle; + /// A handle to a `JustificationSyncLink`, allows hooking into the sync module to control the + /// justification sync process. + type JustificationSyncLink: JustificationSyncLink; + /// The type of future resolving to the proposer. type CreateProposer: Future> + Send + Unpin + 'static; @@ -178,6 +184,9 @@ pub trait SimpleSlotWorker { /// Returns a handle to a `SyncOracle`. fn sync_oracle(&mut self) -> &mut Self::SyncOracle; + /// Returns a handle to a `JustificationSyncLink`. + fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink; + /// Returns a `Proposer` to author on top of the given block. fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer; @@ -392,27 +401,37 @@ pub trait SimpleSlotWorker { ); let header = block_import_params.post_header(); - if let Err(err) = block_import + match block_import .import_block(block_import_params, Default::default()) .await { - warn!( - target: logging_target, - "Error with block built on {:?}: {:?}", - parent_hash, - err, - ); + Ok(res) => { + res.handle_justification( + &header.hash(), + *header.number(), + self.justification_sync_link(), + ); + } + Err(err) => { + warn!( + target: logging_target, + "Error with block built on {:?}: {:?}", parent_hash, err, + ); - telemetry!( - telemetry; - CONSENSUS_WARN; - "slots.err_with_block_built_on"; - "hash" => ?parent_hash, - "err" => ?err, - ); + telemetry!( + telemetry; + CONSENSUS_WARN; + "slots.err_with_block_built_on"; + "hash" => ?parent_hash, + "err" => ?err, + ); + } } - Some(SlotResult { block: B::new(header, body), storage_proof }) + Some(SlotResult { + block: B::new(header, body), + storage_proof, + }) } } @@ -481,7 +500,7 @@ where /// /// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is /// polled until completion, unless we are major syncing. -pub async fn start_slot_worker( +pub async fn start_slot_worker( slot_duration: SlotDuration, client: C, mut worker: W, @@ -495,9 +514,9 @@ where W: SlotWorker, SO: SyncOracle + Send, T: SlotData + Clone, - CAW: CanAuthorWith + Send, CIDP: CreateInherentDataProviders + Send, CIDP::InherentDataProviders: InherentDataProviderExt + Send, + CAW: CanAuthorWith + Send, { let SlotDuration(slot_duration) = slot_duration; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 6431250c96f3a..a3a490e097780 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -966,6 +966,11 @@ impl Protocol { self.sync.request_justification(&hash, number) } + /// Clear all pending justification requests. + pub fn clear_justification_requests(&mut self) { + self.sync.clear_justification_requests(); + } + /// Request syncing for the given block from given set of peers. /// Uses `protocol` to queue a new block download request and tries to dispatch all pending /// requests. diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index f1b744c89a995..7b7ac721b5b47 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -632,6 +632,11 @@ impl ChainSync { }) } + /// Clear all pending justification requests. + pub fn clear_justification_requests(&mut self) { + self.extra_justifications.reset(); + } + /// Request syncing for the given block from given set of peers. // The implementation is similar to on_block_announce with unknown parent hash. pub fn set_sync_fork_request( @@ -1117,7 +1122,7 @@ impl ChainSync { number, hash ); - self.extra_justifications.reset() + self.clear_justification_requests(); } if aux.needs_justification { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6351f03a393ed..666108363f640 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -976,6 +976,13 @@ impl NetworkService { .unbounded_send(ServiceToWorkerMsg::RequestJustification(*hash, number)); } + /// Clear all pending justification requests. + pub fn clear_justification_requests(&self) { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::ClearJustificationRequests); + } + /// Are we in the process of downloading the chain? pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) @@ -1219,6 +1226,16 @@ impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle } } +impl sp_consensus::JustificationSyncLink for NetworkService { + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + NetworkService::request_justification(self, hash, number); + } + + fn clear_justification_requests(&self) { + NetworkService::clear_justification_requests(self); + } +} + impl NetworkStateInfo for NetworkService where B: sp_runtime::traits::Block, @@ -1323,6 +1340,7 @@ enum ServiceToWorkerMsg { PropagateTransaction(H), PropagateTransactions, RequestJustification(B::Hash, NumberFor), + ClearJustificationRequests, AnnounceBlock(B::Hash, Option>), GetValue(record::Key), PutValue(record::Key, Vec), @@ -1444,6 +1462,8 @@ impl Future for NetworkWorker { this.network_service.behaviour_mut().user_protocol_mut().announce_block(hash, data), ServiceToWorkerMsg::RequestJustification(hash, number) => this.network_service.behaviour_mut().user_protocol_mut().request_justification(&hash, number), + ServiceToWorkerMsg::ClearJustificationRequests => + this.network_service.behaviour_mut().user_protocol_mut().clear_justification_requests(), ServiceToWorkerMsg::PropagateTransaction(hash) => this.tx_handler_controller.propagate_transaction(hash), ServiceToWorkerMsg::PropagateTransactions => diff --git a/primitives/consensus/common/src/block_import.rs b/primitives/consensus/common/src/block_import.rs index 6e4fb98865015..31c3eb74457c3 100644 --- a/primitives/consensus/common/src/block_import.rs +++ b/primitives/consensus/common/src/block_import.rs @@ -68,6 +68,30 @@ impl ImportResult { ImportResult::Imported(aux) } + + /// Handles any necessary request for justifications (or clearing of pending requests) based on + /// the outcome of this block import. + pub fn handle_justification( + &self, + hash: &B::Hash, + number: NumberFor, + justification_sync_link: &mut dyn JustificationSyncLink, + ) where + B: BlockT, + { + match self { + ImportResult::Imported(aux) => { + if aux.clear_justification_requests { + justification_sync_link.clear_justification_requests(); + } + + if aux.needs_justification { + justification_sync_link.request_justification(hash, number); + } + } + _ => {} + } + } } /// Block data origin. @@ -354,3 +378,32 @@ pub trait JustificationImport { justification: Justification, ) -> Result<(), Self::Error>; } + +/// Control the synchronization process of block justifications. +/// +/// When importing blocks different consensus engines might require that +/// additional finality data is provided (i.e. a justification for the block). +/// This trait abstracts the required methods to issue those requests +pub trait JustificationSyncLink: Send + Sync { + /// Request a justification for the given block. + fn request_justification(&self, hash: &B::Hash, number: NumberFor); + + /// Clear all pending justification requests. + fn clear_justification_requests(&self); +} + +impl JustificationSyncLink for () { + fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) {} + + fn clear_justification_requests(&self) {} +} + +impl> JustificationSyncLink for Arc { + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + L::request_justification(&*self, hash, number); + } + + fn clear_justification_requests(&self) { + L::clear_justification_requests(&*self); + } +} diff --git a/primitives/consensus/common/src/lib.rs b/primitives/consensus/common/src/lib.rs index 642b6b12e7d6f..37df7230fd62b 100644 --- a/primitives/consensus/common/src/lib.rs +++ b/primitives/consensus/common/src/lib.rs @@ -49,8 +49,8 @@ mod metrics; pub use self::error::Error; pub use block_import::{ - BlockImport, BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, BlockCheckParams, - ImportResult, JustificationImport, + BlockCheckParams, BlockImport, BlockImportParams, BlockOrigin, ForkChoiceStrategy, + ImportResult, ImportedAux, JustificationImport, JustificationSyncLink, }; pub use select_chain::SelectChain; pub use sp_state_machine::Backend as StateBackend;