From 5477e6fe146160207b9ff317cb6dc7e75ab9c072 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Tue, 18 Jun 2024 11:39:58 +0200 Subject: [PATCH] client refactor (#1263) --- Cargo.lock | 2 + Cargo.toml | 2 + bin/collator/Cargo.toml | 2 + bin/collator/src/command.rs | 28 +- bin/collator/src/local/service.rs | 394 +----- bin/collator/src/parachain/mod.rs | 4 +- bin/collator/src/parachain/service.rs | 1281 +++++-------------- bin/collator/src/parachain/shell_upgrade.rs | 60 +- 8 files changed, 387 insertions(+), 1386 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b2c6edb0a0..3b7e9331f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,8 +630,10 @@ dependencies = [ "async-trait", "clap", "cumulus-client-cli", + "cumulus-client-collator", "cumulus-client-consensus-aura", "cumulus-client-consensus-common", + "cumulus-client-consensus-proposer", "cumulus-client-consensus-relay-chain", "cumulus-client-network", "cumulus-client-service", diff --git a/Cargo.toml b/Cargo.toml index f4e0835d72..8a79c947de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,6 +238,8 @@ cumulus-client-consensus-common = { git = "https://github.com/paritytech/polkado cumulus-client-consensus-relay-chain = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-network = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-client-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } +cumulus-client-collator = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } +cumulus-client-consensus-proposer = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-primitives-parachain-inherent = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-relay-chain-inprocess-interface = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } cumulus-relay-chain-interface = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.3.0" } diff --git a/bin/collator/Cargo.toml b/bin/collator/Cargo.toml index 39fdafe2d4..fe82a8e38e 100644 --- a/bin/collator/Cargo.toml +++ b/bin/collator/Cargo.toml @@ -108,8 +108,10 @@ sc-cli = { workspace = true, optional = true } # cumulus dependencies cumulus-client-cli = { workspace = true } +cumulus-client-collator = { workspace = true } cumulus-client-consensus-aura = { workspace = true } cumulus-client-consensus-common = { workspace = true } +cumulus-client-consensus-proposer = { workspace = true } cumulus-client-consensus-relay-chain = { workspace = true } cumulus-client-network = { workspace = true } cumulus-client-service = { workspace = true } diff --git a/bin/collator/src/command.rs b/bin/collator/src/command.rs index 8e2abb6ffa..88ee884483 100644 --- a/bin/collator/src/command.rs +++ b/bin/collator/src/command.rs @@ -220,7 +220,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -261,7 +261,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; Ok((cmd.run(client, config.database), task_manager)) }) @@ -301,7 +301,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; Ok((cmd.run(client, config.chain_spec), task_manager)) }) @@ -343,7 +343,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -409,7 +409,7 @@ pub fn run() -> Result<()> { .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; let aux_revert = Box::new(|client, _, blocks| { sc_consensus_grandpa::revert(client, blocks)?; @@ -452,7 +452,7 @@ pub fn run() -> Result<()> { let PartialComponents { client, .. } = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; cmd.run(config.chain_spec.as_ref(), client.as_ref()) }) @@ -522,7 +522,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; cmd.run(params.client) }) @@ -562,7 +562,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; let db = params.backend.expose_db(); let storage = params.backend.expose_storage(); @@ -618,7 +618,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; let ext_builder = RemarkBuilder::new(params.client.clone()); @@ -700,7 +700,7 @@ pub fn run() -> Result<()> { let params = parachain::new_partial::( &config, - parachain::build_import_queue, + parachain::build_import_queue_fallback, )?; let remark_builder = RemarkBuilder::new(params.client.clone()); let tka_builder = TransferKeepAliveBuilder::new( @@ -788,14 +788,8 @@ pub fn run() -> Result<()> { }; runner.run_node_until_exit(|config| async move { - #[cfg(feature = "evm-tracing")] - if config.chain_spec.is_dev() { - return local::start_node(config, evm_tracing_config).map_err(Into::into); - } - - #[cfg(not(feature = "evm-tracing"))] if config.chain_spec.is_dev() { - return local::start_node(config).map_err(Into::into); + return local::start_node(config, #[cfg(feature = "evm-tracing")] evm_tracing_config).map_err(Into::into); } let polkadot_cli = RelayChainCli::new( diff --git a/bin/collator/src/local/service.rs b/bin/collator/src/local/service.rs index 59e98cd09c..5e486d88cd 100644 --- a/bin/collator/src/local/service.rs +++ b/bin/collator/src/local/service.rs @@ -22,15 +22,19 @@ use fc_consensus::FrontierBlockImport; use fc_rpc_core::types::{FeeHistoryCache, FilterPool}; use futures::{FutureExt, StreamExt}; use sc_client_api::{Backend, BlockBackend, BlockchainEvents}; -use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_grandpa::SharedVoterState; use sc_executor::NativeElseWasmExecutor; use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use std::{collections::BTreeMap, sync::Arc, time::Duration}; +#[cfg(not(feature = "manual-seal"))] +use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; + +#[cfg(feature = "evm-tracing")] +use crate::{evm_tracing_types::EthApi as EthApiCmd, rpc::tracing}; + pub use local_runtime::RuntimeApi; use astar_primitives::*; @@ -138,7 +142,6 @@ pub fn new_partial( let frontier_backend = crate::rpc::open_frontier_backend(client.clone(), config)?; let frontier_block_import = FrontierBlockImport::new(grandpa_block_import.clone(), client.clone()); - let slot_duration = sc_consensus_aura::slot_duration(&*client)?; #[cfg(feature = "manual-seal")] let import_queue = sc_consensus_manual_seal::import_queue( @@ -148,27 +151,30 @@ pub fn new_partial( ); #[cfg(not(feature = "manual-seal"))] - let import_queue = sc_consensus_aura::import_queue::( - ImportQueueParams { - block_import: frontier_block_import.clone(), - justification_import: Some(Box::new(grandpa_block_import)), - client: client.clone(), - create_inherent_data_providers: move |_, ()| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - Ok((slot, timestamp)) + let import_queue = { + let slot_duration = sc_consensus_aura::slot_duration(&*client)?; + sc_consensus_aura::import_queue::( + sc_consensus_aura::ImportQueueParams { + block_import: frontier_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import)), + client: client.clone(), + create_inherent_data_providers: move |_, ()| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + Ok((slot, timestamp)) + }, + spawner: &task_manager.spawn_essential_handle(), + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + telemetry: telemetry.as_ref().map(|x| x.handle()), + compatibility_mode: Default::default(), }, - spawner: &task_manager.spawn_essential_handle(), - registry: config.prometheus_registry(), - check_for_equivocation: Default::default(), - telemetry: telemetry.as_ref().map(|x| x.handle()), - compatibility_mode: Default::default(), - }, - )?; + )? + }; Ok(sc_service::PartialComponents { client, @@ -188,14 +194,10 @@ pub fn new_partial( } /// Builds a new service. -#[cfg(feature = "evm-tracing")] pub fn start_node( config: Configuration, - evm_tracing_config: crate::evm_tracing_types::EvmTracingConfig, + #[cfg(feature = "evm-tracing")] evm_tracing_config: crate::evm_tracing_types::EvmTracingConfig, ) -> Result { - use crate::evm_tracing_types::EthApi as EthApiCmd; - use crate::rpc::tracing; - let sc_service::PartialComponents { client, backend, @@ -264,7 +266,10 @@ pub fn start_node( > = Default::default(); let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + #[cfg(feature = "evm-tracing")] let ethapi_cmd = evm_tracing_config.ethapi.clone(); + + #[cfg(feature = "evm-tracing")] let tracing_requesters = if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) { tracing::spawn_tracing_tasks( @@ -331,9 +336,12 @@ pub fn start_node( ), ); - let role = config.role.clone(); + #[cfg(not(feature = "manual-seal"))] let force_authoring = config.force_authoring; + #[cfg(not(feature = "manual-seal"))] let backoff_authoring_blocks: Option<()> = None; + + let role = config.role.clone(); let name = config.network.node_name.clone(); let enable_grandpa = !config.disable_grandpa; let prometheus_registry = config.prometheus_registry().cloned(); @@ -347,6 +355,7 @@ pub fn start_node( prometheus_registry.clone(), )); + // Channel for the rpc handler to communicate with the authorship task. #[cfg(feature = "manual-seal")] let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024); @@ -354,11 +363,6 @@ pub fn start_node( let client = client.clone(); let network = network.clone(); let transaction_pool = transaction_pool.clone(); - let rpc_config = crate::rpc::EvmTracingConfig { - tracing_requesters, - trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count, - enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool), - }; let sync = sync_service.clone(); let pubsub_notification_sinks = pubsub_notification_sinks.clone(); @@ -386,7 +390,12 @@ pub fn start_node( deps, subscription, pubsub_notification_sinks.clone(), - rpc_config.clone(), + #[cfg(feature = "evm-tracing")] + crate::rpc::EvmTracingConfig { + tracing_requesters: tracing_requesters.clone(), + trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count, + enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool), + }, ) .map_err::(Into::into) }) @@ -446,318 +455,7 @@ pub fn start_node( #[cfg(not(feature = "manual-seal"))] let aura = sc_consensus_aura::start_aura::( - StartAuraParams { - slot_duration, - client, - select_chain, - block_import, - proposer_factory, - create_inherent_data_providers: move |_, ()| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - }, - force_authoring, - backoff_authoring_blocks, - keystore: keystore_container.keystore(), - sync_oracle: sync_service.clone(), - justification_sync_link: sync_service.clone(), - block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), - max_block_proposal_slot_portion: None, - telemetry: telemetry.as_ref().map(|x| x.handle()), - compatibility_mode: Default::default(), - }, - )?; - - // the AURA authoring task is considered essential, i.e. if it - // fails we take down the service with it. - task_manager - .spawn_essential_handle() - .spawn_blocking("aura", Some("block-authoring"), aura); - } - - // if the node isn't actively participating in consensus then it doesn't - // need a keystore, regardless of which protocol we use below. - let keystore = if role.is_authority() { - Some(keystore_container.keystore()) - } else { - None - }; - - let grandpa_config = sc_consensus_grandpa::Config { - // FIXME #1578 make this available through chainspec - gossip_duration: Duration::from_millis(333), - justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD, - name: Some(name), - observer_enabled: false, - keystore, - local_role: role, - telemetry: telemetry.as_ref().map(|x| x.handle()), - protocol_name, - }; - - if enable_grandpa { - // start the full GRANDPA voter - // NOTE: non-authorities could run the GRANDPA observer protocol, but at - // this point the full voter should provide better guarantees of block - // and vote data availability than the observer. The observer has not - // been tested extensively yet and having most nodes in a network run it - // could lead to finality stalls. - let grandpa_config = sc_consensus_grandpa::GrandpaParams { - config: grandpa_config, - link: grandpa_link, - network, - sync: sync_service, - voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(), - prometheus_registry, - shared_voter_state: SharedVoterState::empty(), - telemetry: telemetry.as_ref().map(|x| x.handle()), - offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool), - }; - - // the GRANDPA voter task is considered infallible, i.e. - // if it fails we take down the service with it. - task_manager.spawn_essential_handle().spawn_blocking( - "grandpa-voter", - None, - sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?, - ); - } - - network_starter.start_network(); - Ok(task_manager) -} - -/// Builds a new service. -#[cfg(not(feature = "evm-tracing"))] -pub fn start_node(config: Configuration) -> Result { - let sc_service::PartialComponents { - client, - backend, - mut task_manager, - import_queue, - keystore_container, - select_chain, - transaction_pool, - other: (block_import, grandpa_link, mut telemetry, frontier_backend), - } = new_partial(&config)?; - - let protocol_name = sc_consensus_grandpa::protocol_standard_name( - &client - .block_hash(0) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - &config.chain_spec, - ); - let net_config = sc_network::config::FullNetworkConfiguration::new(&config.network); - - let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = - sc_service::build_network(sc_service::BuildNetworkParams { - config: &config, - net_config, - client: client.clone(), - transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), - import_queue, - block_announce_validator_builder: None, - warp_sync_params: None, - block_relay: None, - })?; - - if config.offchain_worker.enabled { - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-work", - sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { - runtime_api_provider: client.clone(), - keystore: Some(keystore_container.keystore()), - offchain_db: backend.offchain_storage(), - transaction_pool: Some(OffchainTransactionPoolFactory::new( - transaction_pool.clone(), - )), - network_provider: network.clone(), - is_validator: config.role.is_authority(), - enable_http_requests: true, - custom_extensions: move |_| vec![], - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), - ); - } - - let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new())); - let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new())); - let overrides = fc_storage::overrides_handle(client.clone()); - - // Sinks for pubsub notifications. - // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. - // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel. - // This way we avoid race conditions when using native substrate block import notification stream. - let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks< - fc_mapping_sync::EthereumBlockNotification, - > = Default::default(); - let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); - - // Frontier offchain DB task. Essential. - // Maps emulated ethereum data to substrate native data. - task_manager.spawn_essential_handle().spawn( - "frontier-mapping-sync-worker", - Some("frontier"), - fc_mapping_sync::kv::MappingSyncWorker::new( - client.import_notification_stream(), - Duration::new(6, 0), - client.clone(), - backend.clone(), - overrides.clone(), - frontier_backend.clone(), - 3, - 0, - fc_mapping_sync::SyncStrategy::Parachain, - sync_service.clone(), - pubsub_notification_sinks.clone(), - ) - .for_each(|()| futures::future::ready(())), - ); - - // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters. - // Each filter is allowed to stay in the pool for 100 blocks. - const FILTER_RETAIN_THRESHOLD: u64 = 100; - task_manager.spawn_essential_handle().spawn( - "frontier-filter-pool", - Some("frontier"), - fc_rpc::EthTask::filter_pool_task( - client.clone(), - filter_pool.clone(), - FILTER_RETAIN_THRESHOLD, - ), - ); - - const FEE_HISTORY_LIMIT: u64 = 2048; - task_manager.spawn_essential_handle().spawn( - "frontier-fee-history", - Some("frontier"), - fc_rpc::EthTask::fee_history_task( - client.clone(), - overrides.clone(), - fee_history_cache.clone(), - FEE_HISTORY_LIMIT, - ), - ); - - let role = config.role.clone(); - let force_authoring = config.force_authoring; - let backoff_authoring_blocks: Option<()> = None; - let name = config.network.node_name.clone(); - let enable_grandpa = !config.disable_grandpa; - let prometheus_registry = config.prometheus_registry().cloned(); - let is_authority = config.role.is_authority(); - - let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new( - task_manager.spawn_handle(), - overrides.clone(), - 50, - 50, - prometheus_registry.clone(), - )); - - // Channel for the rpc handler to communicate with the authorship task. - #[cfg(feature = "manual-seal")] - let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024); - - let rpc_extensions_builder = { - let client = client.clone(); - let network = network.clone(); - let sync = sync_service.clone(); - let transaction_pool = transaction_pool.clone(); - let pubsub_notification_sinks = pubsub_notification_sinks.clone(); - - Box::new(move |deny_unsafe, subscription| { - let deps = crate::rpc::FullDeps { - client: client.clone(), - pool: transaction_pool.clone(), - graph: transaction_pool.pool().clone(), - network: network.clone(), - sync: sync.clone(), - is_authority, - deny_unsafe, - frontier_backend: frontier_backend.clone(), - filter_pool: filter_pool.clone(), - fee_history_limit: FEE_HISTORY_LIMIT, - fee_history_cache: fee_history_cache.clone(), - block_data_cache: block_data_cache.clone(), - overrides: overrides.clone(), - enable_evm_rpc: true, // enable EVM RPC for dev node by default - #[cfg(feature = "manual-seal")] - command_sink: Some(command_sink.clone()), - }; - - crate::rpc::create_full(deps, subscription, pubsub_notification_sinks.clone()) - .map_err::(Into::into) - }) - }; - - let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { - network: network.clone(), - client: client.clone(), - keystore: keystore_container.keystore(), - task_manager: &mut task_manager, - transaction_pool: transaction_pool.clone(), - rpc_builder: rpc_extensions_builder, - backend, - system_rpc_tx, - tx_handler_controller, - sync_service: sync_service.clone(), - config, - telemetry: telemetry.as_mut(), - })?; - - if role.is_authority() { - let proposer_factory = sc_basic_authorship::ProposerFactory::new( - task_manager.spawn_handle(), - client.clone(), - transaction_pool.clone(), - prometheus_registry.as_ref(), - telemetry.as_ref().map(|x| x.handle()), - ); - - let slot_duration = sc_consensus_aura::slot_duration(&*client)?; - - #[cfg(feature = "manual-seal")] - let aura = sc_consensus_manual_seal::run_manual_seal( - sc_consensus_manual_seal::ManualSealParams { - block_import, - env: proposer_factory, - client: client.clone(), - pool: transaction_pool.clone(), - commands_stream, - select_chain, - consensus_data_provider: Some(Box::new( - sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new( - client.clone(), - ), - )), - create_inherent_data_providers: move |_, ()| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration.clone(), - ); - Ok((slot, timestamp)) - }, - }, - ); - - #[cfg(not(feature = "manual-seal"))] - let aura = sc_consensus_aura::start_aura::( - StartAuraParams { + sc_consensus_aura::StartAuraParams { slot_duration, client, select_chain, @@ -779,7 +477,7 @@ pub fn start_node(config: Configuration) -> Result { keystore: keystore_container.keystore(), sync_oracle: sync_service.clone(), justification_sync_link: sync_service.clone(), - block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), + block_proposal_slot_portion: sc_consensus_aura::SlotProportion::new(2f32 / 3f32), max_block_proposal_slot_portion: None, telemetry: telemetry.as_ref().map(|x| x.handle()), compatibility_mode: Default::default(), diff --git a/bin/collator/src/parachain/mod.rs b/bin/collator/src/parachain/mod.rs index 077a99f1de..101be82796 100644 --- a/bin/collator/src/parachain/mod.rs +++ b/bin/collator/src/parachain/mod.rs @@ -28,8 +28,8 @@ pub mod service; pub mod chain_spec; pub use service::{ - astar, build_import_queue, new_partial, shibuya, shiden, start_astar_node, start_shibuya_node, - start_shiden_node, HostFunctions, + astar, build_import_queue, build_import_queue_fallback, new_partial, shibuya, shiden, + start_astar_node, start_shibuya_node, start_shiden_node, HostFunctions, }; pub(crate) use shell_upgrade::{ diff --git a/bin/collator/src/parachain/service.rs b/bin/collator/src/parachain/service.rs index 72e2bfd30e..191dee6865 100644 --- a/bin/collator/src/parachain/service.rs +++ b/bin/collator/src/parachain/service.rs @@ -20,14 +20,12 @@ use astar_primitives::*; use cumulus_client_cli::CollatorOptions; -#[allow(deprecated)] -use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion}; -use cumulus_client_consensus_common::{ParachainBlockImport, ParachainConsensus}; +use cumulus_client_consensus_aura::collators::basic as basic_aura; +use cumulus_client_consensus_common::ParachainBlockImport; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; -#[allow(deprecated)] use cumulus_client_service::{ - prepare_node_config, start_collator, start_relay_chain_tasks, BuildNetworkParams, - DARecoveryProfile, StartCollatorParams, StartRelayChainTasksParams, + prepare_node_config, start_relay_chain_tasks, BuildNetworkParams, DARecoveryProfile, + StartRelayChainTasksParams, }; use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; @@ -35,29 +33,30 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc; use fc_consensus::FrontierBlockImport; use fc_rpc_core::types::{FeeHistoryCache, FilterPool}; -use futures::{lock::Mutex, StreamExt}; +use futures::StreamExt; use polkadot_service::CollatorPair; use sc_client_api::BlockchainEvents; use sc_consensus::{import_queue::BasicQueue, ImportQueue}; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkBlock; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; -use sp_api::ConstructRuntimeApi; -use sp_consensus_aura::{sr25519::AuthorityId as AuraId, AuraApi}; -use sp_keystore::KeystorePtr; +use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi}; +use sp_consensus_aura::{ + sr25519::AuthorityId as AuraId, sr25519::AuthorityPair as AuraPair, AuraApi, +}; +use sp_core::traits::SpawnEssentialNamed; use sp_runtime::traits::BlakeTwo256; use sp_runtime::Percent; use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use substrate_prometheus_endpoint::Registry; use super::shell_upgrade::*; -#[cfg(feature = "evm-tracing")] -use crate::evm_tracing_types::{EthApi as EthApiCmd, EvmTracingConfig}; #[cfg(feature = "evm-tracing")] -use crate::rpc::tracing; +use crate::{ + evm_tracing_types::{EthApi as EthApiCmd, EvmTracingConfig}, + rpc::tracing, +}; /// Extra host functions pub type HostFunctions = ( @@ -290,14 +289,13 @@ async fn build_relay_chain_interface( /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(not(feature = "evm-tracing"))] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, - build_consensus: BIC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -336,31 +334,6 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, - BIC: FnOnce( - Arc>>, - ParachainBlockImport< - Block, - FrontierBlockImport< - Block, - Arc>>, - TFullClient>, - >, - TFullBackend, - >, - Option<&Registry>, - Option, - &TaskManager, - Arc, - Arc< - sc_transaction_pool::FullPool< - Block, - TFullClient>, - >, - >, - Arc>, - KeystorePtr, - bool, - ) -> Result>, sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -383,7 +356,6 @@ where .await .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; - let force_authoring = parachain_config.force_authoring; let is_authority = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); @@ -528,58 +500,131 @@ where .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + start_relay_chain_tasks(StartRelayChainTasksParams { + client: client.clone(), + announce_block: announce_block.clone(), + task_manager: &mut task_manager, + para_id: id, + relay_chain_interface: relay_chain_interface.clone(), + relay_chain_slot_duration, + import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle.clone()), + sync_service: sync_service.clone(), + da_recovery_profile: if is_authority { + DARecoveryProfile::Collator + } else { + DARecoveryProfile::FullNode + }, + })?; + + start_network.start_network(); + if is_authority { - let parachain_consensus = build_consensus( + let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), client.clone(), - parachain_block_import, + transaction_pool, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), - &task_manager, - relay_chain_interface.clone(), - transaction_pool, - sync_service.clone(), - params.keystore_container.keystore(), - force_authoring, - )?; + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent( + additional_config.proposer_soft_deadline_percent, + )); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); let spawner = task_manager.spawn_handle(); + let client_ = client.clone(); - let params = StartCollatorParams { - para_id: id, - block_status: client.clone(), - announce_block, - client: client.clone(), - task_manager: &mut task_manager, - relay_chain_interface: relay_chain_interface.clone(), - spawner, - parachain_consensus, - import_queue: import_queue_service, - collator_key: collator_key.expect("Command line arguments do not allow this. qed"), - relay_chain_slot_duration, - recovery_handle: Box::new(overseer_handle), - sync_service, - }; + let collation_future = Box::pin(async move { + use parity_scale_codec::Decode; + use sp_api::ApiExt; + use sp_runtime::traits::Block as BlockT; - #[allow(deprecated)] - start_collator(params).await?; - } else { - let params = StartRelayChainTasksParams { - client: client.clone(), - announce_block, - task_manager: &mut task_manager, - para_id: id, - relay_chain_interface, - relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle), - sync_service, - da_recovery_profile: DARecoveryProfile::FullNode, - }; + let client = client_.clone(); - start_relay_chain_tasks(params)?; - } + // Start collating with the `shell` runtime while waiting for an upgrade to an Aura + // compatible runtime. + let mut request_stream = cumulus_client_collator::relay_chain_driven::init( + collator_key.clone(), + id, + overseer_handle.clone(), + ) + .await; + + while let Some(request) = request_stream.next().await { + let pvd = request.persisted_validation_data().clone(); + let last_head_hash = + match ::Header::decode(&mut &pvd.parent_head.0[..]) { + Ok(header) => header.hash(), + Err(e) => { + log::error!("Could not decode the head data: {e}"); + request.complete(None); + continue; + } + }; + + // Check if we have upgraded to an Aura compatible runtime and transition if + // necessary. + if client + .runtime_api() + .has_api::>(last_head_hash) + .unwrap_or_default() + { + // Respond to this request before transitioning to Aura. + request.complete(None); + break; + } + } - start_network.start_network(); + // Move to Aura consensus. + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) + .expect("aura is present; qed"); + + let announce_block = { + let sync_service = sync_service.clone(); + Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + }; + + let collator_service = cumulus_client_collator::service::CollatorService::new( + client.clone(), + Arc::new(spawner), + announce_block, + client.clone(), + ); + + let params = basic_aura::Params { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import: parachain_block_import.clone(), + para_client: client.clone(), + relay_client: relay_chain_interface.clone(), + sync_oracle: sync_service.clone(), + keystore: params.keystore_container.keystore(), + collator_key, + para_id: id, + overseer_handle, + slot_duration, + relay_chain_slot_duration: Duration::from_secs(6), + proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), + collator_service, + // We got around 500ms for proposing + authoring_duration: Duration::from_millis(500), + collation_request_receiver: None, + }; + + basic_aura::run::(params).await + }); + + task_manager + .spawn_essential_handle() + .spawn_essential("aura", None, collation_future); + } Ok((task_manager, client)) } @@ -606,14 +651,13 @@ pub struct AdditionalConfig { /// This is the actual implementation that is abstract over the executor and the runtime api. #[cfg(feature = "evm-tracing")] #[sc_tracing::logging::prefix_logs_with("Parachain")] -async fn start_node_impl( +async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, id: ParaId, additional_config: AdditionalConfig, build_import_queue: BIQ, - build_consensus: BIC, ) -> sc_service::error::Result<( TaskManager, Arc>>, @@ -654,31 +698,6 @@ where Option, &TaskManager, ) -> Result, sc_service::Error>, - BIC: FnOnce( - Arc>>, - ParachainBlockImport< - Block, - FrontierBlockImport< - Block, - Arc>>, - TFullClient>, - >, - TFullBackend, - >, - Option<&Registry>, - Option, - &TaskManager, - Arc, - Arc< - sc_transaction_pool::FullPool< - Block, - TFullClient>, - >, - >, - Arc>, - KeystorePtr, - bool, - ) -> Result>, sc_service::Error>, { let parachain_config = prepare_node_config(parachain_config); @@ -701,7 +720,6 @@ where .await .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; - let force_authoring = parachain_config.force_authoring; let is_authority = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); @@ -877,63 +895,138 @@ where .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + start_relay_chain_tasks(StartRelayChainTasksParams { + client: client.clone(), + announce_block: announce_block.clone(), + task_manager: &mut task_manager, + para_id: id, + relay_chain_interface: relay_chain_interface.clone(), + relay_chain_slot_duration, + import_queue: import_queue_service, + recovery_handle: Box::new(overseer_handle.clone()), + sync_service: sync_service.clone(), + da_recovery_profile: if is_authority { + DARecoveryProfile::Collator + } else { + DARecoveryProfile::FullNode + }, + })?; + + start_network.start_network(); + if is_authority { - let parachain_consensus = build_consensus( + let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), client.clone(), - parachain_block_import, + transaction_pool, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), - &task_manager, - relay_chain_interface.clone(), - transaction_pool, - sync_service.clone(), - params.keystore_container.keystore(), - force_authoring, - )?; + ); + + proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); + proposer_factory.set_soft_deadline(Percent::from_percent( + additional_config.proposer_soft_deadline_percent, + )); + + let overseer_handle = relay_chain_interface + .overseer_handle() + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; + + let collator_key = collator_key.expect("Command line arguments do not allow this. qed"); let spawner = task_manager.spawn_handle(); + let client_ = client.clone(); - let params = StartCollatorParams { - para_id: id, - block_status: client.clone(), - announce_block, - client: client.clone(), - task_manager: &mut task_manager, - relay_chain_interface: relay_chain_interface.clone(), - spawner, - parachain_consensus, - import_queue: import_queue_service, - collator_key: collator_key.expect("Command line arguments do not allow this. qed"), - relay_chain_slot_duration, - recovery_handle: Box::new(overseer_handle), - sync_service, - }; - #[allow(deprecated)] - start_collator(params).await?; - } else { - let params = StartRelayChainTasksParams { - client: client.clone(), - announce_block, - task_manager: &mut task_manager, - para_id: id, - relay_chain_interface, - relay_chain_slot_duration, - import_queue: import_queue_service, - recovery_handle: Box::new(overseer_handle), - sync_service, - da_recovery_profile: DARecoveryProfile::FullNode, - }; + let collation_future = Box::pin(async move { + use parity_scale_codec::Decode; + use sp_api::ApiExt; + use sp_runtime::traits::Block as BlockT; - start_relay_chain_tasks(params)?; - } + let client = client_.clone(); - start_network.start_network(); + // Start collating with the `shell` runtime while waiting for an upgrade to an Aura + // compatible runtime. + let mut request_stream = cumulus_client_collator::relay_chain_driven::init( + collator_key.clone(), + id, + overseer_handle.clone(), + ) + .await; + + while let Some(request) = request_stream.next().await { + let pvd = request.persisted_validation_data().clone(); + let last_head_hash = + match ::Header::decode(&mut &pvd.parent_head.0[..]) { + Ok(header) => header.hash(), + Err(e) => { + log::error!("Could not decode the head data: {e}"); + request.complete(None); + continue; + } + }; + + // Check if we have upgraded to an Aura compatible runtime and transition if + // necessary. + if client + .runtime_api() + .has_api::>(last_head_hash) + .unwrap_or_default() + { + // Respond to this request before transitioning to Aura. + request.complete(None); + break; + } + } + + // Move to Aura consensus. + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client) + .expect("aura is present; qed"); + + let announce_block = { + let sync_service = sync_service.clone(); + Arc::new(move |hash, data| sync_service.announce_block(hash, data)) + }; + + let collator_service = cumulus_client_collator::service::CollatorService::new( + client.clone(), + Arc::new(spawner), + announce_block, + client.clone(), + ); + + let params = basic_aura::Params { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import: parachain_block_import.clone(), + para_client: client.clone(), + relay_client: relay_chain_interface.clone(), + sync_oracle: sync_service.clone(), + keystore: params.keystore_container.keystore(), + collator_key, + para_id: id, + overseer_handle, + slot_duration, + relay_chain_slot_duration: Duration::from_secs(6), + proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory), + collator_service, + // We got around 500ms for proposing + authoring_duration: Duration::from_millis(500), + collation_request_receiver: None, + }; + + basic_aura::run::(params).await + }); + + task_manager + .spawn_essential_handle() + .spawn_essential("aura", None, collation_future); + } Ok((task_manager, client)) } -/// Build the import queue. -pub fn build_import_queue( +/// Build aura import queue with fallback to relay-chain verifier. +/// Starts with relay-chain verifier until aura becomes available. +pub fn build_import_queue_fallback( client: Arc>>, block_import: ParachainBlockImport< Block, @@ -960,7 +1053,7 @@ where + sp_offchain::OffchainWorkerApi + sp_block_builder::BlockBuilder + fp_rpc::EthereumRuntimeRPCApi - + sp_consensus_aura::AuraApi, + + AuraApi, sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, Executor: sc_executor::NativeExecutionDispatch + 'static, { @@ -970,7 +1063,7 @@ where let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); Box::new(cumulus_client_consensus_aura::build_verifier::< - sp_consensus_aura::sr25519::AuthorityPair, + AuraPair, _, _, _, @@ -1015,143 +1108,68 @@ where )) } -/// Start a parachain node for Astar. -#[cfg(feature = "evm-tracing")] -pub async fn start_astar_node( - parachain_config: Configuration, - polkadot_config: Configuration, - collator_options: CollatorOptions, - id: ParaId, - additional_config: AdditionalConfig, -) -> sc_service::error::Result<( - TaskManager, - Arc>>, -)> { - start_node_impl::( - parachain_config, - polkadot_config, - collator_options, - id, - additional_config.clone(), - |client, - block_import, - config, - telemetry, - task_manager| { - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; - - cumulus_client_consensus_aura::import_queue::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - >(cumulus_client_consensus_aura::ImportQueueParams { - block_import, - client, - create_inherent_data_providers: move |_, _| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); +/// Build aura only import queue. +pub fn build_import_queue( + client: Arc>>, + block_import: ParachainBlockImport< + Block, + FrontierBlockImport< + Block, + Arc>>, + TFullClient>, + >, + TFullBackend, + >, + config: &Configuration, + telemetry_handle: Option, + task_manager: &TaskManager, +) -> Result, sc_service::Error> +where + RuntimeApi: ConstructRuntimeApi>> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + + sp_api::Metadata + + sp_session::SessionKeys + + sp_api::ApiExt + + sp_offchain::OffchainWorkerApi + + sp_block_builder::BlockBuilder + + fp_rpc::EthereumRuntimeRPCApi + + AuraApi, + sc_client_api::StateBackendFor, Block>: sp_api::StateBackend, + Executor: sc_executor::NativeExecutionDispatch + 'static, +{ + let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; + + cumulus_client_consensus_aura::import_queue::< + AuraPair, + _, + _, + _, + _, + _, + >(cumulus_client_consensus_aura::ImportQueueParams { + block_import, + client, + create_inherent_data_providers: move |_, _| async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - Ok((slot, timestamp)) - }, - registry: config.prometheus_registry(), - spawner: &task_manager.spawn_essential_handle(), - telemetry, - }) - .map_err(Into::into) - }, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_oracle, - keystore, - force_authoring| { - let spawn_handle = task_manager.spawn_handle(); - - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, ); - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - let relay_chain_for_aura = relay_chain_interface.clone(); - - #[allow(deprecated)] - Ok(AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_for_aura.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import: block_import, - para_client: client, - backoff_authoring_blocks: Option::<()>::None, - sync_oracle, - keystore, - force_authoring, - slot_duration, - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry, - }) - ) - }).await + Ok((slot, timestamp)) + }, + registry: config.prometheus_registry(), + spawner: &task_manager.spawn_essential_handle(), + telemetry: telemetry_handle, + }).map_err(Into::into) } /// Start a parachain node for Astar. -#[cfg(not(feature = "evm-tracing"))] pub async fn start_astar_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1162,295 +1180,18 @@ pub async fn start_astar_node( TaskManager, Arc>>, )> { - start_node_impl::( - parachain_config, - polkadot_config, - collator_options, - id, - additional_config.clone(), - |client, - block_import, - config, - telemetry, - task_manager| { - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; - - cumulus_client_consensus_aura::import_queue::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - >(cumulus_client_consensus_aura::ImportQueueParams { - block_import, - client, - create_inherent_data_providers: move |_, _| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - }, - registry: config.prometheus_registry(), - spawner: &task_manager.spawn_essential_handle(), - telemetry, - }) - .map_err(Into::into) - }, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_service, - keystore, - force_authoring| { - let spawn_handle = task_manager.spawn_handle(); - - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - let relay_chain_for_aura = relay_chain_interface.clone(); - - #[allow(deprecated)] - Ok(AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_for_aura.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import, - para_client: client, - backoff_authoring_blocks: Option::<()>::None, - keystore, - force_authoring, - slot_duration, - sync_oracle: sync_service.clone(), - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry, - }) - ) - }).await -} - -/// Start a parachain node for Shiden. -#[cfg(feature = "evm-tracing")] -pub async fn start_shiden_node( - parachain_config: Configuration, - polkadot_config: Configuration, - collator_options: CollatorOptions, - id: ParaId, - additional_config: AdditionalConfig, -) -> sc_service::error::Result<( - TaskManager, - Arc>>, -)> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), build_import_queue, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_oracle, - keystore, - force_authoring| { - let client2 = client.clone(); - let spawn_handle = task_manager.spawn_handle(); - let transaction_pool2 = transaction_pool.clone(); - let telemetry2 = telemetry.clone(); - let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone()); - let relay_chain_for_aura = relay_chain_interface.clone(); - let block_import2 = block_import.clone(); - let sync_oracle2 = sync_oracle.clone(); - let keystore2 = keystore.clone(); - - let aura_consensus = BuildOnAccess::Uninitialized(Some( - Box::new(move || { - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client2.clone(), - transaction_pool2, - prometheus_registry2.as_ref(), - telemetry2.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - #[allow(deprecated)] - AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_for_aura.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = - sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = - parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import: block_import2.clone(), - para_client: client2.clone(), - backoff_authoring_blocks: Option::<()>::None, - sync_oracle: sync_oracle2, - keystore: keystore2, - force_authoring, - slot_duration, - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry: telemetry2, - }) - }), - )); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - let relay_chain_consensus = - cumulus_client_consensus_relay_chain::build_relay_chain_consensus( - cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { - para_id: id, - proposer_factory, - block_import: block_import, //client.clone(), - relay_chain_interface: relay_chain_interface.clone(), - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_interface.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let parachain_inherent = - parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok(parachain_inherent) - } - }, - }, - ); - - let parachain_consensus = Box::new(WaitForAuraConsensus { - client, - aura_consensus: Arc::new(Mutex::new(aura_consensus)), - relay_chain_consensus: Arc::new(Mutex::new(relay_chain_consensus)), - }); - - Ok(parachain_consensus) - }).await + ) + .await } /// Start a parachain node for Shiden. -#[cfg(not(feature = "evm-tracing"))] pub async fn start_shiden_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1461,292 +1202,18 @@ pub async fn start_shiden_node( TaskManager, Arc>>, )> { - start_node_impl::( - parachain_config, - polkadot_config, - collator_options, - id, - additional_config.clone(), - build_import_queue, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_oracle, - keystore, - force_authoring| { - let client2 = client.clone(); - let spawn_handle = task_manager.spawn_handle(); - let transaction_pool2 = transaction_pool.clone(); - let telemetry2 = telemetry.clone(); - let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone()); - let relay_chain_for_aura = relay_chain_interface.clone(); - let block_import2 = block_import.clone(); - let sync_oracle2 = sync_oracle.clone(); - let keystore2 = keystore.clone(); - let aura_consensus = BuildOnAccess::Uninitialized(Some( - Box::new(move || { - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client2).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client2.clone(), - transaction_pool2, - prometheus_registry2.as_ref(), - telemetry2.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - #[allow(deprecated)] - AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_for_aura.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = - sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = - parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import: block_import2.clone(), - para_client: client2.clone(), - backoff_authoring_blocks: Option::<()>::None, - sync_oracle: sync_oracle2, - keystore: keystore2, - force_authoring, - slot_duration, - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry: telemetry2, - }) - }), - )); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - task_manager.spawn_handle(), - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - let relay_chain_consensus = - cumulus_client_consensus_relay_chain::build_relay_chain_consensus( - cumulus_client_consensus_relay_chain::BuildRelayChainConsensusParams { - para_id: id, - proposer_factory, - block_import: block_import, //client.clone(), - relay_chain_interface: relay_chain_interface.clone(), - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_interface.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let parachain_inherent = - parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok(parachain_inherent) - } - }, - }, - ); - - let parachain_consensus = Box::new(WaitForAuraConsensus { - client, - aura_consensus: Arc::new(Mutex::new(aura_consensus)), - relay_chain_consensus: Arc::new(Mutex::new(relay_chain_consensus)), - }); - - Ok(parachain_consensus) - }).await -} - -/// Start a parachain node for Shibuya. -#[cfg(feature = "evm-tracing")] -pub async fn start_shibuya_node( - parachain_config: Configuration, - polkadot_config: Configuration, - collator_options: CollatorOptions, - id: ParaId, - additional_config: AdditionalConfig, -) -> sc_service::error::Result<( - TaskManager, - Arc>>, -)> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), - |client, - block_import, - config, - telemetry, - task_manager| { - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; - - cumulus_client_consensus_aura::import_queue::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - >(cumulus_client_consensus_aura::ImportQueueParams { - block_import, - client, - create_inherent_data_providers: move |_, _| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - }, - registry: config.prometheus_registry(), - spawner: &task_manager.spawn_essential_handle(), - telemetry, - }) - .map_err(Into::into) - }, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_oracle, - keystore, - force_authoring| { - let spawn_handle = task_manager.spawn_handle(); - - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - #[allow(deprecated)] - Ok(AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_interface.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import: block_import, - para_client: client, - backoff_authoring_blocks: Option::<()>::None, - sync_oracle, - keystore, - force_authoring, - slot_duration, - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry, - }) - ) - }).await + build_import_queue_fallback, + ) + .await } /// Start a parachain node for Shibuya. -#[cfg(not(feature = "evm-tracing"))] pub async fn start_shibuya_node( parachain_config: Configuration, polkadot_config: Configuration, @@ -1757,123 +1224,13 @@ pub async fn start_shibuya_node( TaskManager, Arc>>, )> { - start_node_impl::( + start_node_impl::( parachain_config, polkadot_config, collator_options, id, additional_config.clone(), - |client, - block_import, - config, - telemetry, - task_manager| { - let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?; - - cumulus_client_consensus_aura::import_queue::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - >(cumulus_client_consensus_aura::ImportQueueParams { - block_import, - client, - create_inherent_data_providers: move |_, _| async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - }, - registry: config.prometheus_registry(), - spawner: &task_manager.spawn_essential_handle(), - telemetry, - }) - .map_err(Into::into) - }, - |client, - block_import, - prometheus_registry, - telemetry, - task_manager, - relay_chain_interface, - transaction_pool, - sync_oracle, - keystore, - force_authoring| { - let spawn_handle = task_manager.spawn_handle(); - - let slot_duration = - cumulus_client_consensus_aura::slot_duration(&*client).unwrap(); - - let mut proposer_factory = - sc_basic_authorship::ProposerFactory::with_proof_recording( - spawn_handle, - client.clone(), - transaction_pool, - prometheus_registry, - telemetry.clone(), - ); - - proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit); - proposer_factory.set_soft_deadline(Percent::from_percent(additional_config.proposer_soft_deadline_percent)); - - #[allow(deprecated)] - Ok(AuraConsensus::build::< - sp_consensus_aura::sr25519::AuthorityPair, - _, - _, - _, - _, - _, - _, - >(BuildAuraConsensusParams { - proposer_factory, - create_inherent_data_providers: - move |_, (relay_parent, validation_data)| { - let relay_chain_for_aura = relay_chain_interface.clone(); - async move { - let parachain_inherent = - cumulus_primitives_parachain_inherent::ParachainInherentData::create_at( - relay_parent, - &relay_chain_for_aura, - &validation_data, - id, - ).await; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - let parachain_inherent = parachain_inherent.ok_or_else(|| { - Box::::from( - "Failed to create parachain inherent", - ) - })?; - Ok((slot, timestamp, parachain_inherent)) - } - }, - block_import: block_import, - para_client: client, - backoff_authoring_blocks: Option::<()>::None, - sync_oracle, - keystore, - force_authoring, - slot_duration, - // We got around 500ms for proposing - block_proposal_slot_portion: SlotProportion::new(1f32 / 24f32), - // And a maximum of 750ms if slots are skipped - max_block_proposal_slot_portion: Some(SlotProportion::new(1f32 / 16f32)), - telemetry, - }) - ) - }).await + build_import_queue, + ) + .await } diff --git a/bin/collator/src/parachain/shell_upgrade.rs b/bin/collator/src/parachain/shell_upgrade.rs index 510565bade..212cd2d79f 100644 --- a/bin/collator/src/parachain/shell_upgrade.rs +++ b/bin/collator/src/parachain/shell_upgrade.rs @@ -16,14 +16,11 @@ // You should have received a copy of the GNU General Public License // along with Astar. If not, see . -///! Special [`ParachainConsensus`] implementation that waits for the upgrade from -///! shell to a parachain runtime that implements Aura. +//! Utility for the upgrade from shell to a parachain runtime that implements Aura. use astar_primitives::*; -use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus}; -use cumulus_primitives_core::relay_chain::{Hash as PHash, PersistedValidationData}; +use cumulus_primitives_core::relay_chain::PersistedValidationData; use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; use fc_rpc::pending::ConsensusDataProvider; -use futures::lock::Mutex; use sc_client_api::{AuxStore, UsageProvider}; use sc_consensus::{import_queue::Verifier as VerifierT, BlockImportParams, ForkChoiceStrategy}; use sp_api::{ApiExt, ProvideRuntimeApi}; @@ -58,57 +55,6 @@ impl BuildOnAccess { } } -pub struct WaitForAuraConsensus { - pub client: Arc, - pub aura_consensus: Arc>>>>, - pub relay_chain_consensus: Arc>>>, -} - -impl Clone for WaitForAuraConsensus { - fn clone(&self) -> Self { - Self { - client: self.client.clone(), - aura_consensus: self.aura_consensus.clone(), - relay_chain_consensus: self.relay_chain_consensus.clone(), - } - } -} - -#[async_trait::async_trait] -impl ParachainConsensus for WaitForAuraConsensus -where - Client: sp_api::ProvideRuntimeApi + Send + Sync, - Client::Api: AuraApi, -{ - async fn produce_candidate( - &mut self, - parent: &Header, - relay_parent: PHash, - validation_data: &PersistedValidationData, - ) -> Option> { - let block_hash = parent.hash(); - if self - .client - .runtime_api() - .has_api::>(block_hash) - .unwrap_or(false) - { - self.aura_consensus - .lock() - .await - .get_mut() - .produce_candidate(parent, relay_parent, validation_data) - .await - } else { - self.relay_chain_consensus - .lock() - .await - .produce_candidate(parent, relay_parent, validation_data) - .await - } - } -} - pub struct Verifier { pub client: Arc, pub aura_verifier: BuildOnAccess>>, @@ -118,7 +64,7 @@ pub struct Verifier { #[async_trait::async_trait] impl VerifierT for Verifier where - Client: sp_api::ProvideRuntimeApi + Send + Sync, + Client: ProvideRuntimeApi + Send + Sync, Client::Api: AuraApi, { async fn verify(