diff --git a/Cargo.lock b/Cargo.lock index 3b7e9331f1..b2c6edb0a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,10 +630,8 @@ dependencies = [ "async-trait", "clap", "cumulus-client-cli", - "cumulus-client-collator", "cumulus-client-consensus-aura", "cumulus-client-consensus-common", - "cumulus-client-consensus-proposer", "cumulus-client-consensus-relay-chain", "cumulus-client-network", "cumulus-client-service", diff --git a/Cargo.toml b/Cargo.toml index 8a79c947de..f4e0835d72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,8 +238,6 @@ cumulus-client-consensus-common = { git = "https://github.com/paritytech/polkado cumulus-client-consensus-relay-chain = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-network = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } -cumulus-client-collator = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } -cumulus-client-consensus-proposer = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-primitives-parachain-inherent = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-relay-chain-inprocess-interface = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-relay-chain-interface = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } diff --git a/bin/collator/Cargo.toml b/bin/collator/Cargo.toml index fe82a8e38e..39fdafe2d4 100644 --- a/bin/collator/Cargo.toml +++ b/bin/collator/Cargo.toml @@ -108,10 +108,8 @@ sc-cli = { workspace = true, optional = true } # cumulus dependencies cumulus-client-cli = { workspace = true } -cumulus-client-collator = { workspace = true } cumulus-client-consensus-aura = { workspace = true } cumulus-client-consensus-common = { workspace = true } -cumulus-client-consensus-proposer = { workspace = true } cumulus-client-consensus-relay-chain = { workspace = true } cumulus-client-network = { workspace = true } cumulus-client-service = { workspace = true } diff --git a/bin/collator/src/command.rs b/bin/collator/src/command.rs index 88ee884483..8e2abb6ffa 100644 --- a/bin/collator/src/command.rs +++ b/bin/collator/src/command.rs @@ -220,7 +220,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -261,7 +261,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; Ok((cmd.run(client, config.database), task_manager)) }) @@ -301,7 +301,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; Ok((cmd.run(client, config.chain_spec), task_manager)) }) @@ -343,7 +343,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -409,7 +409,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; let aux_revert = Box::new(|client, _, blocks| { sc_consensus_grandpa::revert(client, blocks)?; @@ -452,7 +452,7 @@ pub fn run() -> Result<()> { let PartialComponents { client, .. } = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; cmd.run(config.chain_spec.as_ref(), client.as_ref()) }) @@ -522,7 +522,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; cmd.run(params.client) }) @@ -562,7 +562,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; let db = params.backend.expose_db(); let storage = params.backend.expose_storage(); @@ -618,7 +618,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; let ext_builder = RemarkBuilder::new(params.client.clone()); @@ -700,7 +700,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue_fallback, + parachain::build_import_queue, )?; let remark_builder = RemarkBuilder::new(params.client.clone()); let tka_builder = TransferKeepAliveBuilder::new( @@ -788,8 +788,14 @@ pub fn run() -> Result<()> { }; runner.run_node_until_exit(|config| async move { + #[cfg(feature = "evm-tracing")] + if config.chain_spec.is_dev() { + return local::start_node(config, evm_tracing_config).map_err(Into::into); + } + + #[cfg(not(feature = "evm-tracing"))] if config.chain_spec.is_dev() { - return local::start_node(config, #[cfg(feature = "evm-tracing")] evm_tracing_config).map_err(Into::into); + return local::start_node(config).map_err(Into::into); } let polkadot_cli = RelayChainCli::new( diff --git a/bin/collator/src/local/service.rs b/bin/collator/src/local/service.rs index 5e486d88cd..59e98cd09c 100644 --- a/bin/collator/src/local/service.rs +++ b/bin/collator/src/local/service.rs @@ -22,18 +22,14 @@ use fc_consensus::FrontierBlockImport; use fc_rpc_core::types::{FeeHistoryCache, FilterPool}; use futures::{FutureExt, StreamExt}; use sc_client_api::{Backend, BlockBackend, BlockchainEvents}; +use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_grandpa::SharedVoterState; use sc_executor::NativeElseWasmExecutor; use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use std::{collections::BTreeMap, sync::Arc, time::Duration}; - -#[cfg(not(feature = "manual-seal"))] use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; - -#[cfg(feature = "evm-tracing")] -use crate::{evm_tracing_types::EthApi as EthApiCmd, rpc::tracing}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; pub use local_runtime::RuntimeApi; @@ -142,6 +138,7 @@ pub fn new_partial( let frontier_backend = crate::rpc::open_frontier_backend(client.clone(), config)?; let frontier_block_import = FrontierBlockImport::new(grandpa_block_import.clone(), client.clone()); + let slot_duration = sc_consensus_aura::slot_duration(&*client)?; #[cfg(feature = "manual-seal")] let import_queue = sc_consensus_manual_seal::import_queue( @@ -151,30 +148,27 @@ pub fn new_partial( ); #[cfg(not(feature = "manual-seal"))] - let import_queue = { - let slot_duration = sc_consensus_aura::slot_duration(&*client)?; - sc_consensus_aura::import_queue::( - sc_consensus_aura::ImportQueueParams { - block_import: frontier_block_import.clone(), - justification_import: Some(Box::new(grandpa_block_import)), - client: client.clone(), - create_inherent_data_providers: move |_, ()| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - Ok((slot, timestamp)) - }, - spawner: &task_manager.spawn_essential_handle(), - registry: config.prometheus_registry(), - check_for_equivocation: Default::default(), - telemetry: telemetry.as_ref().map(|x| x.handle()), - compatibility_mode: Default::default(), + let import_queue = sc_consensus_aura::import_queue::( + ImportQueueParams { + block_import: frontier_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import)), + client: client.clone(), + create_inherent_data_providers: move |_, ()| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + Ok((slot, timestamp)) }, - )? - }; + spawner: &task_manager.spawn_essential_handle(), + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + telemetry: telemetry.as_ref().map(|x| x.handle()), + compatibility_mode: Default::default(), + }, + )?; Ok(sc_service::PartialComponents { client, @@ -194,10 +188,14 @@ pub fn new_partial( } /// Builds a new service. +#[cfg(feature = "evm-tracing")] pub fn start_node( config: Configuration, - #[cfg(feature = "evm-tracing")] evm_tracing_config: crate::evm_tracing_types::EvmTracingConfig, + evm_tracing_config: crate::evm_tracing_types::EvmTracingConfig, ) -> Result { + use crate::evm_tracing_types::EthApi as EthApiCmd; + use crate::rpc::tracing; + let sc_service::PartialComponents { client, backend, @@ -266,10 +264,7 @@ pub fn start_node( > = Default::default(); let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); - #[cfg(feature = "evm-tracing")] let ethapi_cmd = evm_tracing_config.ethapi.clone(); - - #[cfg(feature = "evm-tracing")] let tracing_requesters = if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) { tracing::spawn_tracing_tasks( @@ -336,12 +331,9 @@ pub fn start_node( ), ); - #[cfg(not(feature = "manual-seal"))] + let role = config.role.clone(); let force_authoring = config.force_authoring; - #[cfg(not(feature = "manual-seal"))] let backoff_authoring_blocks: Option<()> = None; - - let role = config.role.clone(); let name = config.network.node_name.clone(); let enable_grandpa = !config.disable_grandpa; let prometheus_registry = config.prometheus_registry().cloned(); @@ -355,7 +347,6 @@ pub fn start_node( prometheus_registry.clone(), )); - // Channel for the rpc handler to communicate with the authorship task. #[cfg(feature = "manual-seal")] let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024); @@ -363,6 +354,11 @@ pub fn start_node( let client = client.clone(); let network = network.clone(); let transaction_pool = transaction_pool.clone(); + let rpc_config = crate::rpc::EvmTracingConfig { + tracing_requesters, + trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count, + enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool), + }; let sync = sync_service.clone(); let pubsub_notification_sinks = pubsub_notification_sinks.clone(); @@ -390,12 +386,7 @@ pub fn start_node( deps, subscription, pubsub_notification_sinks.clone(), - #[cfg(feature = "evm-tracing")] - crate::rpc::EvmTracingConfig { - tracing_requesters: tracing_requesters.clone(), - trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count, - enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool), - }, + rpc_config.clone(), ) .map_err::(Into::into) }) @@ -455,7 +446,318 @@ pub fn start_node( #[cfg(not(feature = "manual-seal"))] let aura = sc_consensus_aura::start_aura::( - sc_consensus_aura::StartAuraParams { + StartAuraParams { + slot_duration, + client, + select_chain, + block_import, + proposer_factory, + create_inherent_data_providers: move |_, ()| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + }, + force_authoring, + backoff_authoring_blocks, + keystore: keystore_container.keystore(), + sync_oracle: sync_service.clone(), + justification_sync_link: sync_service.clone(), + block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), + max_block_proposal_slot_portion: None, + telemetry: telemetry.as_ref().map(|x| x.handle()), + compatibility_mode: Default::default(), + }, + )?; + + // the AURA authoring task is considered essential, i.e. if it + // fails we take down the service with it. + task_manager + .spawn_essential_handle() + .spawn_blocking("aura", Some("block-authoring"), aura); + } + + // if the node isn't actively participating in consensus then it doesn't + // need a keystore, regardless of which protocol we use below. + let keystore = if role.is_authority() { + Some(keystore_container.keystore()) + } else { + None + }; + + let grandpa_config = sc_consensus_grandpa::Config { + // FIXME #1578 make this available through chainspec + gossip_duration: Duration::from_millis(333), + justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD, + name: Some(name), + observer_enabled: false, + keystore, + local_role: role, + telemetry: telemetry.as_ref().map(|x| x.handle()), + protocol_name, + }; + + if enable_grandpa { + // start the full GRANDPA voter + // NOTE: non-authorities could run the GRANDPA observer protocol, but at + // this point the full voter should provide better guarantees of block + // and vote data availability than the observer. The observer has not + // been tested extensively yet and having most nodes in a network run it + // could lead to finality stalls. + let grandpa_config = sc_consensus_grandpa::GrandpaParams { + config: grandpa_config, + link: grandpa_link, + network, + sync: sync_service, + voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(), + prometheus_registry, + shared_voter_state: SharedVoterState::empty(), + telemetry: telemetry.as_ref().map(|x| x.handle()), + offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool), + }; + + // the GRANDPA voter task is considered infallible, i.e. + // if it fails we take down the service with it. + task_manager.spawn_essential_handle().spawn_blocking( + "grandpa-voter", + None, + sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?, + ); + } + + network_starter.start_network(); + Ok(task_manager) +} + +/// Builds a new service. +#[cfg(not(feature = "evm-tracing"))] +pub fn start_node(config: Configuration) -> Result { + let sc_service::PartialComponents { + client, + backend, + mut task_manager, + import_queue, + keystore_container, + select_chain, + transaction_pool, + other: (block_import, grandpa_link, mut telemetry, frontier_backend), + } = new_partial(&config)?; + + let protocol_name = sc_consensus_grandpa::protocol_standard_name( + &client + .block_hash(0) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + &config.chain_spec, + ); + let net_config = sc_network::config::FullNetworkConfiguration::new(&config.network); + + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = + sc_service::build_network(sc_service::BuildNetworkParams { + config: &config, + net_config, + client: client.clone(), + transaction_pool: transaction_pool.clone(), + spawn_handle: task_manager.spawn_handle(), + import_queue, + block_announce_validator_builder: None, + warp_sync_params: None, + block_relay: None, + })?; + + if config.offchain_worker.enabled { + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-work", + sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { + runtime_api_provider: client.clone(), + keystore: Some(keystore_container.keystore()), + offchain_db: backend.offchain_storage(), + transaction_pool: Some(OffchainTransactionPoolFactory::new( + transaction_pool.clone(), + )), + network_provider: network.clone(), + is_validator: config.role.is_authority(), + enable_http_requests: true, + custom_extensions: move |_| vec![], + }) + .run(client.clone(), task_manager.spawn_handle()) + .boxed(), + ); + } + + let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new())); + let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new())); + let overrides = fc_storage::overrides_handle(client.clone()); + + // Sinks for pubsub notifications. + // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. + // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel. + // This way we avoid race conditions when using native substrate block import notification stream. + let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + + // Frontier offchain DB task. Essential. + // Maps emulated ethereum data to substrate native data. + task_manager.spawn_essential_handle().spawn( + "frontier-mapping-sync-worker", + Some("frontier"), + fc_mapping_sync::kv::MappingSyncWorker::new( + client.import_notification_stream(), + Duration::new(6, 0), + client.clone(), + backend.clone(), + overrides.clone(), + frontier_backend.clone(), + 3, + 0, + fc_mapping_sync::SyncStrategy::Parachain, + sync_service.clone(), + pubsub_notification_sinks.clone(), + ) + .for_each(|()| futures::future::ready(())), + ); + + // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters. + // Each filter is allowed to stay in the pool for 100 blocks. + const FILTER_RETAIN_THRESHOLD: u64 = 100; + task_manager.spawn_essential_handle().spawn( + "frontier-filter-pool", + Some("frontier"), + fc_rpc::EthTask::filter_pool_task( + client.clone(), + filter_pool.clone(), + FILTER_RETAIN_THRESHOLD, + ), + ); + + const FEE_HISTORY_LIMIT: u64 = 2048; + task_manager.spawn_essential_handle().spawn( + "frontier-fee-history", + Some("frontier"), + fc_rpc::EthTask::fee_history_task( + client.clone(), + overrides.clone(), + fee_history_cache.clone(), + FEE_HISTORY_LIMIT, + ), + ); + + let role = config.role.clone(); + let force_authoring = config.force_authoring; + let backoff_authoring_blocks: Option<()> = None; + let name = config.network.node_name.clone(); + let enable_grandpa = !config.disable_grandpa; + let prometheus_registry = config.prometheus_registry().cloned(); + let is_authority = config.role.is_authority(); + + let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new( + task_manager.spawn_handle(), + overrides.clone(), + 50, + 50, + prometheus_registry.clone(), + )); + + // Channel for the rpc handler to communicate with the authorship task. + #[cfg(feature = "manual-seal")] + let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024); + + let rpc_extensions_builder = { + let client = client.clone(); + let network = network.clone(); + let sync = sync_service.clone(); + let transaction_pool = transaction_pool.clone(); + let pubsub_notification_sinks = pubsub_notification_sinks.clone(); + + Box::new(move |deny_unsafe, subscription| { + let deps = crate::rpc::FullDeps { + client: client.clone(), + pool: transaction_pool.clone(), + graph: transaction_pool.pool().clone(), + network: network.clone(), + sync: sync.clone(), + is_authority, + deny_unsafe, + frontier_backend: frontier_backend.clone(), + filter_pool: filter_pool.clone(), + fee_history_limit: FEE_HISTORY_LIMIT, + fee_history_cache: fee_history_cache.clone(), + block_data_cache: block_data_cache.clone(), + overrides: overrides.clone(), + enable_evm_rpc: true, // enable EVM RPC for dev node by default + #[cfg(feature = "manual-seal")] + command_sink: Some(command_sink.clone()), + }; + + crate::rpc::create_full(deps, subscription, pubsub_notification_sinks.clone()) + .map_err::(Into::into) + }) + }; + + let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { + network: network.clone(), + client: client.clone(), + keystore: keystore_container.keystore(), + task_manager: &mut task_manager, + transaction_pool: transaction_pool.clone(), + rpc_builder: rpc_extensions_builder, + backend, + system_rpc_tx, + tx_handler_controller, + sync_service: sync_service.clone(), + config, + telemetry: telemetry.as_mut(), + })?; + + if role.is_authority() { + let proposer_factory = sc_basic_authorship::ProposerFactory::new( + task_manager.spawn_handle(), + client.clone(), + transaction_pool.clone(), + prometheus_registry.as_ref(), + telemetry.as_ref().map(|x| x.handle()), + ); + + let slot_duration = sc_consensus_aura::slot_duration(&*client)?; + + #[cfg(feature = "manual-seal")] + let aura = sc_consensus_manual_seal::run_manual_seal( + sc_consensus_manual_seal::ManualSealParams { + block_import, + env: proposer_factory, + client: client.clone(), + pool: transaction_pool.clone(), + commands_stream, + select_chain, + consensus_data_provider: Some(Box::new( + sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new( + client.clone(), + ), + )), + create_inherent_data_providers: move |_, ()| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration.clone(), + ); + Ok((slot, timestamp)) + }, + }, + ); + + #[cfg(not(feature = "manual-seal"))] + let aura = sc_consensus_aura::start_aura::( + StartAuraParams { slot_duration, client, select_chain, @@ -477,7 +779,7 @@ pub fn start_node( keystore: keystore_container.keystore(), sync_oracle: sync_service.clone(), justification_sync_link: sync_service.clone(), - block_proposal_slot_portion: sc_consensus_aura::SlotProportion::new(2f32 / 3f32), + block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), max_block_proposal_slot_portion: None, telemetry: telemetry.as_ref().map(|x| x.handle()), compatibility_mode: Default::default(), diff --git a/bin/collator/src/parachain/mod.rs b/bin/collator/src/parachain/mod.rs index 101be82796..077a99f1de 100644 --- a/bin/collator/src/parachain/mod.rs +++ b/bin/collator/src/parachain/mod.rs @@ -28,8 +28,8 @@ pub mod service; pub mod chain_spec; pub use service::{ - astar, build_import_queue, build_import_queue_fallback, new_partial, shibuya, shiden, - start_astar_node, start_shibuya_node, start_shiden_node, HostFunctions, + astar, build_import_queue, new_partial, shibuya, shiden, start_astar_node, start_shibuya_node, + start_shiden_node, HostFunctions, }; pub(crate) use shell_upgrade::{ diff --git a/bin/collator/src/parachain/service.rs b/bin/collator/src/parachain/service.rs index 191dee6865..72e2bfd30e 100644 --- a/bin/collator/src/parachain/service.rs +++ b/bin/collator/src/parachain/service.rs @@ -20,12 +20,14 @@ use astar_primitives::*; use cumulus_client_cli::CollatorOptions; -use cumulus_client_consensus_aura::collators::basic as basic_aura; -use cumulus_client_consensus_common::ParachainBlockImport; +#[allow(deprecated)] +use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion}; +use cumulus_client_consensus_common::{ParachainBlockImport, ParachainConsensus}; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; +#[allow(deprecated)] use cumulus_client_service::{ - prepare_node_config, start_relay_chain_tasks, BuildNetworkParams, DARecoveryProfile, - StartRelayChainTasksParams, + prepare_node_config, start_collator, start_relay_chain_tasks, BuildNetworkParams, + DARecoveryProfile, StartCollatorParams, StartRelayChainTasksParams, }; use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; @@ -33,30 +35,29 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc; use fc_consensus::FrontierBlockImport; use fc_rpc_core::types::{FeeHistoryCache, FilterPool}; -use futures::StreamExt; +use futures::{lock::Mutex, StreamExt}; use polkadot_service::CollatorPair; use sc_client_api::BlockchainEvents; use sc_consensus::{import_queue::BasicQueue, ImportQueue}; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkBlock; +use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; -use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi}; -use sp_consensus_aura::{ - sr25519::AuthorityId as AuraId, sr25519::AuthorityPair as AuraPair, AuraApi, -}; -use sp_core::traits::SpawnEssentialNamed; +use sp_api::ConstructRuntimeApi; +use sp_consensus_aura::{sr25519::AuthorityId as AuraId, AuraApi}; +use sp_keystore::KeystorePtr; use sp_runtime::traits::BlakeTwo256; use sp_runtime::Percent; use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use substrate_prometheus_endpoint::Registry; use super::shell_upgrade::*; +#[cfg(feature = "evm-tracing")] +use crate::evm_tracing_types::{EthApi as EthApiCmd, EvmTracingConfig}; #[cfg(feature = "evm-tracing")] -use crate::{ - evm_tracing_types::{EthApi as EthApiCmd, EvmTracingConfig}, - rpc::tracing, -}; +use crate::rpc::tracing; /// Extra host functions pub type HostFunctions = ( @@ -289,13 +290,14 @@ async fn build_relay_chain_interface( /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(not(feature = "evm-tracing"))] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, + build_consensus: BIC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -334,6 +336,31 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, + BIC: FnOnce( + Arc>>, + ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + Option<&Registry>, + Option, + &TaskManager, + Arc, + Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + Arc>, + KeystorePtr, + bool, + ) -> Result>, sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -356,6 +383,7 @@ where .await .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; + let force_authoring = parachain_config.force_authoring; let is_authority = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); @@ -500,132 +528,59 @@ where .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - start_relay_chain_tasks(StartRelayChainTasksParams { - client: client.clone(), - announce_block: announce_block.clone(), - task_manager: &mut task_manager, - para_id: id, - relay_chain_interface: relay_chain_interface.clone(), - relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle.clone()), - sync_service: sync_service.clone(), - da_recovery_profile: if is_authority { - DARecoveryProfile::Collator - } else { - DARecoveryProfile::FullNode - }, - })?; - - start_network.start_network(); - if is_authority { - let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), + let parachain_consensus = build_consensus( client.clone(), - transaction_pool, + parachain_block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent( - additional_config.proposer_soft_deadline_percent, - )); - - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - - let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); + &task_manager, + relay_chain_interface.clone(), + transaction_pool, + sync_service.clone(), + params.keystore_container.keystore(), + force_authoring, + )?; let spawner = task_manager.spawn_handle(); - let client_ = client.clone(); - - let collation_future = Box::pin(async move { - use parity_scale_codec::Decode; - use sp_api::ApiExt; - use sp_runtime::traits::Block as BlockT; - let client = client_.clone(); - - // Start collating with the `shell` runtime while waiting for an upgrade to an Aura - // compatible runtime. - let mut request_stream = cumulus_client_collator::relay_chain_driven::init( - collator_key.clone(), - id, - overseer_handle.clone(), - ) - .await; - - while let Some(request) = request_stream.next().await { - let pvd = request.persisted_validation_data().clone(); - let last_head_hash = - match ::Header::decode(&mut &pvd.parent_head.0[..]) { - Ok(header) => header.hash(), - Err(e) => { - log::error!("Could not decode the head data: {e}"); - request.complete(None); - continue; - } - }; - - // Check if we have upgraded to an Aura compatible runtime and transition if - // necessary. - if client - .runtime_api() - .has_api::>(last_head_hash) - .unwrap_or_default() - { - // Respond to this request before transitioning to Aura. - request.complete(None); - break; - } - } - - // Move to Aura consensus. - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) - .expect("aura is present; qed"); - - let announce_block = { - let sync_service = sync_service.clone(); - Arc::new(move |hash, data| sync_service.announce_block(hash, data)) - }; - - let collator_service = cumulus_client_collator::service::CollatorService::new( - client.clone(), - Arc::new(spawner), - announce_block, - client.clone(), - ); - - let params = basic_aura::Params { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import: parachain_block_import.clone(), - para_client: client.clone(), - relay_client: relay_chain_interface.clone(), - sync_oracle: sync_service.clone(), - keystore: params.keystore_container.keystore(), - collator_key, - para_id: id, - overseer_handle, - slot_duration, - relay_chain_slot_duration: Duration::from_secs(6), - proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), - collator_service, - // We got around 500ms for proposing - authoring_duration: Duration::from_millis(500), - collation_request_receiver: None, - }; + let params = StartCollatorParams { + para_id: id, + block_status: client.clone(), + announce_block, + client: client.clone(), + task_manager: &mut task_manager, + relay_chain_interface: relay_chain_interface.clone(), + spawner, + parachain_consensus, + import_queue: import_queue_service, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), + relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), + sync_service, + }; - basic_aura::run::(params).await - }); + #[allow(deprecated)] + start_collator(params).await?; + } else { + let params = StartRelayChainTasksParams { + client: client.clone(), + announce_block, + task_manager: &mut task_manager, + para_id: id, + relay_chain_interface, + relay_chain_slot_duration, + import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), + sync_service, + da_recovery_profile: DARecoveryProfile::FullNode, + }; - task_manager - .spawn_essential_handle() - .spawn_essential("aura", None, collation_future); + start_relay_chain_tasks(params)?; } + start_network.start_network(); + Ok((task_manager, client)) } @@ -651,13 +606,14 @@ pub struct AdditionalConfig { /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(feature = "evm-tracing")] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, + build_consensus: BIC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -698,6 +654,31 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, + BIC: FnOnce( + Arc>>, + ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + Option<&Registry>, + Option, + &TaskManager, + Arc, + Arc< + sc_transaction_pool::FullPool< + Block, + TFullClient>, + >, + >, + Arc>, + KeystorePtr, + bool, + ) -> Result>, sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -720,6 +701,7 @@ where .await .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; + let force_authoring = parachain_config.force_authoring; let is_authority = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); @@ -895,138 +877,63 @@ where .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - start_relay_chain_tasks(StartRelayChainTasksParams { - client: client.clone(), - announce_block: announce_block.clone(), - task_manager: &mut task_manager, - para_id: id, - relay_chain_interface: relay_chain_interface.clone(), - relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle.clone()), - sync_service: sync_service.clone(), - da_recovery_profile: if is_authority { - DARecoveryProfile::Collator - } else { - DARecoveryProfile::FullNode - }, - })?; - - start_network.start_network(); - if is_authority { - let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), + let parachain_consensus = build_consensus( client.clone(), - transaction_pool, + parachain_block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent( - additional_config.proposer_soft_deadline_percent, - )); - - let overseer_handle = relay_chain_interface - .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))?; - - let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); + &task_manager, + relay_chain_interface.clone(), + transaction_pool, + sync_service.clone(), + params.keystore_container.keystore(), + force_authoring, + )?; let spawner = task_manager.spawn_handle(); - let client_ = client.clone(); - - let collation_future = Box::pin(async move { - use parity_scale_codec::Decode; - use sp_api::ApiExt; - use sp_runtime::traits::Block as BlockT; - - let client = client_.clone(); - - // Start collating with the `shell` runtime while waiting for an upgrade to an Aura - // compatible runtime. - let mut request_stream = cumulus_client_collator::relay_chain_driven::init( - collator_key.clone(), - id, - overseer_handle.clone(), - ) - .await; - - while let Some(request) = request_stream.next().await { - let pvd = request.persisted_validation_data().clone(); - let last_head_hash = - match ::Header::decode(&mut &pvd.parent_head.0[..]) { - Ok(header) => header.hash(), - Err(e) => { - log::error!("Could not decode the head data: {e}"); - request.complete(None); - continue; - } - }; - - // Check if we have upgraded to an Aura compatible runtime and transition if - // necessary. - if client - .runtime_api() - .has_api::>(last_head_hash) - .unwrap_or_default() - { - // Respond to this request before transitioning to Aura. - request.complete(None); - break; - } - } - - // Move to Aura consensus. - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) - .expect("aura is present; qed"); - let announce_block = { - let sync_service = sync_service.clone(); - Arc::new(move |hash, data| sync_service.announce_block(hash, data)) - }; - - let collator_service = cumulus_client_collator::service::CollatorService::new( - client.clone(), - Arc::new(spawner), - announce_block, - client.clone(), - ); - - let params = basic_aura::Params { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import: parachain_block_import.clone(), - para_client: client.clone(), - relay_client: relay_chain_interface.clone(), - sync_oracle: sync_service.clone(), - keystore: params.keystore_container.keystore(), - collator_key, - para_id: id, - overseer_handle, - slot_duration, - relay_chain_slot_duration: Duration::from_secs(6), - proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), - collator_service, - // We got around 500ms for proposing - authoring_duration: Duration::from_millis(500), - collation_request_receiver: None, - }; - - basic_aura::run::(params).await - }); + let params = StartCollatorParams { + para_id: id, + block_status: client.clone(), + announce_block, + client: client.clone(), + task_manager: &mut task_manager, + relay_chain_interface: relay_chain_interface.clone(), + spawner, + parachain_consensus, + import_queue: import_queue_service, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), + relay_chain_slot_duration, + recovery_handle: Box::new(overseer_handle), + sync_service, + }; + #[allow(deprecated)] + start_collator(params).await?; + } else { + let params = StartRelayChainTasksParams { + client: client.clone(), + announce_block, + task_manager: &mut task_manager, + para_id: id, + relay_chain_interface, + relay_chain_slot_duration, + import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle), + sync_service, + da_recovery_profile: DARecoveryProfile::FullNode, + }; - task_manager - .spawn_essential_handle() - .spawn_essential("aura", None, collation_future); + start_relay_chain_tasks(params)?; } + start_network.start_network(); + Ok((task_manager, client)) } -/// Build aura import queue with fallback to relay-chain verifier. -/// Starts with relay-chain verifier until aura becomes available. -pub fn build_import_queue_fallback( +/// Build the import queue. +pub fn build_import_queue( client: Arc>>, block_import: ParachainBlockImport< Block, @@ -1053,7 +960,7 @@ where + sp_offchain::OffchainWorkerApi + sp_block_builder::BlockBuilder + fp_rpc::EthereumRuntimeRPCApi - + AuraApi, + + sp_consensus_aura::AuraApi, sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, Executor: sc_executor::NativeExecutionDispatch + 'static, { @@ -1063,7 +970,7 @@ where let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); Box::new(cumulus_client_consensus_aura::build_verifier::< - AuraPair, + sp_consensus_aura::sr25519::AuthorityPair, _, _, _, @@ -1108,68 +1015,143 @@ where )) } -/// Build aura only import queue. -pub fn build_import_queue( - client: Arc>>, - block_import: ParachainBlockImport< - Block, - FrontierBlockImport< - Block, - Arc>>, - TFullClient>, - >, - TFullBackend, - >, - config: &Configuration, - telemetry_handle: Option, - task_manager: &TaskManager, -) -> Result, sc_service::Error> -where - RuntimeApi: ConstructRuntimeApi>> - + Send - + Sync - + 'static, - RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue - + sp_api::Metadata - + sp_session::SessionKeys - + sp_api::ApiExt - + sp_offchain::OffchainWorkerApi - + sp_block_builder::BlockBuilder - + fp_rpc::EthereumRuntimeRPCApi - + AuraApi, - sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, - Executor: sc_executor::NativeExecutionDispatch + 'static, -{ - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; - - cumulus_client_consensus_aura::import_queue::< - AuraPair, - _, - _, - _, - _, - _, - >(cumulus_client_consensus_aura::ImportQueueParams { - block_import, - client, - create_inherent_data_providers: move |_, _| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); +/// Start a parachain node for Astar. +#[cfg(feature = "evm-tracing")] +pub async fn start_astar_node( + parachain_config: Configuration, + polkadot_config: Configuration, + collator_options: CollatorOptions, + id: ParaId, + additional_config: AdditionalConfig, +) -> sc_service::error::Result<( + TaskManager, + Arc>>, +)> { + start_node_impl::( + parachain_config, + polkadot_config, + collator_options, + id, + additional_config.clone(), + |client, + block_import, + config, + telemetry, + task_manager| { + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; + + cumulus_client_consensus_aura::import_queue::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + >(cumulus_client_consensus_aura::ImportQueueParams { + block_import, + client, + create_inherent_data_providers: move |_, _| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); - Ok((slot, timestamp)) + Ok((slot, timestamp)) + }, + registry: config.prometheus_registry(), + spawner: &task_manager.spawn_essential_handle(), + telemetry, + }) + .map_err(Into::into) }, - registry: config.prometheus_registry(), - spawner: &task_manager.spawn_essential_handle(), - telemetry: telemetry_handle, - }).map_err(Into::into) + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_oracle, + keystore, + force_authoring| { + let spawn_handle = task_manager.spawn_handle(); + + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + let relay_chain_for_aura = relay_chain_interface.clone(); + + #[allow(deprecated)] + Ok(AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_for_aura.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import: block_import, + para_client: client, + backoff_authoring_blocks: Option::<()>::None, + sync_oracle, + keystore, + force_authoring, + slot_duration, + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry, + }) + ) + }).await } /// Start a parachain node for Astar. +#[cfg(not(feature = "evm-tracing"))] pub async fn start_astar_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1180,18 +1162,295 @@ pub async fn start_astar_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( + parachain_config, + polkadot_config, + collator_options, + id, + additional_config.clone(), + |client, + block_import, + config, + telemetry, + task_manager| { + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; + + cumulus_client_consensus_aura::import_queue::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + >(cumulus_client_consensus_aura::ImportQueueParams { + block_import, + client, + create_inherent_data_providers: move |_, _| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + }, + registry: config.prometheus_registry(), + spawner: &task_manager.spawn_essential_handle(), + telemetry, + }) + .map_err(Into::into) + }, + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_service, + keystore, + force_authoring| { + let spawn_handle = task_manager.spawn_handle(); + + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + let relay_chain_for_aura = relay_chain_interface.clone(); + + #[allow(deprecated)] + Ok(AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_for_aura.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import, + para_client: client, + backoff_authoring_blocks: Option::<()>::None, + keystore, + force_authoring, + slot_duration, + sync_oracle: sync_service.clone(), + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry, + }) + ) + }).await +} + +/// Start a parachain node for Shiden. +#[cfg(feature = "evm-tracing")] +pub async fn start_shiden_node( + parachain_config: Configuration, + polkadot_config: Configuration, + collator_options: CollatorOptions, + id: ParaId, + additional_config: AdditionalConfig, +) -> sc_service::error::Result<( + TaskManager, + Arc>>, +)> { + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), build_import_queue, - ) - .await + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_oracle, + keystore, + force_authoring| { + let client2 = client.clone(); + let spawn_handle = task_manager.spawn_handle(); + let transaction_pool2 = transaction_pool.clone(); + let telemetry2 = telemetry.clone(); + let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone()); + let relay_chain_for_aura = relay_chain_interface.clone(); + let block_import2 = block_import.clone(); + let sync_oracle2 = sync_oracle.clone(); + let keystore2 = keystore.clone(); + + let aura_consensus = BuildOnAccess::Uninitialized(Some( + Box::new(move || { + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client2.clone(), + transaction_pool2, + prometheus_registry2.as_ref(), + telemetry2.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + #[allow(deprecated)] + AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_for_aura.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = + sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = + parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import: block_import2.clone(), + para_client: client2.clone(), + backoff_authoring_blocks: Option::<()>::None, + sync_oracle: sync_oracle2, + keystore: keystore2, + force_authoring, + slot_duration, + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry: telemetry2, + }) + }), + )); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + let relay_chain_consensus = + cumulus_client_consensus_relay_chain::build_relay_chain_consensus( + cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { + para_id: id, + proposer_factory, + block_import: block_import, //client.clone(), + relay_chain_interface: relay_chain_interface.clone(), + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_interface.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let parachain_inherent = + parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok(parachain_inherent) + } + }, + }, + ); + + let parachain_consensus = Box::new(WaitForAuraConsensus { + client, + aura_consensus: Arc::new(Mutex::new(aura_consensus)), + relay_chain_consensus: Arc::new(Mutex::new(relay_chain_consensus)), + }); + + Ok(parachain_consensus) + }).await } /// Start a parachain node for Shiden. +#[cfg(not(feature = "evm-tracing"))] pub async fn start_shiden_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1202,18 +1461,159 @@ pub async fn start_shiden_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), - build_import_queue_fallback, - ) - .await + build_import_queue, + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_oracle, + keystore, + force_authoring| { + let client2 = client.clone(); + let spawn_handle = task_manager.spawn_handle(); + let transaction_pool2 = transaction_pool.clone(); + let telemetry2 = telemetry.clone(); + let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone()); + let relay_chain_for_aura = relay_chain_interface.clone(); + let block_import2 = block_import.clone(); + let sync_oracle2 = sync_oracle.clone(); + let keystore2 = keystore.clone(); + let aura_consensus = BuildOnAccess::Uninitialized(Some( + Box::new(move || { + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client2.clone(), + transaction_pool2, + prometheus_registry2.as_ref(), + telemetry2.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + #[allow(deprecated)] + AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_for_aura.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = + sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = + parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import: block_import2.clone(), + para_client: client2.clone(), + backoff_authoring_blocks: Option::<()>::None, + sync_oracle: sync_oracle2, + keystore: keystore2, + force_authoring, + slot_duration, + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry: telemetry2, + }) + }), + )); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + let relay_chain_consensus = + cumulus_client_consensus_relay_chain::build_relay_chain_consensus( + cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { + para_id: id, + proposer_factory, + block_import: block_import, //client.clone(), + relay_chain_interface: relay_chain_interface.clone(), + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_interface.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let parachain_inherent = + parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok(parachain_inherent) + } + }, + }, + ); + + let parachain_consensus = Box::new(WaitForAuraConsensus { + client, + aura_consensus: Arc::new(Mutex::new(aura_consensus)), + relay_chain_consensus: Arc::new(Mutex::new(relay_chain_consensus)), + }); + + Ok(parachain_consensus) + }).await } /// Start a parachain node for Shibuya. +#[cfg(feature = "evm-tracing")] pub async fn start_shibuya_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1224,13 +1624,256 @@ pub async fn start_shibuya_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), - build_import_queue, - ) - .await + |client, + block_import, + config, + telemetry, + task_manager| { + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; + + cumulus_client_consensus_aura::import_queue::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + >(cumulus_client_consensus_aura::ImportQueueParams { + block_import, + client, + create_inherent_data_providers: move |_, _| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + }, + registry: config.prometheus_registry(), + spawner: &task_manager.spawn_essential_handle(), + telemetry, + }) + .map_err(Into::into) + }, + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_oracle, + keystore, + force_authoring| { + let spawn_handle = task_manager.spawn_handle(); + + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + #[allow(deprecated)] + Ok(AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_interface.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import: block_import, + para_client: client, + backoff_authoring_blocks: Option::<()>::None, + sync_oracle, + keystore, + force_authoring, + slot_duration, + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry, + }) + ) + }).await +} + +/// Start a parachain node for Shibuya. +#[cfg(not(feature = "evm-tracing"))] +pub async fn start_shibuya_node( + parachain_config: Configuration, + polkadot_config: Configuration, + collator_options: CollatorOptions, + id: ParaId, + additional_config: AdditionalConfig, +) -> sc_service::error::Result<( + TaskManager, + Arc>>, +)> { + start_node_impl::( + parachain_config, + polkadot_config, + collator_options, + id, + additional_config.clone(), + |client, + block_import, + config, + telemetry, + task_manager| { + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; + + cumulus_client_consensus_aura::import_queue::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + >(cumulus_client_consensus_aura::ImportQueueParams { + block_import, + client, + create_inherent_data_providers: move |_, _| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + }, + registry: config.prometheus_registry(), + spawner: &task_manager.spawn_essential_handle(), + telemetry, + }) + .map_err(Into::into) + }, + |client, + block_import, + prometheus_registry, + telemetry, + task_manager, + relay_chain_interface, + transaction_pool, + sync_oracle, + keystore, + force_authoring| { + let spawn_handle = task_manager.spawn_handle(); + + let slot_duration = + cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); + + let mut proposer_factory = + sc_basic_authorship::ProposerFactory::with_proof_recording( + spawn_handle, + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); + + #[allow(deprecated)] + Ok(AuraConsensus::build::< + sp_consensus_aura::sr25519::AuthorityPair, + _, + _, + _, + _, + _, + _, + >(BuildAuraConsensusParams { + proposer_factory, + create_inherent_data_providers: + move |_, (relay_parent, validation_data)| { + let relay_chain_for_aura = relay_chain_interface.clone(); + async move { + let parachain_inherent = + cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( + relay_parent, + &relay_chain_for_aura, + &validation_data, + id, + ).await; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + let parachain_inherent = parachain_inherent.ok_or_else(|| { + Box::::from( + "Failed to create parachain inherent", + ) + })?; + Ok((slot, timestamp, parachain_inherent)) + } + }, + block_import: block_import, + para_client: client, + backoff_authoring_blocks: Option::<()>::None, + sync_oracle, + keystore, + force_authoring, + slot_duration, + // We got around 500ms for proposing + block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), + // And a maximum of 750ms if slots are skipped + max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), + telemetry, + }) + ) + }).await } diff --git a/bin/collator/src/parachain/shell_upgrade.rs b/bin/collator/src/parachain/shell_upgrade.rs index 212cd2d79f..510565bade 100644 --- a/bin/collator/src/parachain/shell_upgrade.rs +++ b/bin/collator/src/parachain/shell_upgrade.rs @@ -16,11 +16,14 @@ // You should have received a copy of the GNU General Public License // along with Astar. If not, see . -//! Utility for the upgrade from shell to a parachain runtime that implements Aura. +///! Special [`ParachainConsensus`] implementation that waits for the upgrade from +///! shell to a parachain runtime that implements Aura. use astar_primitives::*; -use cumulus_primitives_core::relay_chain::PersistedValidationData; +use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus}; +use cumulus_primitives_core::relay_chain::{Hash as PHash, PersistedValidationData}; use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; use fc_rpc::pending::ConsensusDataProvider; +use futures::lock::Mutex; use sc_client_api::{AuxStore, UsageProvider}; use sc_consensus::{import_queue::Verifier as VerifierT, BlockImportParams, ForkChoiceStrategy}; use sp_api::{ApiExt, ProvideRuntimeApi}; @@ -55,6 +58,57 @@ impl BuildOnAccess { } } +pub struct WaitForAuraConsensus { + pub client: Arc, + pub aura_consensus: Arc>>>>, + pub relay_chain_consensus: Arc>>>, +} + +impl Clone for WaitForAuraConsensus { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + aura_consensus: self.aura_consensus.clone(), + relay_chain_consensus: self.relay_chain_consensus.clone(), + } + } +} + +#[async_trait::async_trait] +impl ParachainConsensus for WaitForAuraConsensus +where + Client: sp_api::ProvideRuntimeApi + Send + Sync, + Client::Api: AuraApi, +{ + async fn produce_candidate( + &mut self, + parent: &Header, + relay_parent: PHash, + validation_data: &PersistedValidationData, + ) -> Option> { + let block_hash = parent.hash(); + if self + .client + .runtime_api() + .has_api::>(block_hash) + .unwrap_or(false) + { + self.aura_consensus + .lock() + .await + .get_mut() + .produce_candidate(parent, relay_parent, validation_data) + .await + } else { + self.relay_chain_consensus + .lock() + .await + .produce_candidate(parent, relay_parent, validation_data) + .await + } + } +} + pub struct Verifier { pub client: Arc, pub aura_verifier: BuildOnAccess>>, @@ -64,7 +118,7 @@ pub struct Verifier { #[async_trait::async_trait] impl VerifierT for Verifier where - Client: ProvideRuntimeApi + Send + Sync, + Client: sp_api::ProvideRuntimeApi + Send + Sync, Client::Api: AuraApi, { async fn verify(