Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

consensus: handle justification sync for blocks authored locally #8698

Merged
16 commits merged into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
let raw_slot_duration = slot_duration.slot_duration();

let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _>(
let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _, _>(
andresilva marked this conversation as resolved.
Show resolved Hide resolved
StartAuraParams {
slot_duration,
client: client.clone(),
Expand All @@ -243,6 +243,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore: keystore_container.sync_keystore(),
can_author_with,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
telemetry: telemetry.as_ref().map(|x| x.handle()),
},
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ pub fn new_full_base(
env: proposer,
block_import,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
create_inherent_data_providers: move |parent, ()| {
let client_clone = client_clone.clone();
async move {
Expand Down
62 changes: 41 additions & 21 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn slot_author<P: Pair>(slot: Slot, authorities: &[AuthorityId<P>]) -> Option<&A
}

/// Parameters of [`start_aura`].
pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
pub struct StartAuraParams<C, SC, I, PF, SO, L, CIDP, BS, CAW> {
/// The duration of a slot.
pub slot_duration: SlotDuration,
/// The client to interact with the chain.
Expand All @@ -122,8 +122,10 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
pub proposer_factory: PF,
/// The sync oracle that can give us the current sync status.
pub sync_oracle: SO,
/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
/// Something that can create the inherent data providers.
pub create_inherent_data_providers: IDP,
pub create_inherent_data_providers: CIDP,
/// Should we force the authoring of blocks?
pub force_authoring: bool,
/// The backoff strategy when we miss slots.
Expand All @@ -143,46 +145,49 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
}

/// Start the aura worker. The returned future should be run in a futures executor.
pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error, IDP>(
pub fn start_aura<P, B, C, SC, I, PF, SO, L, CIDP, BS, CAW, Error>(
StartAuraParams {
slot_duration,
client,
select_chain,
block_import,
proposer_factory,
sync_oracle,
justification_sync_link,
create_inherent_data_providers,
force_authoring,
backoff_authoring_blocks,
keystore,
can_author_with,
block_proposal_slot_portion,
telemetry,
}: StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP>,
}: StartAuraParams<C, SC, I, PF, SO, L, CIDP, BS, CAW>,
) -> Result<impl Future<Output = ()>, sp_consensus::Error> where
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + AuxStore + HeaderBackend<B> + Send + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
SC: SelectChain<B>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
PF: Environment<B, Error = Error> + Send + Sync + 'static,
PF::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
L: sp_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
IDP: CreateInherentDataProviders<B, ()> + Send,
IDP::InherentDataProviders: InherentDataProviderExt + Send,
CAW: CanAuthorWith<B> + Send,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
{
let worker = build_aura_worker::<P, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
let worker = build_aura_worker::<P, _, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
client: client.clone(),
block_import,
proposer_factory,
keystore,
sync_oracle: sync_oracle.clone(),
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
telemetry,
Expand All @@ -200,7 +205,7 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error, IDP>(
}

/// Parameters of [`build_aura_worker`].
pub struct BuildAuraWorkerParams<C, I, PF, SO, BS> {
pub struct BuildAuraWorkerParams<C, I, PF, SO, L, BS> {
/// The client to interact with the chain.
pub client: Arc<C>,
/// The block import.
Expand All @@ -209,6 +214,8 @@ pub struct BuildAuraWorkerParams<C, I, PF, SO, BS> {
pub proposer_factory: PF,
/// The sync oracle that can give us the current sync status.
pub sync_oracle: SO,
/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
/// Should we force the authoring of blocks?
pub force_authoring: bool,
/// The backoff strategy when we miss slots.
Expand All @@ -228,18 +235,19 @@ pub struct BuildAuraWorkerParams<C, I, PF, SO, BS> {
/// Build the aura worker.
///
/// The caller is responsible for running this worker, otherwise it will do nothing.
pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
pub fn build_aura_worker<P, B, C, PF, I, SO, L, BS, Error>(
BuildAuraWorkerParams {
client,
block_import,
proposer_factory,
sync_oracle,
justification_sync_link,
backoff_authoring_blocks,
keystore,
block_proposal_slot_portion,
telemetry,
force_authoring,
}: BuildAuraWorkerParams<C, I, PF, SO, BS>,
}: BuildAuraWorkerParams<C, I, PF, SO, L, BS>,
) -> impl sc_consensus_slots::SlotWorker<B, <PF::Proposer as Proposer<B>>::Proof> where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + AuxStore + HeaderBackend<B> + Send + Sync,
Expand All @@ -252,6 +260,7 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
L: sp_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
{
AuraWorker {
Expand All @@ -260,6 +269,7 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
env: proposer_factory,
keystore,
sync_oracle,
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
telemetry,
Expand All @@ -268,21 +278,22 @@ pub fn build_aura_worker<P, B, C, PF, I, SO, BS, Error>(
}
}

struct AuraWorker<C, E, I, P, SO, BS> {
struct AuraWorker<C, E, I, P, SO, L, BS> {
client: Arc<C>,
block_import: I,
env: E,
keystore: SyncCryptoStorePtr,
sync_oracle: SO,
justification_sync_link: L,
force_authoring: bool,
backoff_authoring_blocks: Option<BS>,
block_proposal_slot_portion: SlotProportion,
telemetry: Option<TelemetryHandle>,
_key_type: PhantomData<P>,
}

impl<B, C, E, I, P, Error, SO, BS> sc_consensus_slots::SimpleSlotWorker<B>
for AuraWorker<C, E, I, P, SO, BS>
impl<B, C, E, I, P, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
for AuraWorker<C, E, I, P, SO, L, BS>
where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + HeaderBackend<B> + Sync,
Expand All @@ -294,11 +305,13 @@ where
P::Public: AppPublic + Public + Member + Encode + Decode + Hash,
P::Signature: TryFrom<Vec<u8>> + Member + Encode + Decode + Hash + Debug,
SO: SyncOracle + Send + Clone,
L: sp_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
{
type BlockImport = I;
type SyncOracle = SO;
type JustificationSyncLink = L;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
Expand Down Expand Up @@ -425,6 +438,10 @@ where
&mut self.sync_oracle
}

fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
&mut self.justification_sync_link
}

fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e)).into()
Expand Down Expand Up @@ -725,13 +742,14 @@ mod tests {

let slot_duration = slot_duration(&*client).expect("slot duration available");

aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _>(StartAuraParams {
aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _, _>(StartAuraParams {
slot_duration,
block_import: client.clone(),
select_chain,
client,
proposer_factory: environ,
sync_oracle: DummyOracle,
justification_sync_link: (),
create_inherent_data_providers: |_, _| async {
let timestamp = TimestampInherentDataProvider::from_system_time();
let slot = InherentDataProvider::from_timestamp_and_duration(
Expand Down Expand Up @@ -804,6 +822,7 @@ mod tests {
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
justification_sync_link: (),
force_authoring: false,
backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()),
telemetry: None,
Expand Down Expand Up @@ -853,6 +872,7 @@ mod tests {
env: environ,
keystore: keystore.into(),
sync_oracle: DummyOracle.clone(),
justification_sync_link: (),
force_authoring: false,
backoff_authoring_blocks: Option::<()>::None,
telemetry: None,
Expand All @@ -871,7 +891,7 @@ mod tests {
duration: Duration::from_millis(1000),
chain_head: head,
block_size_limit: None,
},
}
)).unwrap();

// The returned block should be imported and we should be able to get its header by now.
Expand Down
53 changes: 37 additions & 16 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl std::ops::Deref for Config {
}

/// Parameters for BABE.
pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS, CAW> {
/// The keystore that manages the keys of the node.
pub keystore: SyncCryptoStorePtr,

Expand All @@ -384,8 +384,11 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
/// A sync oracle
pub sync_oracle: SO,

/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,

/// Something that can create the inherent data providers.
pub create_inherent_data_providers: IDP,
pub create_inherent_data_providers: CIDP,

/// Force authoring of blocks even if we are offline
pub force_authoring: bool,
Expand All @@ -411,40 +414,50 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
}

/// Start the babe worker.
pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error, IDP>(BabeParams {
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, CAW, L, Error>(BabeParams {
keystore,
client,
select_chain,
env,
block_import,
sync_oracle,
justification_sync_link,
create_inherent_data_providers,
force_authoring,
backoff_authoring_blocks,
babe_link,
can_author_with,
block_proposal_slot_portion,
telemetry,
}: BabeParams<B, C, E, I, SO, SC, CAW, BS, IDP>) -> Result<
}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS, CAW>) -> Result<
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>
+ Send + Sync + 'static,
C: ProvideRuntimeApi<B>
+ ProvideCache<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
+ Sync
+ 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
L: sp_consensus::JustificationSyncLink<B> + 'static,
CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
IDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
IDP::InherentDataProviders: InherentDataProviderExt + Send,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;

Expand All @@ -456,6 +469,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error, IDP>(BabeParams {
block_import,
env,
sync_oracle: sync_oracle.clone(),
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
keystore,
Expand Down Expand Up @@ -600,11 +614,12 @@ type SlotNotificationSinks<B> = Arc<
Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>
>;

struct BabeSlotWorker<B: BlockT, C, E, I, SO, BS> {
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
client: Arc<C>,
block_import: I,
env: E,
sync_oracle: SO,
justification_sync_link: L,
force_authoring: bool,
backoff_authoring_blocks: Option<BS>,
keystore: SyncCryptoStorePtr,
Expand All @@ -615,8 +630,8 @@ struct BabeSlotWorker<B: BlockT, C, E, I, SO, BS> {
telemetry: Option<TelemetryHandle>,
}

impl<B, C, E, I, Error, SO, BS> sc_consensus_slots::SimpleSlotWorker<B>
for BabeSlotWorker<B, C, E, I, SO, BS>
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
B: BlockT,
C: ProvideRuntimeApi<B> +
Expand All @@ -628,12 +643,14 @@ where
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
SO: SyncOracle + Send + Clone,
L: sp_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>>,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
type EpochData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
type Claim = (PreDigest, AuthorityId);
type SyncOracle = SO;
type JustificationSyncLink = L;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
Expand Down Expand Up @@ -798,6 +815,10 @@ where
&mut self.sync_oracle
}

fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
&mut self.justification_sync_link
}

fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e))
Expand Down
Loading