diff --git a/bin/collator/src/parachain/service.rs b/bin/collator/src/parachain/service.rs index 191dee6865..ef6b0d8619 100644 --- a/bin/collator/src/parachain/service.rs +++ b/bin/collator/src/parachain/service.rs @@ -39,16 +39,18 @@ use sc_client_api::BlockchainEvents; use sc_consensus::{import_queue::BasicQueue, ImportQueue}; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkBlock; +use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi}; use sp_consensus_aura::{ sr25519::AuthorityId as AuraId, sr25519::AuthorityPair as AuraPair, AuraApi, }; -use sp_core::traits::SpawnEssentialNamed; +use sp_keystore::KeystorePtr; use sp_runtime::traits::BlakeTwo256; use sp_runtime::Percent; use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use substrate_prometheus_endpoint::Registry; use super::shell_upgrade::*; @@ -289,13 +291,14 @@ async fn build_relay_chain_interface( /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(not(feature = "evm-tracing"))] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, + start_consensus: SC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -334,6 +337,33 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, + SC: FnOnce( + Arc>>, + ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + Option<&Registry>, + Option, + &TaskManager, + Arc, + Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + Arc>, + KeystorePtr, + ParaId, + CollatorPair, + AdditionalConfig, + ) -> Result<(), sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -517,115 +547,25 @@ where }, })?; - start_network.start_network(); - if is_authority { - let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), + start_consensus( client.clone(), - transaction_pool, + parachain_block_import, prometheus_registry.as_ref(), - telemetry.as_ref().map(|t| t.handle()), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent( - additional_config.proposer_soft_deadline_percent, - )); - - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - - let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); - - let spawner = task_manager.spawn_handle(); - let client_ = client.clone(); - - let collation_future = Box::pin(async move { - use parity_scale_codec::Decode; - use sp_api::ApiExt; - use sp_runtime::traits::Block as BlockT; - - let client = client_.clone(); - - // Start collating with the `shell` runtime while waiting for an upgrade to an Aura - // compatible runtime. - let mut request_stream = cumulus_client_collator::relay_chain_driven::init( - collator_key.clone(), - id, - overseer_handle.clone(), - ) - .await; - - while let Some(request) = request_stream.next().await { - let pvd = request.persisted_validation_data().clone(); - let last_head_hash = - match ::Header::decode(&mut &pvd.parent_head.0[..]) { - Ok(header) => header.hash(), - Err(e) => { - log::error!("Could not decode the head data: {e}"); - request.complete(None); - continue; - } - }; - - // Check if we have upgraded to an Aura compatible runtime and transition if - // necessary. - if client - .runtime_api() - .has_api::>(last_head_hash) - .unwrap_or_default() - { - // Respond to this request before transitioning to Aura. - request.complete(None); - break; - } - } - - // Move to Aura consensus. - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) - .expect("aura is present; qed"); - - let announce_block = { - let sync_service = sync_service.clone(); - Arc::new(move |hash, data| sync_service.announce_block(hash, data)) - }; - - let collator_service = cumulus_client_collator::service::CollatorService::new( - client.clone(), - Arc::new(spawner), - announce_block, - client.clone(), - ); - - let params = basic_aura::Params { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import: parachain_block_import.clone(), - para_client: client.clone(), - relay_client: relay_chain_interface.clone(), - sync_oracle: sync_service.clone(), - keystore: params.keystore_container.keystore(), - collator_key, - para_id: id, - overseer_handle, - slot_duration, - relay_chain_slot_duration: Duration::from_secs(6), - proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), - collator_service, - // We got around 500ms for proposing - authoring_duration: Duration::from_millis(500), - collation_request_receiver: None, - }; - - basic_aura::run::(params).await - }); - - task_manager - .spawn_essential_handle() - .spawn_essential("aura", None, collation_future); + telemetry.map(|t| t.handle()), + &mut task_manager, + relay_chain_interface, + transaction_pool, + sync_service, + params.keystore_container.keystore(), + id, + collator_key.expect("Command line arguments do not allow this. qed"), + additional_config, + )?; } + start_network.start_network(); + Ok((task_manager, client)) } @@ -651,13 +591,14 @@ pub struct AdditionalConfig { /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(feature = "evm-tracing")] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, + start_consensus: SC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -698,6 +639,33 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, + SC: FnOnce( + Arc>>, + ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + Option<&Registry>, + Option, + &TaskManager, + Arc, + Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + Arc>, + KeystorePtr, + ParaId, + CollatorPair, + AdditionalConfig, + ) -> Result<(), sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -912,115 +880,25 @@ where }, })?; - start_network.start_network(); - if is_authority { - let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), + start_consensus( client.clone(), - transaction_pool, + parachain_block_import, prometheus_registry.as_ref(), - telemetry.as_ref().map(|t| t.handle()), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent( - additional_config.proposer_soft_deadline_percent, - )); - - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - - let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); - - let spawner = task_manager.spawn_handle(); - let client_ = client.clone(); - - let collation_future = Box::pin(async move { - use parity_scale_codec::Decode; - use sp_api::ApiExt; - use sp_runtime::traits::Block as BlockT; - - let client = client_.clone(); - - // Start collating with the `shell` runtime while waiting for an upgrade to an Aura - // compatible runtime. - let mut request_stream = cumulus_client_collator::relay_chain_driven::init( - collator_key.clone(), - id, - overseer_handle.clone(), - ) - .await; - - while let Some(request) = request_stream.next().await { - let pvd = request.persisted_validation_data().clone(); - let last_head_hash = - match ::Header::decode(&mut &pvd.parent_head.0[..]) { - Ok(header) => header.hash(), - Err(e) => { - log::error!("Could not decode the head data: {e}"); - request.complete(None); - continue; - } - }; - - // Check if we have upgraded to an Aura compatible runtime and transition if - // necessary. - if client - .runtime_api() - .has_api::>(last_head_hash) - .unwrap_or_default() - { - // Respond to this request before transitioning to Aura. - request.complete(None); - break; - } - } - - // Move to Aura consensus. - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) - .expect("aura is present; qed"); - - let announce_block = { - let sync_service = sync_service.clone(); - Arc::new(move |hash, data| sync_service.announce_block(hash, data)) - }; - - let collator_service = cumulus_client_collator::service::CollatorService::new( - client.clone(), - Arc::new(spawner), - announce_block, - client.clone(), - ); - - let params = basic_aura::Params { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import: parachain_block_import.clone(), - para_client: client.clone(), - relay_client: relay_chain_interface.clone(), - sync_oracle: sync_service.clone(), - keystore: params.keystore_container.keystore(), - collator_key, - para_id: id, - overseer_handle, - slot_duration, - relay_chain_slot_duration: Duration::from_secs(6), - proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), - collator_service, - // We got around 500ms for proposing - authoring_duration: Duration::from_millis(500), - collation_request_receiver: None, - }; - - basic_aura::run::(params).await - }); - - task_manager - .spawn_essential_handle() - .spawn_essential("aura", None, collation_future); + telemetry.map(|t| t.handle()), + &mut task_manager, + relay_chain_interface, + transaction_pool, + sync_service, + params.keystore_container.keystore(), + id, + collator_key.expect("Command line arguments do not allow this. qed"), + additional_config, + )?; } + start_network.start_network(); + Ok((task_manager, client)) } @@ -1169,6 +1047,254 @@ where }).map_err(Into::into) } +/// Start collating with the `shell` runtime while waiting for an upgrade to an Aura compatible runtime. +fn start_aura_consensus_fallback( + client: Arc>>, + parachain_block_import: ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + prometheus_registry: Option<&Registry>, + telemetry: Option, + task_manager: &TaskManager, + relay_chain_interface: Arc, + transaction_pool: Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + sync_oracle: Arc>, + keystore: KeystorePtr, + para_id: ParaId, + collator_key: CollatorPair, + additional_config: AdditionalConfig, +) -> Result<(), sc_service::Error> +where + RuntimeApi: ConstructRuntimeApi>> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + + sp_api::Metadata + + sp_session::SessionKeys + + sp_api::ApiExt + + sp_offchain::OffchainWorkerApi + + sp_block_builder::BlockBuilder + + fp_rpc::EthereumRuntimeRPCApi + + AuraApi + + cumulus_primitives_core::CollectCollationInfo, + sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, + Executor: sc_executor::NativeExecutionDispatch + 'static, +{ + let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry, + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent( + additional_config.proposer_soft_deadline_percent, + )); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let spawner = task_manager.spawn_handle(); + let client_ = client.clone(); + + let collation_future = Box::pin(async move { + use parity_scale_codec::Decode; + use sp_api::ApiExt; + use sp_runtime::traits::Block as BlockT; + + let client = client_.clone(); + + // Start collating with the `shell` runtime while waiting for an upgrade to an Aura + // compatible runtime. + let mut request_stream = cumulus_client_collator::relay_chain_driven::init( + collator_key.clone(), + para_id.clone(), + overseer_handle.clone(), + ) + .await; + + while let Some(request) = request_stream.next().await { + let pvd = request.persisted_validation_data().clone(); + let last_head_hash = + match ::Header::decode(&mut &pvd.parent_head.0[..]) { + Ok(header) => header.hash(), + Err(e) => { + log::error!("Could not decode the head data: {e}"); + request.complete(None); + continue; + } + }; + + // Check if we have upgraded to an Aura compatible runtime and transition if + // necessary. + if client + .runtime_api() + .has_api::>(last_head_hash) + .unwrap_or_default() + { + // Respond to this request before transitioning to Aura. + request.complete(None); + break; + } + } + + // Move to Aura consensus. + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client).expect("aura is present; qed"); + + let announce_block = { + let sync_service = sync_oracle.clone(); + Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + }; + + let collator_service = cumulus_client_collator::service::CollatorService::new( + client.clone(), + Arc::new(spawner), + announce_block, + client.clone(), + ); + + basic_aura::run::(basic_aura::Params { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import: parachain_block_import.clone(), + para_client: client.clone(), + relay_client: relay_chain_interface.clone(), + sync_oracle: sync_oracle.clone(), + keystore, + collator_key, + para_id, + overseer_handle, + slot_duration, + relay_chain_slot_duration: Duration::from_secs(6), + proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), + collator_service, + // We got around 500ms for proposing + authoring_duration: Duration::from_millis(500), + collation_request_receiver: Some(request_stream), + }) + .await + }); + + task_manager + .spawn_essential_handle() + .spawn("aura", None, collation_future); + Ok(()) +} + +fn start_aura_consensus( + client: Arc>>, + parachain_block_import: ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + prometheus_registry: Option<&Registry>, + telemetry: Option, + task_manager: &TaskManager, + relay_chain_interface: Arc, + transaction_pool: Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + sync_oracle: Arc>, + keystore: KeystorePtr, + para_id: ParaId, + collator_key: CollatorPair, + additional_config: AdditionalConfig, +) -> Result<(), sc_service::Error> +where + RuntimeApi: ConstructRuntimeApi>> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + + sp_api::Metadata + + sp_session::SessionKeys + + sp_api::ApiExt + + sp_offchain::OffchainWorkerApi + + sp_block_builder::BlockBuilder + + fp_rpc::EthereumRuntimeRPCApi + + AuraApi + + cumulus_primitives_core::CollectCollationInfo, + sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, + Executor: sc_executor::NativeExecutionDispatch + 'static, +{ + let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry, + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent( + additional_config.proposer_soft_deadline_percent, + )); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let announce_block = { + let sync_service = sync_oracle.clone(); + Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + }; + + let collator_service = cumulus_client_collator::service::CollatorService::new( + client.clone(), + Arc::new(task_manager.spawn_handle()), + announce_block, + client.clone(), + ); + + let fut = basic_aura::run::(basic_aura::Params { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import: parachain_block_import.clone(), + para_client: client.clone(), + relay_client: relay_chain_interface.clone(), + sync_oracle: sync_oracle.clone(), + keystore, + collator_key, + para_id, + overseer_handle, + slot_duration: cumulus_client_consensus_aura::slot_duration(&*client)?, + relay_chain_slot_duration: Duration::from_secs(6), + proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), + collator_service, + // We got around 500ms for proposing + authoring_duration: Duration::from_millis(500), + collation_request_receiver: None, + }); + + task_manager + .spawn_essential_handle() + .spawn("aura", None, fut); + + Ok(()) +} + /// Start a parachain node for Astar. pub async fn start_astar_node( parachain_config: Configuration, @@ -1180,13 +1306,14 @@ pub async fn start_astar_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), build_import_queue, + start_aura_consensus, ) .await } @@ -1202,13 +1329,14 @@ pub async fn start_shiden_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), build_import_queue_fallback, + start_aura_consensus_fallback, ) .await } @@ -1224,13 +1352,14 @@ pub async fn start_shibuya_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), build_import_queue, + start_aura_consensus, ) .await }