From bad814f18af0e983a15de69f7077918f471b24c1 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 22 Dec 2022 13:13:08 +0100 Subject: [PATCH 01/17] txpool: don't maintain the pool during major sync Fix shall prevent from wasting the CPU during the major sync. No actions are actually required in transaction pool during the major sync. Fixes: #12903 --- client/transaction-pool/Cargo.toml | 1 + .../transaction-pool/src/enactment_state.rs | 5 ++- client/transaction-pool/src/lib.rs | 39 ++++++++++++++----- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index 7a3ab042d5a13..77441a80ff4bc 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -28,6 +28,7 @@ sc-transaction-pool-api = { version = "4.0.0-dev", path = "./api" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } +sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-core = { version = "7.0.0", path = "../../primitives/core" } sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" } sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" } diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 6aac98641cf85..76edf72e4027a 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -140,7 +140,10 @@ where pub fn force_update(&mut self, event: &ChainEvent) { match event { ChainEvent::NewBestBlock { hash, .. } => self.recent_best_block = *hash, - ChainEvent::Finalized { hash, .. } => self.recent_finalized_block = *hash, + ChainEvent::Finalized { hash, .. } => { + self.recent_best_block = *hash; + self.recent_finalized_block = *hash; + }, }; log::debug!(target: "txpool", "forced update: {:?}, {:?}", self.recent_best_block, self.recent_finalized_block); } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index f3797b180f14d..b515dc1971f61 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -66,6 +66,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics; use prometheus_endpoint::Registry as PrometheusRegistry; use sp_blockchain::{HashAndNumber, TreeRoute}; +use sp_consensus::SyncOracle as SyncOracleT; type BoxedReadyIterator = Box>> + Send>; @@ -76,13 +77,15 @@ type ReadyIteratorFor = type PolledIterator = Pin> + Send>>; /// A transaction pool for a full node. -pub type FullPool = BasicPool, Block>; +pub type FullPool = + BasicPool, Block, SyncOracle>; /// Basic implementation of transaction pool that can be customized by providing PoolApi. -pub struct BasicPool +pub struct BasicPool where Block: BlockT, PoolApi: graph::ChainApi, + SyncOracle: SyncOracleT, { pool: Arc>, api: Arc, @@ -91,6 +94,7 @@ where ready_poll: Arc, Block>>>, metrics: PrometheusMetrics, enactment_state: Arc>>, + sync_oracle: Arc, } struct ReadyPoll { @@ -152,16 +156,18 @@ pub enum RevalidationType { Full, } -impl BasicPool +impl BasicPool where Block: BlockT, PoolApi: graph::ChainApi + 'static, + SyncOracle: SyncOracleT, { /// Create new basic transaction pool with provided api, for tests. pub fn new_test( pool_api: Arc, best_block_hash: Block::Hash, finalized_hash: Block::Hash, + sync_oracle: Arc, ) -> (Self, Pin + Send>>) { let pool = Arc::new(graph::Pool::new(Default::default(), true.into(), pool_api.clone())); let (revalidation_queue, background_task) = @@ -178,6 +184,7 @@ where best_block_hash, finalized_hash, ))), + sync_oracle, }, background_task, ) @@ -195,6 +202,7 @@ where best_block_number: NumberFor, best_block_hash: Block::Hash, finalized_hash: Block::Hash, + sync_oracle: Arc, ) -> Self { let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone())); let (revalidation_queue, background_task) = match revalidation_type { @@ -226,6 +234,7 @@ where best_block_hash, finalized_hash, ))), + sync_oracle, } } @@ -240,10 +249,11 @@ where } } -impl TransactionPool for BasicPool +impl TransactionPool for BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, + SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { type Block = PoolApi::Block; type Hash = graph::ExtrinsicHash; @@ -358,7 +368,7 @@ where } } -impl FullPool +impl FullPool where Block: BlockT, Client: sp_api::ProvideRuntimeApi @@ -372,6 +382,7 @@ where + Sync + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, + SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync + 'static, { /// Create new basic transaction pool for a full node with the provided api. pub fn new_full( @@ -380,6 +391,7 @@ where prometheus: Option<&PrometheusRegistry>, spawner: impl SpawnEssentialNamed, client: Arc, + sync_oracle: Arc, ) -> Arc { let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner)); let pool = Arc::new(Self::with_revalidation_type( @@ -392,6 +404,7 @@ where client.usage_info().chain.best_number, client.usage_info().chain.best_hash, client.usage_info().chain.finalized_hash, + sync_oracle, )); // make transaction pool available for off-chain runtime calls. @@ -401,8 +414,8 @@ where } } -impl sc_transaction_pool_api::LocalTransactionPool - for BasicPool, Block> +impl sc_transaction_pool_api::LocalTransactionPool + for BasicPool, Block, SyncOracle> where Block: BlockT, Client: sp_api::ProvideRuntimeApi @@ -412,6 +425,7 @@ where + sp_blockchain::HeaderMetadata, Client: Send + Sync + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, + SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { type Block = Block; type Hash = graph::ExtrinsicHash>; @@ -578,10 +592,11 @@ async fn prune_known_txs_for_block BasicPool +impl BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, + SyncOracle: SyncOracleT, { /// Handles enactment and retraction of blocks, prunes stale transactions /// (that have already been enacted) and resubmits transactions that were @@ -716,12 +731,18 @@ where } #[async_trait] -impl MaintainedTransactionPool for BasicPool +impl MaintainedTransactionPool for BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, + SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { async fn maintain(&self, event: ChainEvent) { + if self.sync_oracle.is_major_syncing() { + self.enactment_state.lock().force_update(&event); + return + } + let prev_finalized_block = self.enactment_state.lock().recent_finalized_block(); let compute_tree_route = |from, to| -> Result, String> { match self.api.tree_route(from, to) { From 536df11efabbede8448016084fe7b75609b0c578 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 22 Dec 2022 15:32:16 +0100 Subject: [PATCH 02/17] passing sync_oracle to maintain method --- client/transaction-pool/api/Cargo.toml | 1 + client/transaction-pool/api/src/lib.rs | 4 +- client/transaction-pool/src/lib.rs | 51 +++++++++++--------------- 3 files changed, 25 insertions(+), 31 deletions(-) diff --git a/client/transaction-pool/api/Cargo.toml b/client/transaction-pool/api/Cargo.toml index e14a3ff4f3839..829d94f211297 100644 --- a/client/transaction-pool/api/Cargo.toml +++ b/client/transaction-pool/api/Cargo.toml @@ -16,6 +16,7 @@ serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-runtime = { version = "7.0.0", default-features = false, path = "../../../primitives/runtime" } +sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } [dev-dependencies] serde_json = "1.0" diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index c1e49ad07d7b1..49f4f86037d1f 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -307,7 +307,9 @@ pub enum ChainEvent { #[async_trait] pub trait MaintainedTransactionPool: TransactionPool { /// Perform maintenance - async fn maintain(&self, event: ChainEvent); + async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) + where + SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync; } /// Transaction pool interface for submitting local transactions that exposes a diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index b515dc1971f61..c22ab8abee384 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -66,7 +66,6 @@ use crate::metrics::MetricsLink as PrometheusMetrics; use prometheus_endpoint::Registry as PrometheusRegistry; use sp_blockchain::{HashAndNumber, TreeRoute}; -use sp_consensus::SyncOracle as SyncOracleT; type BoxedReadyIterator = Box>> + Send>; @@ -77,15 +76,13 @@ type ReadyIteratorFor = type PolledIterator = Pin> + Send>>; /// A transaction pool for a full node. -pub type FullPool = - BasicPool, Block, SyncOracle>; +pub type FullPool = BasicPool, Block>; /// Basic implementation of transaction pool that can be customized by providing PoolApi. -pub struct BasicPool +pub struct BasicPool where Block: BlockT, PoolApi: graph::ChainApi, - SyncOracle: SyncOracleT, { pool: Arc>, api: Arc, @@ -94,7 +91,6 @@ where ready_poll: Arc, Block>>>, metrics: PrometheusMetrics, enactment_state: Arc>>, - sync_oracle: Arc, } struct ReadyPoll { @@ -156,18 +152,16 @@ pub enum RevalidationType { Full, } -impl BasicPool +impl BasicPool where Block: BlockT, PoolApi: graph::ChainApi + 'static, - SyncOracle: SyncOracleT, { /// Create new basic transaction pool with provided api, for tests. pub fn new_test( pool_api: Arc, best_block_hash: Block::Hash, finalized_hash: Block::Hash, - sync_oracle: Arc, ) -> (Self, Pin + Send>>) { let pool = Arc::new(graph::Pool::new(Default::default(), true.into(), pool_api.clone())); let (revalidation_queue, background_task) = @@ -184,7 +178,6 @@ where best_block_hash, finalized_hash, ))), - sync_oracle, }, background_task, ) @@ -202,7 +195,6 @@ where best_block_number: NumberFor, best_block_hash: Block::Hash, finalized_hash: Block::Hash, - sync_oracle: Arc, ) -> Self { let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone())); let (revalidation_queue, background_task) = match revalidation_type { @@ -234,7 +226,6 @@ where best_block_hash, finalized_hash, ))), - sync_oracle, } } @@ -249,11 +240,10 @@ where } } -impl TransactionPool for BasicPool +impl TransactionPool for BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, - SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { type Block = PoolApi::Block; type Hash = graph::ExtrinsicHash; @@ -368,7 +358,7 @@ where } } -impl FullPool +impl FullPool where Block: BlockT, Client: sp_api::ProvideRuntimeApi @@ -382,7 +372,6 @@ where + Sync + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, - SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync + 'static, { /// Create new basic transaction pool for a full node with the provided api. pub fn new_full( @@ -391,7 +380,6 @@ where prometheus: Option<&PrometheusRegistry>, spawner: impl SpawnEssentialNamed, client: Arc, - sync_oracle: Arc, ) -> Arc { let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner)); let pool = Arc::new(Self::with_revalidation_type( @@ -404,7 +392,6 @@ where client.usage_info().chain.best_number, client.usage_info().chain.best_hash, client.usage_info().chain.finalized_hash, - sync_oracle, )); // make transaction pool available for off-chain runtime calls. @@ -414,8 +401,8 @@ where } } -impl sc_transaction_pool_api::LocalTransactionPool - for BasicPool, Block, SyncOracle> +impl sc_transaction_pool_api::LocalTransactionPool + for BasicPool, Block> where Block: BlockT, Client: sp_api::ProvideRuntimeApi @@ -425,7 +412,6 @@ where + sp_blockchain::HeaderMetadata, Client: Send + Sync + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, - SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { type Block = Block; type Hash = graph::ExtrinsicHash>; @@ -592,11 +578,10 @@ async fn prune_known_txs_for_block BasicPool +impl BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, - SyncOracle: SyncOracleT, { /// Handles enactment and retraction of blocks, prunes stale transactions /// (that have already been enacted) and resubmits transactions that were @@ -731,14 +716,16 @@ where } #[async_trait] -impl MaintainedTransactionPool for BasicPool +impl MaintainedTransactionPool for BasicPool where Block: BlockT, PoolApi: 'static + graph::ChainApi, - SyncOracle: SyncOracleT + std::marker::Send + std::marker::Sync, { - async fn maintain(&self, event: ChainEvent) { - if self.sync_oracle.is_major_syncing() { + async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) + where + SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync, + { + if sync_oracle.is_major_syncing() { self.enactment_state.lock().force_update(&event); return } @@ -788,11 +775,15 @@ where } /// Inform the transaction pool about imported and finalized blocks. -pub async fn notification_future(client: Arc, txpool: Arc) -where +pub async fn notification_future( + client: Arc, + txpool: Arc, + sync_oracle: Arc, +) where Block: BlockT, Client: sc_client_api::BlockchainEvents, Pool: MaintainedTransactionPool, + SyncOracle: sp_consensus::SyncOracle + std::marker::Sync + std::marker::Send, { let import_stream = client .import_notification_stream() @@ -801,6 +792,6 @@ where let finality_stream = client.finality_notification_stream().map(Into::into).fuse(); futures::stream::select(import_stream, finality_stream) - .for_each(|evt| txpool.maintain(evt)) + .for_each(|evt| txpool.maintain(evt, sync_oracle.clone())) .await } From 368653dda2c758b4739f78a6859d255b06bad39b Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 22 Dec 2022 17:11:56 +0100 Subject: [PATCH 03/17] fixed: builder, txpool tests --- client/service/src/builder.rs | 8 +- client/transaction-pool/api/src/lib.rs | 2 +- .../transaction-pool/src/enactment_state.rs | 2 +- client/transaction-pool/src/lib.rs | 4 +- client/transaction-pool/tests/pool.rs | 131 ++++++++++-------- 5 files changed, 83 insertions(+), 64 deletions(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 0a26c00485e2f..57c1529ef6756 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -350,6 +350,7 @@ pub trait SpawnTaskNetwork: sc_offchain::NetworkProvider + NetworkStateInfo + NetworkStatusProvider + + sp_consensus::SyncOracle + Send + Sync + 'static @@ -362,6 +363,7 @@ where T: sc_offchain::NetworkProvider + NetworkStateInfo + NetworkStatusProvider + + sp_consensus::SyncOracle + Send + Sync + 'static, @@ -498,7 +500,11 @@ where spawn_handle.spawn( "txpool-notifications", Some("transaction-pool"), - sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()), + sc_transaction_pool::notification_future( + client.clone(), + transaction_pool.clone(), + network.clone(), + ), ); spawn_handle.spawn( diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index 49f4f86037d1f..a83ca6aafbfbc 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -309,7 +309,7 @@ pub trait MaintainedTransactionPool: TransactionPool { /// Perform maintenance async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) where - SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync; + SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized; } /// Transaction pool interface for submitting local transactions that exposes a diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 76edf72e4027a..88d89d7631d62 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -611,6 +611,6 @@ mod enactment_state_tests { let mut es = EnactmentState::new(a().hash, a().hash); es.force_update(&ChainEvent::Finalized { hash: b1().hash, tree_route: Arc::from([]) }); - assert_es_eq(&es, a(), b1()); + assert_es_eq(&es, b1(), b1()); } } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index c22ab8abee384..2d5d7e6e279ff 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -723,7 +723,7 @@ where { async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) where - SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync, + SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized, { if sync_oracle.is_major_syncing() { self.enactment_state.lock().force_update(&event); @@ -783,7 +783,7 @@ pub async fn notification_future( Block: BlockT, Client: sc_client_api::BlockchainEvents, Pool: MaintainedTransactionPool, - SyncOracle: sp_consensus::SyncOracle + std::marker::Sync + std::marker::Send, + SyncOracle: sp_consensus::SyncOracle + std::marker::Sync + std::marker::Send + ?Sized, { let import_stream = client .import_notification_stream() diff --git a/client/transaction-pool/tests/pool.rs b/client/transaction-pool/tests/pool.rs index 27d62c3250abc..20a7d6c8b8780 100644 --- a/client/transaction-pool/tests/pool.rs +++ b/client/transaction-pool/tests/pool.rs @@ -77,6 +77,16 @@ fn create_basic_pool(test_api: TestApi) -> BasicPool { create_basic_pool_with_genesis(Arc::from(test_api)).0 } +fn block_on_pool_maintain_with_event( + pool: &BasicPool, + event: ChainEvent, +) where + PoolApi: sc_transaction_pool::ChainApi + 'static, + Block: sp_runtime::traits::Block, +{ + block_on(pool.maintain(event, Arc::new(sp_consensus::NoNetwork))); +} + const SOURCE: TransactionSource = TransactionSource::External; #[test] @@ -165,7 +175,7 @@ fn only_prune_on_new_best() { let header = api.push_block(2, vec![uxt], true); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -236,7 +246,7 @@ fn should_prune_old_during_maintenance() { let header = api.push_block(1, vec![xt.clone()], true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert_eq!(pool.status().ready, 0); } @@ -256,7 +266,7 @@ fn should_revalidate_during_maintenance() { api.add_invalid(&xt2); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert_eq!(pool.status().ready, 1); // test that pool revalidated transaction that left ready and not included in the block @@ -280,7 +290,7 @@ fn should_resubmit_from_retracted_during_maintenance() { let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 1); } @@ -298,7 +308,7 @@ fn should_not_resubmit_from_retracted_during_maintenance_if_tx_is_also_in_enacte let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -317,7 +327,7 @@ fn should_not_retain_invalid_hashes_from_retracted() { api.add_invalid(&xt); let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!( futures::executor::block_on_stream(watcher).collect::>(), @@ -341,14 +351,14 @@ fn should_revalidate_across_many_blocks() { assert_eq!(pool.status().ready, 2); let header = api.push_block(1, vec![], true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt3.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 3); let header = api.push_block(2, vec![xt1.clone()], true); let block_hash = header.hash(); - block_on(pool.maintain(block_event(header.clone()))); + block_on_pool_maintain_with_event(&pool, block_event(header.clone())); block_on( watcher1 @@ -391,7 +401,7 @@ fn should_push_watchers_during_maintenance() { // clear timer events if any let header = api.push_block(1, vec![], true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); // then // hash3 is now invalid @@ -409,10 +419,10 @@ fn should_push_watchers_during_maintenance() { // when let header = api.push_block(2, vec![tx0, tx1, tx2], true); let header_hash = header.hash(); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); let event = ChainEvent::Finalized { hash: header_hash, tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); // then // events for hash0 are: Ready, InBlock @@ -456,10 +466,10 @@ fn finalization() { let header = pool.api().chain().read().block_by_number.get(&2).unwrap()[0].0.header().clone(); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); let mut stream = futures::executor::block_on_stream(watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); @@ -494,7 +504,7 @@ fn fork_aware_finalization() { let c2; let d2; - block_on(pool.maintain(block_event(a_header))); + block_on_pool_maintain_with_event(&pool, block_event(a_header)); // block B1 { @@ -508,10 +518,10 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> B1: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; b1 = header.hash(); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); let event = ChainEvent::Finalized { hash: b1, tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } // block C2 @@ -524,7 +534,7 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> C2: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; c2 = header.hash(); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -539,7 +549,7 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> D2: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; d2 = header.hash(); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -554,11 +564,11 @@ fn fork_aware_finalization() { c1 = header.hash(); canon_watchers.push((watcher, header.hash())); let event = block_event_with_retracted(header.clone(), d2, pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 2); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } // block D1 @@ -573,10 +583,10 @@ fn fork_aware_finalization() { canon_watchers.push((w, header.hash())); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 2); let event = ChainEvent::Finalized { hash: d1, tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } let e1; @@ -587,9 +597,12 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> E1: {:?} {:?}", header.hash(), header); e1 = header.hash(); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); - block_on(pool.maintain(ChainEvent::Finalized { hash: e1, tree_route: Arc::from(vec![]) })); + block_on_pool_maintain_with_event( + &pool, + ChainEvent::Finalized { hash: e1, tree_route: Arc::from(vec![]) }, + ); } for (canon_watcher, h) in canon_watchers { @@ -646,7 +659,7 @@ fn prune_and_retract_tx_at_same_time() { assert_eq!(pool.status().ready, 1); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); header.hash() }; @@ -657,11 +670,11 @@ fn prune_and_retract_tx_at_same_time() { assert_eq!(pool.status().ready, 0); let event = block_event_with_retracted(header.clone(), b1, pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); header.hash() }; @@ -716,7 +729,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; d0 = header.hash(); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -733,7 +746,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { //push new best block let header = pool.api().push_block(2, vec![], true); let event = block_event_with_retracted(header, d0, pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 2); } } @@ -769,7 +782,7 @@ fn resubmit_from_retracted_fork() { let header = pool.api().push_block(2, vec![tx0.clone()], true); assert_eq!(pool.status().ready, 1); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert_eq!(pool.status().ready, 0); } @@ -778,7 +791,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on(pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone())) .expect("1. Imported"); let header = pool.api().push_block(3, vec![tx1.clone()], true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert_eq!(pool.status().ready, 0); } @@ -787,7 +800,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on(pool.submit_and_watch(&BlockId::number(1), SOURCE, tx2.clone())) .expect("1. Imported"); let header = pool.api().push_block(4, vec![tx2.clone()], true); - block_on(pool.maintain(block_event(header.clone()))); + block_on_pool_maintain_with_event(&pool, block_event(header.clone())); assert_eq!(pool.status().ready, 0); header.hash() }; @@ -826,7 +839,7 @@ fn resubmit_from_retracted_fork() { assert_eq!(expected_ready, ready); let event = block_event_with_retracted(f1_header, f0, pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 3); let ready = pool.ready().map(|t| t.data.encode()).collect::>(); @@ -851,7 +864,7 @@ fn ready_set_should_resolve_after_block_update() { let xt1 = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported"); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert!(pool.ready_at(1).now_or_never().is_some()); } @@ -873,7 +886,7 @@ fn ready_set_should_eventually_resolve_when_block_update_arrives() { panic!("Ready set should not be ready before block update!"); } - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); match ready_set_future.poll_unpin(&mut context) { Poll::Pending => { @@ -964,7 +977,7 @@ fn import_notification_to_pool_maintain_works() { // Get the notification of the block import and maintain the pool with it, // Now, the pool should not contain any transactions. let evt = import_stream.next().expect("Importing a block leads to an event"); - block_on(pool.maintain(evt.try_into().expect("Imported as new best block"))); + block_on_pool_maintain_with_event(&*pool, evt.try_into().expect("Imported as new best block")); assert_eq!(pool.status().ready, 0); } @@ -980,7 +993,7 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() { let header = api.push_block(1, vec![xt1.clone()], true); // This will prune `xt1`. - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); assert_eq!(pool.status().ready, 0); } @@ -1015,7 +1028,7 @@ fn stale_transactions_are_pruned() { // Import block let header = api.push_block(1, xts, true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); // The imported transactions have a different hash and should not evict our initial // transactions. assert_eq!(pool.status().future, 3); @@ -1023,7 +1036,7 @@ fn stale_transactions_are_pruned() { // Import enough blocks to make our transactions stale for n in 1..66 { let header = api.push_block(n, vec![], true); - block_on(pool.maintain(block_event(header))); + block_on_pool_maintain_with_event(&pool, block_event(header)); } assert_eq!(pool.status().future, 0); @@ -1045,7 +1058,7 @@ fn finalized_only_handled_correctly() { let event = ChainEvent::Finalized { hash: header.clone().hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); @@ -1073,8 +1086,8 @@ fn best_block_after_finalized_handled_correctly() { let event = ChainEvent::Finalized { hash: header.clone().hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); - block_on(pool.maintain(block_event(header.clone()))); + block_on_pool_maintain_with_event(&pool, event); + block_on_pool_maintain_with_event(&pool, block_event(header.clone())); assert_eq!(pool.status().ready, 0); @@ -1137,13 +1150,13 @@ fn switching_fork_with_finalized_works() { { let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 1); } { let event = ChainEvent::Finalized { hash: b2_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { @@ -1216,28 +1229,28 @@ fn switching_fork_multiple_times_works() { { // phase-0 let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 1); } { // phase-1 let event = block_event_with_retracted(b2_header.clone(), b1_header.hash(), pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } { // phase-2 let event = block_event_with_retracted(b1_header.clone(), b2_header.hash(), pool.api()); - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 1); } { // phase-3 let event = ChainEvent::Finalized { hash: b2_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { @@ -1340,13 +1353,13 @@ fn two_blocks_delayed_finalization_works() { { let event = ChainEvent::Finalized { hash: a_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 3); } { let event = ChainEvent::NewBestBlock { hash: d1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } @@ -1355,13 +1368,13 @@ fn two_blocks_delayed_finalization_works() { hash: c1_header.hash(), tree_route: Arc::from(vec![b1_header.hash()]), }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } // this is to collect events from_charlie_watcher and make sure nothing was retracted { let event = ChainEvent::Finalized { hash: d1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { @@ -1439,27 +1452,27 @@ fn delayed_finalization_does_not_retract() { { // phase-0 let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 1); } { // phase-1 let event = ChainEvent::NewBestBlock { hash: c1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } { // phase-2 let event = ChainEvent::Finalized { hash: b1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { // phase-3 let event = ChainEvent::Finalized { hash: c1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { @@ -1533,7 +1546,7 @@ fn best_block_after_finalization_does_not_retract() { { let event = ChainEvent::Finalized { hash: a_header.hash(), tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { @@ -1541,13 +1554,13 @@ fn best_block_after_finalization_does_not_retract() { hash: c1_header.hash(), tree_route: Arc::from(vec![a_header.hash(), b1_header.hash()]), }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); assert_eq!(pool.status().ready, 0); } { let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on(pool.maintain(event)); + block_on_pool_maintain_with_event(&pool, event); } { From f13c244de837d60cc4a2c9e2ff64be918fd57ca6 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Tue, 10 Jan 2023 14:25:39 +0100 Subject: [PATCH 04/17] do not maintain tx-pool if node gone out of sync --- Cargo.lock | 1 + client/transaction-pool/src/lib.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index eb644202ae382..e44332b9d24e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9076,6 +9076,7 @@ dependencies = [ "serde", "serde_json", "sp-blockchain", + "sp-consensus", "sp-runtime", "thiserror", ] diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 2d5d7e6e279ff..851edd249d588 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -591,6 +591,11 @@ where let pool = self.pool.clone(); let api = self.api.clone(); + // do not process maintain txpool if node is out of sync + if tree_route.enacted().len() > 20 { + return + } + let (hash, block_number) = match tree_route.last() { Some(HashAndNumber { hash, number }) => (hash, number), None => { From 80325696fada21c6b2e259849bb00a3db72ba5e7 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 08:27:01 +0100 Subject: [PATCH 05/17] EnactmentAction: all logic moved to EnactmentState Tests to be done. --- client/transaction-pool/Cargo.toml | 1 + .../transaction-pool/src/enactment_state.rs | 73 ++++++++++++++++--- client/transaction-pool/src/graph/pool.rs | 4 +- client/transaction-pool/src/lib.rs | 28 +++---- 4 files changed, 79 insertions(+), 27 deletions(-) diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index 77441a80ff4bc..0e141ae4e448f 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -19,6 +19,7 @@ futures = "0.3.21" futures-timer = "3.0.2" linked-hash-map = "0.5.4" log = "0.4.17" +num-traits = "0.2.8" parking_lot = "0.12.1" serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 88d89d7631d62..6b743f0a38623 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -18,9 +18,12 @@ //! Substrate transaction pool implementation. +use num_traits::CheckedSub; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::TreeRoute; -use sp_runtime::traits::Block as BlockT; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; /// Helper struct for keeping track of the current state of processed new best /// block and finalized events. The main purpose of keeping track of this state @@ -54,6 +57,12 @@ where recent_finalized_block: Block::Hash, } +pub enum EnactmentAction { + Skip, + HandleEnactment(TreeRoute), + HandleFinalization, +} + impl EnactmentState where Block: BlockT, @@ -68,32 +77,66 @@ where self.recent_finalized_block } + /// Returns the recently finalized block. + pub fn recent_best_block(&self) -> Block::Hash { + self.recent_best_block + } + /// Updates the state according to the given `ChainEvent`, returning /// `Some(tree_route)` with a tree route including the blocks that need to /// be enacted/retracted. If no enactment is needed then `None` is returned. - pub fn update( + pub fn update( &mut self, event: &ChainEvent, - tree_route: &F, - ) -> Result>, String> + tree_route: &TreeRouteF, + is_major_syncing: &IsMajorSyncF, + hash_to_number: &BlockNumberF, + ) -> Result, String> where - F: Fn(Block::Hash, Block::Hash) -> Result, String>, + TreeRouteF: Fn(Block::Hash, Block::Hash) -> Result, String>, + IsMajorSyncF: Fn() -> bool, + BlockNumberF: Fn(Block::Hash) -> Result>, String>, { + if is_major_syncing() { + log::info!(target: "txpool", "skip: major"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + let (new_hash, finalized) = match event { ChainEvent::NewBestBlock { hash, .. } => (*hash, false), ChainEvent::Finalized { hash, .. } => (*hash, true), }; + let distance = match (hash_to_number(new_hash), hash_to_number(self.recent_best_block())) { + (Ok(Some(notified)), Ok(Some(current))) => notified.checked_sub(¤t), + _ => None, + }; + + // do not process maintain txpool if node is out of sync + if Some(SKIP_MAINTAINANCE_THRESHOLD.into()) < distance { + log::info!(target: "txpool", "skip: long"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + // block was already finalized if self.recent_finalized_block == new_hash { log::debug!(target: "txpool", "handle_enactment: block already finalized"); - return Ok(None) + return Ok(EnactmentAction::Skip) //Ok(None) } // compute actual tree route from best_block to notified block, and use // it instead of tree_route provided with event let tree_route = tree_route(self.recent_best_block, new_hash)?; + // do not process maintain txpool if node is out of sync + if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { + log::info!(target: "txpool", "skip: enacted too long"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + log::debug!( target: "txpool", "resolve hash:{:?} finalized:{:?} tree_route:{:?} best_block:{:?} finalized_block:{:?}", @@ -109,7 +152,7 @@ where "Recently finalized block {} would be retracted by ChainEvent {}, skipping", self.recent_finalized_block, new_hash ); - return Ok(None) + return Ok(EnactmentAction::Skip) //Ok(None) } if finalized { @@ -124,7 +167,7 @@ where target: "txpool", "handle_enactment: no newly enacted blocks since recent best block" ); - return Ok(None) + return Ok(EnactmentAction::HandleFinalization) } // otherwise enacted finalized block becomes best block... @@ -132,7 +175,7 @@ where self.recent_best_block = new_hash; - Ok(Some(tree_route)) + Ok(EnactmentAction::HandleEnactment(tree_route)) } /// Forces update of the state according to the given `ChainEvent`. Intended to be used as a @@ -151,7 +194,7 @@ where #[cfg(test)] mod enactment_state_tests { - use super::EnactmentState; + use super::{EnactmentAction, EnactmentState}; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::{HashAndNumber, TreeRoute}; use std::sync::Arc; @@ -395,8 +438,11 @@ mod enactment_state_tests { }, &tree_route, ) + .map(|s| match s { + EnactmentAction::HandleEnactment(_) => true, + _ => false, + }) .unwrap() - .is_some() } fn trigger_finalized( @@ -415,8 +461,11 @@ mod enactment_state_tests { state .update(&ChainEvent::Finalized { hash: acted_on, tree_route: v.into() }, &tree_route) + .map(|s| match s { + EnactmentAction::HandleEnactment(_) => true, + _ => false, + }) .unwrap() - .is_some() } fn assert_es_eq( diff --git a/client/transaction-pool/src/graph/pool.rs b/client/transaction-pool/src/graph/pool.rs index 8e3570d1dbd1f..471670aa34d43 100644 --- a/client/transaction-pool/src/graph/pool.rs +++ b/client/transaction-pool/src/graph/pool.rs @@ -272,7 +272,7 @@ impl Pool { // to get validity info and tags that the extrinsic provides. None => { // Avoid validating block txs if the pool is empty - if !self.validated_pool.status().is_empty() { + if true || !self.validated_pool.status().is_empty() { let validity = self .validated_pool .api() @@ -283,6 +283,8 @@ impl Pool { ) .await; + log::trace!(target: "txpool", "validating {:?}", validity); + if let Ok(Ok(validity)) = validity { future_tags.extend(validity.provides); } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 851edd249d588..b5172a2532ff8 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -33,7 +33,7 @@ mod tests; pub use crate::api::FullChainApi; use async_trait::async_trait; -use enactment_state::EnactmentState; +use enactment_state::{EnactmentAction, EnactmentState}; use futures::{ channel::oneshot, future::{self, ready}, @@ -591,11 +591,6 @@ where let pool = self.pool.clone(); let api = self.api.clone(); - // do not process maintain txpool if node is out of sync - if tree_route.enacted().len() > 20 { - return - } - let (hash, block_number) = match tree_route.last() { Some(HashAndNumber { hash, number }) => (hash, number), None => { @@ -730,11 +725,6 @@ where where SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized, { - if sync_oracle.is_major_syncing() { - self.enactment_state.lock().force_update(&event); - return - } - let prev_finalized_block = self.enactment_state.lock().recent_finalized_block(); let compute_tree_route = |from, to| -> Result, String> { match self.api.tree_route(from, to) { @@ -746,15 +736,25 @@ where } }; - let result = self.enactment_state.lock().update(&event, &compute_tree_route); + let is_major_syncing = || sync_oracle.is_major_syncing(); + let block_id_to_number = + |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e)); + + let result = self.enactment_state.lock().update( + &event, + &compute_tree_route, + &is_major_syncing, + &block_id_to_number, + ); match result { Err(msg) => { log::debug!(target: "txpool", "{msg}"); self.enactment_state.lock().force_update(&event); }, - Ok(None) => {}, - Ok(Some(tree_route)) => { + Ok(EnactmentAction::Skip) => return, + Ok(EnactmentAction::HandleFinalization) => {}, + Ok(EnactmentAction::HandleEnactment(tree_route)) => { self.handle_enactment(tree_route).await; }, }; From 614960630d30cc806a41c53631f971f65c56c58e Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 08:46:17 +0100 Subject: [PATCH 06/17] maintain guard logic moved directly to MaintainedTransactionPool --- Cargo.lock | 1 + .../transaction-pool/src/enactment_state.rs | 68 ++++--------------- client/transaction-pool/src/lib.rs | 54 +++++++++++---- 3 files changed, 53 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e44332b9d24e4..dafd650f8cc25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9045,6 +9045,7 @@ dependencies = [ "futures-timer", "linked-hash-map", "log", + "num-traits", "parity-scale-codec", "parking_lot 0.12.1", "sc-block-builder", diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 6b743f0a38623..941564cf32807 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -18,12 +18,9 @@ //! Substrate transaction pool implementation. -use num_traits::CheckedSub; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::TreeRoute; -use sp_runtime::traits::{Block as BlockT, NumberFor}; - -const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; +use sp_runtime::traits::Block as BlockT; /// Helper struct for keeping track of the current state of processed new best /// block and finalized events. The main purpose of keeping track of this state @@ -57,12 +54,6 @@ where recent_finalized_block: Block::Hash, } -pub enum EnactmentAction { - Skip, - HandleEnactment(TreeRoute), - HandleFinalization, -} - impl EnactmentState where Block: BlockT, @@ -85,58 +76,29 @@ where /// Updates the state according to the given `ChainEvent`, returning /// `Some(tree_route)` with a tree route including the blocks that need to /// be enacted/retracted. If no enactment is needed then `None` is returned. - pub fn update( + pub fn update( &mut self, event: &ChainEvent, - tree_route: &TreeRouteF, - is_major_syncing: &IsMajorSyncF, - hash_to_number: &BlockNumberF, - ) -> Result, String> + tree_route: &F, + ) -> Result>, String> where - TreeRouteF: Fn(Block::Hash, Block::Hash) -> Result, String>, - IsMajorSyncF: Fn() -> bool, - BlockNumberF: Fn(Block::Hash) -> Result>, String>, + F: Fn(Block::Hash, Block::Hash) -> Result, String>, { - if is_major_syncing() { - log::info!(target: "txpool", "skip: major"); - self.force_update(event); - return Ok(EnactmentAction::Skip) - } - let (new_hash, finalized) = match event { ChainEvent::NewBestBlock { hash, .. } => (*hash, false), ChainEvent::Finalized { hash, .. } => (*hash, true), }; - let distance = match (hash_to_number(new_hash), hash_to_number(self.recent_best_block())) { - (Ok(Some(notified)), Ok(Some(current))) => notified.checked_sub(¤t), - _ => None, - }; - - // do not process maintain txpool if node is out of sync - if Some(SKIP_MAINTAINANCE_THRESHOLD.into()) < distance { - log::info!(target: "txpool", "skip: long"); - self.force_update(event); - return Ok(EnactmentAction::Skip) - } - // block was already finalized if self.recent_finalized_block == new_hash { log::debug!(target: "txpool", "handle_enactment: block already finalized"); - return Ok(EnactmentAction::Skip) //Ok(None) + return Ok(None) } // compute actual tree route from best_block to notified block, and use // it instead of tree_route provided with event let tree_route = tree_route(self.recent_best_block, new_hash)?; - // do not process maintain txpool if node is out of sync - if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { - log::info!(target: "txpool", "skip: enacted too long"); - self.force_update(event); - return Ok(EnactmentAction::Skip) - } - log::debug!( target: "txpool", "resolve hash:{:?} finalized:{:?} tree_route:{:?} best_block:{:?} finalized_block:{:?}", @@ -152,7 +114,7 @@ where "Recently finalized block {} would be retracted by ChainEvent {}, skipping", self.recent_finalized_block, new_hash ); - return Ok(EnactmentAction::Skip) //Ok(None) + return Ok(None) } if finalized { @@ -167,7 +129,7 @@ where target: "txpool", "handle_enactment: no newly enacted blocks since recent best block" ); - return Ok(EnactmentAction::HandleFinalization) + return Ok(None) } // otherwise enacted finalized block becomes best block... @@ -175,7 +137,7 @@ where self.recent_best_block = new_hash; - Ok(EnactmentAction::HandleEnactment(tree_route)) + Ok(Some(tree_route)) } /// Forces update of the state according to the given `ChainEvent`. Intended to be used as a @@ -194,7 +156,7 @@ where #[cfg(test)] mod enactment_state_tests { - use super::{EnactmentAction, EnactmentState}; + use super::EnactmentState; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::{HashAndNumber, TreeRoute}; use std::sync::Arc; @@ -438,11 +400,8 @@ mod enactment_state_tests { }, &tree_route, ) - .map(|s| match s { - EnactmentAction::HandleEnactment(_) => true, - _ => false, - }) .unwrap() + .is_some() } fn trigger_finalized( @@ -461,11 +420,8 @@ mod enactment_state_tests { state .update(&ChainEvent::Finalized { hash: acted_on, tree_route: v.into() }, &tree_route) - .map(|s| match s { - EnactmentAction::HandleEnactment(_) => true, - _ => false, - }) .unwrap() + .is_some() } fn assert_es_eq( diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index b5172a2532ff8..f7158b1203364 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -33,7 +33,7 @@ mod tests; pub use crate::api::FullChainApi; use async_trait::async_trait; -use enactment_state::{EnactmentAction, EnactmentState}; +use enactment_state::EnactmentState; use futures::{ channel::oneshot, future::{self, ready}, @@ -42,6 +42,7 @@ use futures::{ pub use graph::{ base_pool::Limit as PoolLimit, ChainApi, Options, Pool, Transaction, ValidatedTransaction, }; +use num_traits::CheckedSub; use parking_lot::Mutex; use std::{ collections::{HashMap, HashSet}, @@ -49,6 +50,7 @@ use std::{ sync::Arc, }; + use graph::{ExtrinsicHash, IsValidator}; use sc_transaction_pool_api::{ error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, @@ -67,6 +69,8 @@ use prometheus_endpoint::Registry as PrometheusRegistry; use sp_blockchain::{HashAndNumber, TreeRoute}; +const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; + type BoxedReadyIterator = Box>> + Send>; @@ -725,6 +729,32 @@ where where SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized, { + if sync_oracle.is_major_syncing() { + log::info!(target: "txpool", "skip maintain: major syncing"); + self.enactment_state.lock().force_update(&event); + return + } + + // do not maintain txpool if block distance is to high + let skip_maintanance = match event { + ChainEvent::Finalized { hash, .. } | ChainEvent::NewBestBlock { hash, .. } => match ( + self.api.block_id_to_number(&BlockId::Hash(hash)), + self.api.block_id_to_number(&BlockId::Hash( + self.enactment_state.lock().recent_best_block(), + )), + ) { + (Ok(Some(notified)), Ok(Some(current))) => + Some(SKIP_MAINTAINANCE_THRESHOLD.into()) < notified.checked_sub(¤t), + _ => true, + }, + }; + + if skip_maintanance { + log::info!(target: "txpool", "skip maintain: tree_route would be too long"); + self.enactment_state.lock().force_update(&event); + return + } + let prev_finalized_block = self.enactment_state.lock().recent_finalized_block(); let compute_tree_route = |from, to| -> Result, String> { match self.api.tree_route(from, to) { @@ -736,25 +766,21 @@ where } }; - let is_major_syncing = || sync_oracle.is_major_syncing(); - let block_id_to_number = - |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e)); - - let result = self.enactment_state.lock().update( - &event, - &compute_tree_route, - &is_major_syncing, - &block_id_to_number, - ); + let result = self.enactment_state.lock().update(&event, &compute_tree_route); match result { Err(msg) => { log::debug!(target: "txpool", "{msg}"); self.enactment_state.lock().force_update(&event); }, - Ok(EnactmentAction::Skip) => return, - Ok(EnactmentAction::HandleFinalization) => {}, - Ok(EnactmentAction::HandleEnactment(tree_route)) => { + Ok(None) => {}, + Ok(Some(tree_route)) => { + // do not maintain txpool if enacted path is too long + if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { + log::info!(target: "txpool", "skip maintain: enacted too long"); + return + } + self.handle_enactment(tree_route).await; }, }; From 3b5ddc4aed8e4a2db2ed58287336a221e0765293 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 09:08:11 +0100 Subject: [PATCH 07/17] minor fixes --- client/transaction-pool/src/graph/pool.rs | 4 +--- client/transaction-pool/src/lib.rs | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/client/transaction-pool/src/graph/pool.rs b/client/transaction-pool/src/graph/pool.rs index 471670aa34d43..8e3570d1dbd1f 100644 --- a/client/transaction-pool/src/graph/pool.rs +++ b/client/transaction-pool/src/graph/pool.rs @@ -272,7 +272,7 @@ impl Pool { // to get validity info and tags that the extrinsic provides. None => { // Avoid validating block txs if the pool is empty - if true || !self.validated_pool.status().is_empty() { + if !self.validated_pool.status().is_empty() { let validity = self .validated_pool .api() @@ -283,8 +283,6 @@ impl Pool { ) .await; - log::trace!(target: "txpool", "validating {:?}", validity); - if let Ok(Ok(validity)) = validity { future_tags.extend(validity.provides); } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index f7158b1203364..9a31d11c61f9b 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -50,7 +50,6 @@ use std::{ sync::Arc, }; - use graph::{ExtrinsicHash, IsValidator}; use sc_transaction_pool_api::{ error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, From 6fd64ff47d495124e03f788964c9b877c5f5e3db Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 14:02:36 +0100 Subject: [PATCH 08/17] EnactmentAction: all logic moved to EnactmentState (again) --- .../transaction-pool/src/enactment_state.rs | 224 ++++++++++++++---- client/transaction-pool/src/lib.rs | 54 ++--- 2 files changed, 188 insertions(+), 90 deletions(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 941564cf32807..35297416b6f2e 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -18,9 +18,12 @@ //! Substrate transaction pool implementation. +use num_traits::CheckedSub; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::TreeRoute; -use sp_runtime::traits::Block as BlockT; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; /// Helper struct for keeping track of the current state of processed new best /// block and finalized events. The main purpose of keeping track of this state @@ -54,6 +57,13 @@ where recent_finalized_block: Block::Hash, } +#[derive(Debug)] +pub enum EnactmentAction { + Skip, + HandleEnactment(TreeRoute), + HandleFinalization, +} + impl EnactmentState where Block: BlockT, @@ -76,29 +86,61 @@ where /// Updates the state according to the given `ChainEvent`, returning /// `Some(tree_route)` with a tree route including the blocks that need to /// be enacted/retracted. If no enactment is needed then `None` is returned. - pub fn update( + pub fn update( &mut self, event: &ChainEvent, - tree_route: &F, - ) -> Result>, String> + tree_route: &TreeRouteF, + hash_to_number: &BlockNumberF, + is_major_syncing: &IsMajorSyncF, + ) -> Result, String> where - F: Fn(Block::Hash, Block::Hash) -> Result, String>, + TreeRouteF: Fn(Block::Hash, Block::Hash) -> Result, String>, + BlockNumberF: Fn(Block::Hash) -> Result>, String>, + IsMajorSyncF: Fn() -> bool, { + if is_major_syncing() { + log::info!(target: "txpool", "skip maintain: major syncing"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + let (new_hash, finalized) = match event { ChainEvent::NewBestBlock { hash, .. } => (*hash, false), ChainEvent::Finalized { hash, .. } => (*hash, true), }; + // do not proceed with txpool maintain if block distance is to high + let skip_maintanance = + match (hash_to_number(new_hash), hash_to_number(self.recent_best_block())) { + (Ok(Some(notified)), Ok(Some(current))) => + notified.checked_sub(¤t) > Some(SKIP_MAINTAINANCE_THRESHOLD.into()), + _ => true, + }; + + if skip_maintanance { + log::info!(target: "txpool", "skip maintain: tree_route would be too long"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + // block was already finalized if self.recent_finalized_block == new_hash { log::debug!(target: "txpool", "handle_enactment: block already finalized"); - return Ok(None) + return Ok(EnactmentAction::Skip) } // compute actual tree route from best_block to notified block, and use // it instead of tree_route provided with event let tree_route = tree_route(self.recent_best_block, new_hash)?; + log::info!(target: "txpool", "skip maintain: enacted: {:?}", tree_route.enacted().len()); + // do not proceed with txpool maintain if enacted path is too long + if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { + log::info!(target: "txpool", "skip maintain: enacted too long"); + self.force_update(event); + return Ok(EnactmentAction::Skip) + } + log::debug!( target: "txpool", "resolve hash:{:?} finalized:{:?} tree_route:{:?} best_block:{:?} finalized_block:{:?}", @@ -114,7 +156,7 @@ where "Recently finalized block {} would be retracted by ChainEvent {}, skipping", self.recent_finalized_block, new_hash ); - return Ok(None) + return Ok(EnactmentAction::Skip) } if finalized { @@ -129,7 +171,7 @@ where target: "txpool", "handle_enactment: no newly enacted blocks since recent best block" ); - return Ok(None) + return Ok(EnactmentAction::HandleFinalization) } // otherwise enacted finalized block becomes best block... @@ -137,7 +179,7 @@ where self.recent_best_block = new_hash; - Ok(Some(tree_route)) + Ok(EnactmentAction::HandleEnactment(tree_route)) } /// Forces update of the state according to the given `ChainEvent`. Intended to be used as a @@ -156,9 +198,10 @@ where #[cfg(test)] mod enactment_state_tests { - use super::EnactmentState; + use super::{EnactmentAction, EnactmentState}; use sc_transaction_pool_api::ChainEvent; use sp_blockchain::{HashAndNumber, TreeRoute}; + use sp_runtime::traits::NumberFor; use std::sync::Arc; use substrate_test_runtime_client::runtime::{Block, Hash}; @@ -178,6 +221,9 @@ mod enactment_state_tests { fn e1() -> HashAndNumber { HashAndNumber { number: 5, hash: Hash::from([0xE1; 32]) } } + fn x1() -> HashAndNumber { + HashAndNumber { number: 22, hash: Hash::from([0x1E; 32]) } + } fn b2() -> HashAndNumber { HashAndNumber { number: 2, hash: Hash::from([0xB2; 32]) } } @@ -190,11 +236,22 @@ mod enactment_state_tests { fn e2() -> HashAndNumber { HashAndNumber { number: 5, hash: Hash::from([0xE2; 32]) } } + fn x2() -> HashAndNumber { + HashAndNumber { number: 22, hash: Hash::from([0x2E; 32]) } + } + + fn test_chain() -> Vec> { + vec![x1(), e1(), d1(), c1(), b1(), a(), b2(), c2(), d2(), e2(), x2()] + } + + fn block_hash_to_block_number(hash: Hash) -> Result>, String> { + Ok(test_chain().iter().find(|x| x.hash == hash).map(|x| x.number)) + } /// mock tree_route computing function for simple two-forks chain fn tree_route(from: Hash, to: Hash) -> Result, String> { - let chain = vec![e1(), d1(), c1(), b1(), a(), b2(), c2(), d2(), e2()]; - let pivot = 4_usize; + let chain = test_chain(); + let pivot = chain.iter().position(|x| x.number == a().number).unwrap(); let from = chain .iter() @@ -205,13 +262,13 @@ mod enactment_state_tests { .position(|bn| bn.hash == to) .ok_or("existing block should be given")?; - // B1-C1-D1-E1 + // B1-C1-D1-E1-..-X1 // / // A // \ - // B2-C2-D2-E2 + // B2-C2-D2-E2-..-X2 // - // [E1 D1 C1 B1 A B2 C2 D2 E2] + // [X1 E1 D1 C1 B1 A B2 C2 D2 E2 X2] let vec: Vec> = if from < to { chain.into_iter().skip(from).take(to - from + 1).collect() @@ -381,13 +438,21 @@ mod enactment_state_tests { let expected = TreeRoute::new(vec![a()], 0); assert_treeroute_eq(result, expected); } + + #[test] + fn tree_route_mock_test_17() { + let result = tree_route(x2().hash, b1().hash); + let expected = TreeRoute::new(vec![x2(), e2(), d2(), c2(), b2(), a(), b1()], 5); + assert_treeroute_eq(result, expected); + } } - fn trigger_new_best_block( + fn trigger_new_best_block_with_major_sync( state: &mut EnactmentState, from: HashAndNumber, acted_on: HashAndNumber, - ) -> bool { + major_sync: bool, + ) -> EnactmentAction { let (from, acted_on) = (from.hash, acted_on.hash); let event_tree_route = tree_route(from, acted_on).expect("Tree route exists"); @@ -399,16 +464,18 @@ mod enactment_state_tests { tree_route: Some(Arc::new(event_tree_route)), }, &tree_route, + &block_hash_to_block_number, + &|| major_sync, ) .unwrap() - .is_some() } - fn trigger_finalized( + fn trigger_finalized_with_major_sync( state: &mut EnactmentState, from: HashAndNumber, acted_on: HashAndNumber, - ) -> bool { + major_sync: bool, + ) -> EnactmentAction { let (from, acted_on) = (from.hash, acted_on.hash); let v = tree_route(from, acted_on) @@ -419,9 +486,29 @@ mod enactment_state_tests { .collect::>(); state - .update(&ChainEvent::Finalized { hash: acted_on, tree_route: v.into() }, &tree_route) + .update( + &ChainEvent::Finalized { hash: acted_on, tree_route: v.into() }, + &tree_route, + &block_hash_to_block_number, + &|| major_sync, + ) .unwrap() - .is_some() + } + + fn trigger_new_best_block( + state: &mut EnactmentState, + from: HashAndNumber, + acted_on: HashAndNumber, + ) -> EnactmentAction { + trigger_new_best_block_with_major_sync(state, from, acted_on, false) + } + + fn trigger_finalized( + state: &mut EnactmentState, + from: HashAndNumber, + acted_on: HashAndNumber, + ) -> EnactmentAction { + trigger_finalized_with_major_sync(state, from, acted_on, false) } fn assert_es_eq( @@ -445,51 +532,51 @@ mod enactment_state_tests { // B2-C2-D2-E2 let result = trigger_new_best_block(&mut es, a(), d1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, d1(), a()); let result = trigger_new_best_block(&mut es, d1(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), a()); let result = trigger_finalized(&mut es, a(), d2()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, d2(), d2()); let result = trigger_new_best_block(&mut es, d2(), e1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_finalized(&mut es, a(), b2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_finalized(&mut es, a(), b1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_new_best_block(&mut es, a(), d2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_finalized(&mut es, a(), d2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_new_best_block(&mut es, a(), c2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_new_best_block(&mut es, a(), c1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, d2(), d2()); let result = trigger_new_best_block(&mut es, d2(), e2()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e2(), d2()); let result = trigger_finalized(&mut es, d2(), e2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::HandleFinalization)); assert_es_eq(&es, e2(), e2()); } @@ -501,27 +588,27 @@ mod enactment_state_tests { // A-B1-C1-D1-E1 let result = trigger_new_best_block(&mut es, a(), b1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, b1(), a()); let result = trigger_new_best_block(&mut es, b1(), c1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, c1(), a()); let result = trigger_new_best_block(&mut es, c1(), d1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, d1(), a()); let result = trigger_new_best_block(&mut es, d1(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), a()); let result = trigger_finalized(&mut es, a(), c1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::HandleFinalization)); assert_es_eq(&es, e1(), c1()); let result = trigger_finalized(&mut es, c1(), e1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::HandleFinalization)); assert_es_eq(&es, e1(), e1()); } @@ -533,11 +620,11 @@ mod enactment_state_tests { // A-B1-C1-D1-E1 let result = trigger_new_best_block(&mut es, a(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), a()); let result = trigger_finalized(&mut es, a(), b1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::HandleFinalization)); assert_es_eq(&es, e1(), b1()); } @@ -549,11 +636,11 @@ mod enactment_state_tests { // A-B1-C1-D1-E1 let result = trigger_finalized(&mut es, a(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), e1()); let result = trigger_finalized(&mut es, e1(), b1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, e1(), e1()); } @@ -569,11 +656,11 @@ mod enactment_state_tests { // B2-C2-D2-E2 let result = trigger_finalized(&mut es, a(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), e1()); let result = trigger_finalized(&mut es, e1(), e2()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, e1(), e1()); } @@ -585,19 +672,19 @@ mod enactment_state_tests { // A-B1-C1-D1-E1 let result = trigger_new_best_block(&mut es, a(), b1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, b1(), a()); let result = trigger_finalized(&mut es, a(), d1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, d1(), d1()); let result = trigger_new_best_block(&mut es, a(), e1()); - assert!(result); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); assert_es_eq(&es, e1(), d1()); let result = trigger_new_best_block(&mut es, a(), c1()); - assert_eq!(result, false); + assert!(matches!(result, EnactmentAction::Skip)); assert_es_eq(&es, e1(), d1()); } @@ -618,4 +705,41 @@ mod enactment_state_tests { es.force_update(&ChainEvent::Finalized { hash: b1().hash, tree_route: Arc::from([]) }); assert_es_eq(&es, b1(), b1()); } + + #[test] + fn test_enactment_skip_major_sync() { + sp_tracing::try_init_simple(); + let mut es = EnactmentState::new(a().hash, a().hash); + + // A-B1-C1-..-X1 + let result = trigger_new_best_block_with_major_sync(&mut es, a(), c1(), true); + assert!(matches!(result, EnactmentAction::Skip)); + assert_es_eq(&es, c1(), a()); + + let result = trigger_finalized_with_major_sync(&mut es, a(), x1(), true); + assert!(matches!(result, EnactmentAction::Skip)); + assert_es_eq(&es, x1(), x1()); + } + + #[test] + fn test_enactment_skip_long_enacted_path() { + sp_tracing::try_init_simple(); + let mut es = EnactmentState::new(a().hash, a().hash); + + // A-B1-C1-..-X1 + let result = trigger_new_best_block(&mut es, a(), x1()); + assert!(matches!(result, EnactmentAction::Skip)); + assert_es_eq(&es, x1(), a()); + } + + #[test] + fn test_enactment_proceed_with_enacted_path_at_threshold() { + sp_tracing::try_init_simple(); + let mut es = EnactmentState::new(b1().hash, b1().hash); + + // A-B1-C1-..-X1 + let result = trigger_new_best_block(&mut es, b1(), x1()); + assert!(matches!(result, EnactmentAction::HandleEnactment { .. })); + assert_es_eq(&es, x1(), b1()); + } } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 9a31d11c61f9b..8893ee0dfe45e 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -33,7 +33,7 @@ mod tests; pub use crate::api::FullChainApi; use async_trait::async_trait; -use enactment_state::EnactmentState; +use enactment_state::{EnactmentAction, EnactmentState}; use futures::{ channel::oneshot, future::{self, ready}, @@ -42,7 +42,6 @@ use futures::{ pub use graph::{ base_pool::Limit as PoolLimit, ChainApi, Options, Pool, Transaction, ValidatedTransaction, }; -use num_traits::CheckedSub; use parking_lot::Mutex; use std::{ collections::{HashMap, HashSet}, @@ -68,8 +67,6 @@ use prometheus_endpoint::Registry as PrometheusRegistry; use sp_blockchain::{HashAndNumber, TreeRoute}; -const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; - type BoxedReadyIterator = Box>> + Send>; @@ -728,32 +725,6 @@ where where SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized, { - if sync_oracle.is_major_syncing() { - log::info!(target: "txpool", "skip maintain: major syncing"); - self.enactment_state.lock().force_update(&event); - return - } - - // do not maintain txpool if block distance is to high - let skip_maintanance = match event { - ChainEvent::Finalized { hash, .. } | ChainEvent::NewBestBlock { hash, .. } => match ( - self.api.block_id_to_number(&BlockId::Hash(hash)), - self.api.block_id_to_number(&BlockId::Hash( - self.enactment_state.lock().recent_best_block(), - )), - ) { - (Ok(Some(notified)), Ok(Some(current))) => - Some(SKIP_MAINTAINANCE_THRESHOLD.into()) < notified.checked_sub(¤t), - _ => true, - }, - }; - - if skip_maintanance { - log::info!(target: "txpool", "skip maintain: tree_route would be too long"); - self.enactment_state.lock().force_update(&event); - return - } - let prev_finalized_block = self.enactment_state.lock().recent_finalized_block(); let compute_tree_route = |from, to| -> Result, String> { match self.api.tree_route(from, to) { @@ -764,22 +735,25 @@ where )), } }; - - let result = self.enactment_state.lock().update(&event, &compute_tree_route); + let is_major_syncing = || sync_oracle.is_major_syncing(); + let block_id_to_number = + |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e)); + + let result = self.enactment_state.lock().update( + &event, + &compute_tree_route, + &block_id_to_number, + &is_major_syncing, + ); match result { Err(msg) => { log::debug!(target: "txpool", "{msg}"); self.enactment_state.lock().force_update(&event); }, - Ok(None) => {}, - Ok(Some(tree_route)) => { - // do not maintain txpool if enacted path is too long - if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { - log::info!(target: "txpool", "skip maintain: enacted too long"); - return - } - + Ok(EnactmentAction::Skip) => return, + Ok(EnactmentAction::HandleFinalization) => {}, + Ok(EnactmentAction::HandleEnactment(tree_route)) => { self.handle_enactment(tree_route).await; }, }; From f3487f0f258e75862681d2cc71df146a60c932b4 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 14:33:38 +0100 Subject: [PATCH 09/17] SyncOracle fixes here and there --- bin/node/cli/src/service.rs | 1 + .../basic-authorship/src/basic_authorship.rs | 78 +++++++++++-------- client/consensus/manual-seal/src/lib.rs | 11 ++- 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index cdee61af3f500..ca9e9020427fe 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -646,6 +646,7 @@ mod tests { futures::executor::block_on(service.transaction_pool().maintain( ChainEvent::NewBestBlock { hash: parent_header.hash(), tree_route: None }, + Arc::new(sp_consensus::NoNetwork), )); let mut proposer_factory = sc_basic_authorship::ProposerFactory::new( diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index c39d07a14f0f1..4052fe1982b4c 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -615,11 +615,14 @@ mod tests { .unwrap(); block_on( - txpool.maintain(chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - )), + txpool.maintain( + chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + ), + Arc::new(sp_consensus::NoNetwork), + ), ); let mut proposer_factory = @@ -708,11 +711,14 @@ mod tests { block_on(txpool.submit_at(&BlockId::number(0), SOURCE, vec![extrinsic(0)])).unwrap(); block_on( - txpool.maintain(chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - )), + txpool.maintain( + chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + ), + Arc::new(sp_consensus::NoNetwork), + ), ); let mut proposer_factory = @@ -809,11 +815,14 @@ mod tests { }; block_on( - txpool.maintain(chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - )), + txpool.maintain( + chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + ), + Arc::new(sp_consensus::NoNetwork), + ), ); assert_eq!(txpool.ready().count(), 7); @@ -822,11 +831,10 @@ mod tests { let hashof1 = block.hash(); block_on(client.import(BlockOrigin::Own, block)).unwrap(); - block_on( - txpool.maintain(chain_event( - client.expect_header(hashof1).expect("there should be header"), - )), - ); + block_on(txpool.maintain( + chain_event(client.expect_header(hashof1).expect("there should be header")), + Arc::new(sp_consensus::NoNetwork), + )); assert_eq!(txpool.ready().count(), 5); // now let's make sure that we can still make some progress @@ -872,7 +880,9 @@ mod tests { block_on(txpool.submit_at(&BlockId::number(0), SOURCE, extrinsics)).unwrap(); - block_on(txpool.maintain(chain_event(genesis_header.clone()))); + block_on( + txpool.maintain(chain_event(genesis_header.clone()), Arc::new(sp_consensus::NoNetwork)), + ); let mut proposer_factory = ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None); @@ -958,11 +968,14 @@ mod tests { .unwrap(); block_on( - txpool.maintain(chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - )), + txpool.maintain( + chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + ), + Arc::new(sp_consensus::NoNetwork), + ), ); assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 3); @@ -1020,11 +1033,14 @@ mod tests { .unwrap(); block_on( - txpool.maintain(chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - )), + txpool.maintain( + chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + ), + Arc::new(sp_consensus::NoNetwork), + ), ); assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 2 + 2); diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 5c86382a3ab7b..a7df541cdd94c 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -585,10 +585,13 @@ mod tests { let header = client.header(created_block.hash).expect("db error").expect("imported above"); assert_eq!(header.number, 1); - pool.maintain(sc_transaction_pool_api::ChainEvent::NewBestBlock { - hash: header.hash(), - tree_route: None, - }) + pool.maintain( + sc_transaction_pool_api::ChainEvent::NewBestBlock { + hash: header.hash(), + tree_route: None, + }, + Arc::new(sp_consensus::NoNetwork), + ) .await; let (tx1, rx1) = futures::channel::oneshot::channel(); From 47f9fce0e4bae0ab610fa6af99ce24d993bb3f16 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 16:24:36 +0100 Subject: [PATCH 10/17] Update client/transaction-pool/src/enactment_state.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/transaction-pool/src/enactment_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 35297416b6f2e..10381f1b56266 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -110,7 +110,7 @@ where }; // do not proceed with txpool maintain if block distance is to high - let skip_maintanance = + let skip_maintenance = match (hash_to_number(new_hash), hash_to_number(self.recent_best_block())) { (Ok(Some(notified)), Ok(Some(current))) => notified.checked_sub(¤t) > Some(SKIP_MAINTAINANCE_THRESHOLD.into()), From 54cf4c0b4e56fd1b6543447d9964afc68141ffc9 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 16:24:51 +0100 Subject: [PATCH 11/17] Update client/transaction-pool/src/enactment_state.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/transaction-pool/src/enactment_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 10381f1b56266..cf11ce0a414c2 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -117,7 +117,7 @@ where _ => true, }; - if skip_maintanance { + if skip_maintenance { log::info!(target: "txpool", "skip maintain: tree_route would be too long"); self.force_update(event); return Ok(EnactmentAction::Skip) From d5261cc514c5d27158f09f2a4e4fda02765adc3d Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 16:32:30 +0100 Subject: [PATCH 12/17] sync_oracle removed --- Cargo.lock | 1 - bin/node/cli/src/service.rs | 1 - .../basic-authorship/src/basic_authorship.rs | 78 +++++------ client/consensus/manual-seal/src/lib.rs | 11 +- client/service/src/builder.rs | 8 +- client/transaction-pool/Cargo.toml | 1 - client/transaction-pool/api/Cargo.toml | 1 - client/transaction-pool/api/src/lib.rs | 4 +- .../transaction-pool/src/enactment_state.rs | 56 +------- client/transaction-pool/src/lib.rs | 20 +-- client/transaction-pool/tests/pool.rs | 131 ++++++++---------- 11 files changed, 107 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcffb3ab43172..2e968b5e97ca8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9077,7 +9077,6 @@ dependencies = [ "serde", "serde_json", "sp-blockchain", - "sp-consensus", "sp-runtime", "thiserror", ] diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index ca9e9020427fe..cdee61af3f500 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -646,7 +646,6 @@ mod tests { futures::executor::block_on(service.transaction_pool().maintain( ChainEvent::NewBestBlock { hash: parent_header.hash(), tree_route: None }, - Arc::new(sp_consensus::NoNetwork), )); let mut proposer_factory = sc_basic_authorship::ProposerFactory::new( diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 4052fe1982b4c..c39d07a14f0f1 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -615,14 +615,11 @@ mod tests { .unwrap(); block_on( - txpool.maintain( - chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - ), - Arc::new(sp_consensus::NoNetwork), - ), + txpool.maintain(chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + )), ); let mut proposer_factory = @@ -711,14 +708,11 @@ mod tests { block_on(txpool.submit_at(&BlockId::number(0), SOURCE, vec![extrinsic(0)])).unwrap(); block_on( - txpool.maintain( - chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - ), - Arc::new(sp_consensus::NoNetwork), - ), + txpool.maintain(chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + )), ); let mut proposer_factory = @@ -815,14 +809,11 @@ mod tests { }; block_on( - txpool.maintain( - chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - ), - Arc::new(sp_consensus::NoNetwork), - ), + txpool.maintain(chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + )), ); assert_eq!(txpool.ready().count(), 7); @@ -831,10 +822,11 @@ mod tests { let hashof1 = block.hash(); block_on(client.import(BlockOrigin::Own, block)).unwrap(); - block_on(txpool.maintain( - chain_event(client.expect_header(hashof1).expect("there should be header")), - Arc::new(sp_consensus::NoNetwork), - )); + block_on( + txpool.maintain(chain_event( + client.expect_header(hashof1).expect("there should be header"), + )), + ); assert_eq!(txpool.ready().count(), 5); // now let's make sure that we can still make some progress @@ -880,9 +872,7 @@ mod tests { block_on(txpool.submit_at(&BlockId::number(0), SOURCE, extrinsics)).unwrap(); - block_on( - txpool.maintain(chain_event(genesis_header.clone()), Arc::new(sp_consensus::NoNetwork)), - ); + block_on(txpool.maintain(chain_event(genesis_header.clone()))); let mut proposer_factory = ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None); @@ -968,14 +958,11 @@ mod tests { .unwrap(); block_on( - txpool.maintain( - chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - ), - Arc::new(sp_consensus::NoNetwork), - ), + txpool.maintain(chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + )), ); assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 3); @@ -1033,14 +1020,11 @@ mod tests { .unwrap(); block_on( - txpool.maintain( - chain_event( - client - .expect_header(client.info().genesis_hash) - .expect("there should be header"), - ), - Arc::new(sp_consensus::NoNetwork), - ), + txpool.maintain(chain_event( + client + .expect_header(client.info().genesis_hash) + .expect("there should be header"), + )), ); assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 2 + 2); diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index a7df541cdd94c..5c86382a3ab7b 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -585,13 +585,10 @@ mod tests { let header = client.header(created_block.hash).expect("db error").expect("imported above"); assert_eq!(header.number, 1); - pool.maintain( - sc_transaction_pool_api::ChainEvent::NewBestBlock { - hash: header.hash(), - tree_route: None, - }, - Arc::new(sp_consensus::NoNetwork), - ) + pool.maintain(sc_transaction_pool_api::ChainEvent::NewBestBlock { + hash: header.hash(), + tree_route: None, + }) .await; let (tx1, rx1) = futures::channel::oneshot::channel(); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 05d1dad8badb1..1f94f96fae89e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -350,7 +350,6 @@ pub trait SpawnTaskNetwork: sc_offchain::NetworkProvider + NetworkStateInfo + NetworkStatusProvider - + sp_consensus::SyncOracle + Send + Sync + 'static @@ -363,7 +362,6 @@ where T: sc_offchain::NetworkProvider + NetworkStateInfo + NetworkStatusProvider - + sp_consensus::SyncOracle + Send + Sync + 'static, @@ -500,11 +498,7 @@ where spawn_handle.spawn( "txpool-notifications", Some("transaction-pool"), - sc_transaction_pool::notification_future( - client.clone(), - transaction_pool.clone(), - network.clone(), - ), + sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()), ); spawn_handle.spawn( diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index 0e141ae4e448f..4b69b6d0f2fe0 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -29,7 +29,6 @@ sc-transaction-pool-api = { version = "4.0.0-dev", path = "./api" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } -sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-core = { version = "7.0.0", path = "../../primitives/core" } sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" } sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" } diff --git a/client/transaction-pool/api/Cargo.toml b/client/transaction-pool/api/Cargo.toml index 829d94f211297..e14a3ff4f3839 100644 --- a/client/transaction-pool/api/Cargo.toml +++ b/client/transaction-pool/api/Cargo.toml @@ -16,7 +16,6 @@ serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-runtime = { version = "7.0.0", default-features = false, path = "../../../primitives/runtime" } -sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } [dev-dependencies] serde_json = "1.0" diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index a83ca6aafbfbc..c1e49ad07d7b1 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -307,9 +307,7 @@ pub enum ChainEvent { #[async_trait] pub trait MaintainedTransactionPool: TransactionPool { /// Perform maintenance - async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) - where - SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized; + async fn maintain(&self, event: ChainEvent); } /// Transaction pool interface for submitting local transactions that exposes a diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index cf11ce0a414c2..fffaf3fb782fc 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -86,24 +86,16 @@ where /// Updates the state according to the given `ChainEvent`, returning /// `Some(tree_route)` with a tree route including the blocks that need to /// be enacted/retracted. If no enactment is needed then `None` is returned. - pub fn update( + pub fn update( &mut self, event: &ChainEvent, tree_route: &TreeRouteF, hash_to_number: &BlockNumberF, - is_major_syncing: &IsMajorSyncF, ) -> Result, String> where TreeRouteF: Fn(Block::Hash, Block::Hash) -> Result, String>, BlockNumberF: Fn(Block::Hash) -> Result>, String>, - IsMajorSyncF: Fn() -> bool, { - if is_major_syncing() { - log::info!(target: "txpool", "skip maintain: major syncing"); - self.force_update(event); - return Ok(EnactmentAction::Skip) - } - let (new_hash, finalized) = match event { ChainEvent::NewBestBlock { hash, .. } => (*hash, false), ChainEvent::Finalized { hash, .. } => (*hash, true), @@ -187,10 +179,7 @@ where pub fn force_update(&mut self, event: &ChainEvent) { match event { ChainEvent::NewBestBlock { hash, .. } => self.recent_best_block = *hash, - ChainEvent::Finalized { hash, .. } => { - self.recent_best_block = *hash; - self.recent_finalized_block = *hash; - }, + ChainEvent::Finalized { hash, .. } => self.recent_finalized_block = *hash, }; log::debug!(target: "txpool", "forced update: {:?}, {:?}", self.recent_best_block, self.recent_finalized_block); } @@ -447,11 +436,10 @@ mod enactment_state_tests { } } - fn trigger_new_best_block_with_major_sync( + fn trigger_new_best_block( state: &mut EnactmentState, from: HashAndNumber, acted_on: HashAndNumber, - major_sync: bool, ) -> EnactmentAction { let (from, acted_on) = (from.hash, acted_on.hash); @@ -465,16 +453,14 @@ mod enactment_state_tests { }, &tree_route, &block_hash_to_block_number, - &|| major_sync, ) .unwrap() } - fn trigger_finalized_with_major_sync( + fn trigger_finalized( state: &mut EnactmentState, from: HashAndNumber, acted_on: HashAndNumber, - major_sync: bool, ) -> EnactmentAction { let (from, acted_on) = (from.hash, acted_on.hash); @@ -490,27 +476,10 @@ mod enactment_state_tests { &ChainEvent::Finalized { hash: acted_on, tree_route: v.into() }, &tree_route, &block_hash_to_block_number, - &|| major_sync, ) .unwrap() } - fn trigger_new_best_block( - state: &mut EnactmentState, - from: HashAndNumber, - acted_on: HashAndNumber, - ) -> EnactmentAction { - trigger_new_best_block_with_major_sync(state, from, acted_on, false) - } - - fn trigger_finalized( - state: &mut EnactmentState, - from: HashAndNumber, - acted_on: HashAndNumber, - ) -> EnactmentAction { - trigger_finalized_with_major_sync(state, from, acted_on, false) - } - fn assert_es_eq( es: &EnactmentState, expected_best_block: HashAndNumber, @@ -703,22 +672,7 @@ mod enactment_state_tests { let mut es = EnactmentState::new(a().hash, a().hash); es.force_update(&ChainEvent::Finalized { hash: b1().hash, tree_route: Arc::from([]) }); - assert_es_eq(&es, b1(), b1()); - } - - #[test] - fn test_enactment_skip_major_sync() { - sp_tracing::try_init_simple(); - let mut es = EnactmentState::new(a().hash, a().hash); - - // A-B1-C1-..-X1 - let result = trigger_new_best_block_with_major_sync(&mut es, a(), c1(), true); - assert!(matches!(result, EnactmentAction::Skip)); - assert_es_eq(&es, c1(), a()); - - let result = trigger_finalized_with_major_sync(&mut es, a(), x1(), true); - assert!(matches!(result, EnactmentAction::Skip)); - assert_es_eq(&es, x1(), x1()); + assert_es_eq(&es, a(), b1()); } #[test] diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 8893ee0dfe45e..bc79910b41af5 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -76,7 +76,8 @@ type ReadyIteratorFor = type PolledIterator = Pin> + Send>>; /// A transaction pool for a full node. -pub type FullPool = BasicPool, Block>; +pub type FullPool = + BasicPool, Block>; /// Basic implementation of transaction pool that can be customized by providing PoolApi. pub struct BasicPool @@ -721,10 +722,7 @@ where Block: BlockT, PoolApi: 'static + graph::ChainApi, { - async fn maintain(&self, event: ChainEvent, sync_oracle: Arc) - where - SO: sp_consensus::SyncOracle + std::marker::Send + std::marker::Sync + ?Sized, - { + async fn maintain(&self, event: ChainEvent) { let prev_finalized_block = self.enactment_state.lock().recent_finalized_block(); let compute_tree_route = |from, to| -> Result, String> { match self.api.tree_route(from, to) { @@ -735,7 +733,6 @@ where )), } }; - let is_major_syncing = || sync_oracle.is_major_syncing(); let block_id_to_number = |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e)); @@ -743,7 +740,6 @@ where &event, &compute_tree_route, &block_id_to_number, - &is_major_syncing, ); match result { @@ -779,15 +775,11 @@ where } /// Inform the transaction pool about imported and finalized blocks. -pub async fn notification_future( - client: Arc, - txpool: Arc, - sync_oracle: Arc, -) where +pub async fn notification_future(client: Arc, txpool: Arc) +where Block: BlockT, Client: sc_client_api::BlockchainEvents, Pool: MaintainedTransactionPool, - SyncOracle: sp_consensus::SyncOracle + std::marker::Sync + std::marker::Send + ?Sized, { let import_stream = client .import_notification_stream() @@ -796,6 +788,6 @@ pub async fn notification_future( let finality_stream = client.finality_notification_stream().map(Into::into).fuse(); futures::stream::select(import_stream, finality_stream) - .for_each(|evt| txpool.maintain(evt, sync_oracle.clone())) + .for_each(|evt| txpool.maintain(evt)) .await } diff --git a/client/transaction-pool/tests/pool.rs b/client/transaction-pool/tests/pool.rs index 20a7d6c8b8780..27d62c3250abc 100644 --- a/client/transaction-pool/tests/pool.rs +++ b/client/transaction-pool/tests/pool.rs @@ -77,16 +77,6 @@ fn create_basic_pool(test_api: TestApi) -> BasicPool { create_basic_pool_with_genesis(Arc::from(test_api)).0 } -fn block_on_pool_maintain_with_event( - pool: &BasicPool, - event: ChainEvent, -) where - PoolApi: sc_transaction_pool::ChainApi + 'static, - Block: sp_runtime::traits::Block, -{ - block_on(pool.maintain(event, Arc::new(sp_consensus::NoNetwork))); -} - const SOURCE: TransactionSource = TransactionSource::External; #[test] @@ -175,7 +165,7 @@ fn only_prune_on_new_best() { let header = api.push_block(2, vec![uxt], true); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -246,7 +236,7 @@ fn should_prune_old_during_maintenance() { let header = api.push_block(1, vec![xt.clone()], true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -266,7 +256,7 @@ fn should_revalidate_during_maintenance() { api.add_invalid(&xt2); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 1); // test that pool revalidated transaction that left ready and not included in the block @@ -290,7 +280,7 @@ fn should_resubmit_from_retracted_during_maintenance() { let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } @@ -308,7 +298,7 @@ fn should_not_resubmit_from_retracted_during_maintenance_if_tx_is_also_in_enacte let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -327,7 +317,7 @@ fn should_not_retain_invalid_hashes_from_retracted() { api.add_invalid(&xt); let event = block_event_with_retracted(header, fork_header.hash(), pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!( futures::executor::block_on_stream(watcher).collect::>(), @@ -351,14 +341,14 @@ fn should_revalidate_across_many_blocks() { assert_eq!(pool.status().ready, 2); let header = api.push_block(1, vec![], true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt3.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 3); let header = api.push_block(2, vec![xt1.clone()], true); let block_hash = header.hash(); - block_on_pool_maintain_with_event(&pool, block_event(header.clone())); + block_on(pool.maintain(block_event(header.clone()))); block_on( watcher1 @@ -401,7 +391,7 @@ fn should_push_watchers_during_maintenance() { // clear timer events if any let header = api.push_block(1, vec![], true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); // then // hash3 is now invalid @@ -419,10 +409,10 @@ fn should_push_watchers_during_maintenance() { // when let header = api.push_block(2, vec![tx0, tx1, tx2], true); let header_hash = header.hash(); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); let event = ChainEvent::Finalized { hash: header_hash, tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); // then // events for hash0 are: Ready, InBlock @@ -466,10 +456,10 @@ fn finalization() { let header = pool.api().chain().read().block_by_number.get(&2).unwrap()[0].0.header().clone(); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); let mut stream = futures::executor::block_on_stream(watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); @@ -504,7 +494,7 @@ fn fork_aware_finalization() { let c2; let d2; - block_on_pool_maintain_with_event(&pool, block_event(a_header)); + block_on(pool.maintain(block_event(a_header))); // block B1 { @@ -518,10 +508,10 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> B1: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; b1 = header.hash(); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); let event = ChainEvent::Finalized { hash: b1, tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } // block C2 @@ -534,7 +524,7 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> C2: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; c2 = header.hash(); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -549,7 +539,7 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> D2: {:?} {:?}", header.hash(), header); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; d2 = header.hash(); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -564,11 +554,11 @@ fn fork_aware_finalization() { c1 = header.hash(); canon_watchers.push((watcher, header.hash())); let event = block_event_with_retracted(header.clone(), d2, pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } // block D1 @@ -583,10 +573,10 @@ fn fork_aware_finalization() { canon_watchers.push((w, header.hash())); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); let event = ChainEvent::Finalized { hash: d1, tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } let e1; @@ -597,12 +587,9 @@ fn fork_aware_finalization() { log::trace!(target:"txpool", ">> E1: {:?} {:?}", header.hash(), header); e1 = header.hash(); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); - block_on_pool_maintain_with_event( - &pool, - ChainEvent::Finalized { hash: e1, tree_route: Arc::from(vec![]) }, - ); + block_on(pool.maintain(ChainEvent::Finalized { hash: e1, tree_route: Arc::from(vec![]) })); } for (canon_watcher, h) in canon_watchers { @@ -659,7 +646,7 @@ fn prune_and_retract_tx_at_same_time() { assert_eq!(pool.status().ready, 1); let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); header.hash() }; @@ -670,11 +657,11 @@ fn prune_and_retract_tx_at_same_time() { assert_eq!(pool.status().ready, 0); let event = block_event_with_retracted(header.clone(), b1, pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); let event = ChainEvent::Finalized { hash: header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); header.hash() }; @@ -729,7 +716,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { let event = ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }; d0 = header.hash(); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -746,7 +733,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { //push new best block let header = pool.api().push_block(2, vec![], true); let event = block_event_with_retracted(header, d0, pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); } } @@ -782,7 +769,7 @@ fn resubmit_from_retracted_fork() { let header = pool.api().push_block(2, vec![tx0.clone()], true); assert_eq!(pool.status().ready, 1); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -791,7 +778,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on(pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone())) .expect("1. Imported"); let header = pool.api().push_block(3, vec![tx1.clone()], true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -800,7 +787,7 @@ fn resubmit_from_retracted_fork() { let _ = block_on(pool.submit_and_watch(&BlockId::number(1), SOURCE, tx2.clone())) .expect("1. Imported"); let header = pool.api().push_block(4, vec![tx2.clone()], true); - block_on_pool_maintain_with_event(&pool, block_event(header.clone())); + block_on(pool.maintain(block_event(header.clone()))); assert_eq!(pool.status().ready, 0); header.hash() }; @@ -839,7 +826,7 @@ fn resubmit_from_retracted_fork() { assert_eq!(expected_ready, ready); let event = block_event_with_retracted(f1_header, f0, pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 3); let ready = pool.ready().map(|t| t.data.encode()).collect::>(); @@ -864,7 +851,7 @@ fn ready_set_should_resolve_after_block_update() { let xt1 = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported"); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert!(pool.ready_at(1).now_or_never().is_some()); } @@ -886,7 +873,7 @@ fn ready_set_should_eventually_resolve_when_block_update_arrives() { panic!("Ready set should not be ready before block update!"); } - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); match ready_set_future.poll_unpin(&mut context) { Poll::Pending => { @@ -977,7 +964,7 @@ fn import_notification_to_pool_maintain_works() { // Get the notification of the block import and maintain the pool with it, // Now, the pool should not contain any transactions. let evt = import_stream.next().expect("Importing a block leads to an event"); - block_on_pool_maintain_with_event(&*pool, evt.try_into().expect("Imported as new best block")); + block_on(pool.maintain(evt.try_into().expect("Imported as new best block"))); assert_eq!(pool.status().ready, 0); } @@ -993,7 +980,7 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() { let header = api.push_block(1, vec![xt1.clone()], true); // This will prune `xt1`. - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); assert_eq!(pool.status().ready, 0); } @@ -1028,7 +1015,7 @@ fn stale_transactions_are_pruned() { // Import block let header = api.push_block(1, xts, true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); // The imported transactions have a different hash and should not evict our initial // transactions. assert_eq!(pool.status().future, 3); @@ -1036,7 +1023,7 @@ fn stale_transactions_are_pruned() { // Import enough blocks to make our transactions stale for n in 1..66 { let header = api.push_block(n, vec![], true); - block_on_pool_maintain_with_event(&pool, block_event(header)); + block_on(pool.maintain(block_event(header))); } assert_eq!(pool.status().future, 0); @@ -1058,7 +1045,7 @@ fn finalized_only_handled_correctly() { let event = ChainEvent::Finalized { hash: header.clone().hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); @@ -1086,8 +1073,8 @@ fn best_block_after_finalized_handled_correctly() { let event = ChainEvent::Finalized { hash: header.clone().hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); - block_on_pool_maintain_with_event(&pool, block_event(header.clone())); + block_on(pool.maintain(event)); + block_on(pool.maintain(block_event(header.clone()))); assert_eq!(pool.status().ready, 0); @@ -1150,13 +1137,13 @@ fn switching_fork_with_finalized_works() { { let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } { let event = ChainEvent::Finalized { hash: b2_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { @@ -1229,28 +1216,28 @@ fn switching_fork_multiple_times_works() { { // phase-0 let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } { // phase-1 let event = block_event_with_retracted(b2_header.clone(), b1_header.hash(), pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } { // phase-2 let event = block_event_with_retracted(b1_header.clone(), b2_header.hash(), pool.api()); - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } { // phase-3 let event = ChainEvent::Finalized { hash: b2_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { @@ -1353,13 +1340,13 @@ fn two_blocks_delayed_finalization_works() { { let event = ChainEvent::Finalized { hash: a_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 3); } { let event = ChainEvent::NewBestBlock { hash: d1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -1368,13 +1355,13 @@ fn two_blocks_delayed_finalization_works() { hash: c1_header.hash(), tree_route: Arc::from(vec![b1_header.hash()]), }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } // this is to collect events from_charlie_watcher and make sure nothing was retracted { let event = ChainEvent::Finalized { hash: d1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { @@ -1452,27 +1439,27 @@ fn delayed_finalization_does_not_retract() { { // phase-0 let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } { // phase-1 let event = ChainEvent::NewBestBlock { hash: c1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } { // phase-2 let event = ChainEvent::Finalized { hash: b1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { // phase-3 let event = ChainEvent::Finalized { hash: c1_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { @@ -1546,7 +1533,7 @@ fn best_block_after_finalization_does_not_retract() { { let event = ChainEvent::Finalized { hash: a_header.hash(), tree_route: Arc::from(vec![]) }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { @@ -1554,13 +1541,13 @@ fn best_block_after_finalization_does_not_retract() { hash: c1_header.hash(), tree_route: Arc::from(vec![a_header.hash(), b1_header.hash()]), }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } { let event = ChainEvent::NewBestBlock { hash: b1_header.hash(), tree_route: None }; - block_on_pool_maintain_with_event(&pool, event); + block_on(pool.maintain(event)); } { From ed7fdf62097732d81c935bae1704ae809691dd7b Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 17:00:06 +0100 Subject: [PATCH 13/17] spelling + fmt + doc --- .../transaction-pool/src/enactment_state.rs | 20 +++++++++---------- client/transaction-pool/src/lib.rs | 12 +++++------ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index fffaf3fb782fc..7108b5e704978 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -23,11 +23,12 @@ use sc_transaction_pool_api::ChainEvent; use sp_blockchain::TreeRoute; use sp_runtime::traits::{Block as BlockT, NumberFor}; -const SKIP_MAINTAINANCE_THRESHOLD: u16 = 20; +const SKIP_MAINTENANCE_THRESHOLD: u16 = 20; /// Helper struct for keeping track of the current state of processed new best /// block and finalized events. The main purpose of keeping track of this state -/// is to figure out if a transaction pool enactment is needed or not. +/// is to figure out which phases (enactment / finalization) of transaction pool +/// maintenance are needed. /// /// Given the following chain: /// @@ -57,10 +58,14 @@ where recent_finalized_block: Block::Hash, } +/// Enactment action that should be performed after processing the `ChainEvent` #[derive(Debug)] pub enum EnactmentAction { + /// Both phases of maintenance shall be skipped Skip, + /// Both phases of maintenance shall be performed HandleEnactment(TreeRoute), + /// Enactment phase of maintenance shall be skipped HandleFinalization, } @@ -78,11 +83,6 @@ where self.recent_finalized_block } - /// Returns the recently finalized block. - pub fn recent_best_block(&self) -> Block::Hash { - self.recent_best_block - } - /// Updates the state according to the given `ChainEvent`, returning /// `Some(tree_route)` with a tree route including the blocks that need to /// be enacted/retracted. If no enactment is needed then `None` is returned. @@ -103,9 +103,9 @@ where // do not proceed with txpool maintain if block distance is to high let skip_maintenance = - match (hash_to_number(new_hash), hash_to_number(self.recent_best_block())) { + match (hash_to_number(new_hash), hash_to_number(self.recent_best_block)) { (Ok(Some(notified)), Ok(Some(current))) => - notified.checked_sub(¤t) > Some(SKIP_MAINTAINANCE_THRESHOLD.into()), + notified.checked_sub(¤t) > Some(SKIP_MAINTENANCE_THRESHOLD.into()), _ => true, }; @@ -127,7 +127,7 @@ where log::info!(target: "txpool", "skip maintain: enacted: {:?}", tree_route.enacted().len()); // do not proceed with txpool maintain if enacted path is too long - if tree_route.enacted().len() > SKIP_MAINTAINANCE_THRESHOLD.into() { + if tree_route.enacted().len() > SKIP_MAINTENANCE_THRESHOLD.into() { log::info!(target: "txpool", "skip maintain: enacted too long"); self.force_update(event); return Ok(EnactmentAction::Skip) diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index bc79910b41af5..1cd9bef77bc69 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -76,8 +76,7 @@ type ReadyIteratorFor = type PolledIterator = Pin> + Send>>; /// A transaction pool for a full node. -pub type FullPool = - BasicPool, Block>; +pub type FullPool = BasicPool, Block>; /// Basic implementation of transaction pool that can be customized by providing PoolApi. pub struct BasicPool @@ -736,11 +735,10 @@ where let block_id_to_number = |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e)); - let result = self.enactment_state.lock().update( - &event, - &compute_tree_route, - &block_id_to_number, - ); + let result = + self.enactment_state + .lock() + .update(&event, &compute_tree_route, &block_id_to_number); match result { Err(msg) => { From c0f8d0fe4f5984a3ac50015dab89c02ec02e4656 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 12 Jan 2023 17:22:40 +0100 Subject: [PATCH 14/17] Review suggestions applied --- .../transaction-pool/src/enactment_state.rs | 25 ++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 7108b5e704978..757d61a9b984a 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -96,18 +96,17 @@ where TreeRouteF: Fn(Block::Hash, Block::Hash) -> Result, String>, BlockNumberF: Fn(Block::Hash) -> Result>, String>, { - let (new_hash, finalized) = match event { - ChainEvent::NewBestBlock { hash, .. } => (*hash, false), - ChainEvent::Finalized { hash, .. } => (*hash, true), + let (new_hash, current_hash, finalized) = match event { + ChainEvent::NewBestBlock { hash, .. } => (*hash, self.recent_best_block, false), + ChainEvent::Finalized { hash, .. } => (*hash, self.recent_finalized_block, true), }; // do not proceed with txpool maintain if block distance is to high - let skip_maintenance = - match (hash_to_number(new_hash), hash_to_number(self.recent_best_block)) { - (Ok(Some(notified)), Ok(Some(current))) => - notified.checked_sub(¤t) > Some(SKIP_MAINTENANCE_THRESHOLD.into()), - _ => true, - }; + let skip_maintenance = match (hash_to_number(new_hash), hash_to_number(current_hash)) { + (Ok(Some(new)), Ok(Some(current))) => + new.checked_sub(¤t) > Some(SKIP_MAINTENANCE_THRESHOLD.into()), + _ => true, + }; if skip_maintenance { log::info!(target: "txpool", "skip maintain: tree_route would be too long"); @@ -125,14 +124,6 @@ where // it instead of tree_route provided with event let tree_route = tree_route(self.recent_best_block, new_hash)?; - log::info!(target: "txpool", "skip maintain: enacted: {:?}", tree_route.enacted().len()); - // do not proceed with txpool maintain if enacted path is too long - if tree_route.enacted().len() > SKIP_MAINTENANCE_THRESHOLD.into() { - log::info!(target: "txpool", "skip maintain: enacted too long"); - self.force_update(event); - return Ok(EnactmentAction::Skip) - } - log::debug!( target: "txpool", "resolve hash:{:?} finalized:{:?} tree_route:{:?} best_block:{:?} finalized_block:{:?}", From b12aee14342db09b1b18f1692afd597af75796c6 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Fri, 13 Jan 2023 08:27:12 +0100 Subject: [PATCH 15/17] log::info -> debug --- client/transaction-pool/src/enactment_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 757d61a9b984a..efbe3d481a660 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -109,7 +109,7 @@ where }; if skip_maintenance { - log::info!(target: "txpool", "skip maintain: tree_route would be too long"); + log::debug!(target: "txpool", "skip maintain: tree_route would be too long"); self.force_update(event); return Ok(EnactmentAction::Skip) } From 68e3192fea558ec73c7d75dc2a30d50431d5976b Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Fri, 13 Jan 2023 17:36:48 +0100 Subject: [PATCH 16/17] Update client/transaction-pool/src/enactment_state.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/transaction-pool/src/enactment_state.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index efbe3d481a660..59c69d2fb0a98 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -23,6 +23,10 @@ use sc_transaction_pool_api::ChainEvent; use sp_blockchain::TreeRoute; use sp_runtime::traits::{Block as BlockT, NumberFor}; +/// The threshold since the last update where we will skip any maintenance for blocks. +/// +/// This includes tracking re-orgs and sending out certain notifications. In general this shouldn't happen +/// and may only happen when the node is doing a full sync. const SKIP_MAINTENANCE_THRESHOLD: u16 = 20; /// Helper struct for keeping track of the current state of processed new best From 05332a25866cbc3abb5db0c652055d4c8a84a950 Mon Sep 17 00:00:00 2001 From: command-bot <> Date: Mon, 16 Jan 2023 18:40:23 +0000 Subject: [PATCH 17/17] ".git/.scripts/commands/fmt/fmt.sh" --- client/transaction-pool/src/enactment_state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/transaction-pool/src/enactment_state.rs b/client/transaction-pool/src/enactment_state.rs index 59c69d2fb0a98..382b2683156b6 100644 --- a/client/transaction-pool/src/enactment_state.rs +++ b/client/transaction-pool/src/enactment_state.rs @@ -24,9 +24,9 @@ use sp_blockchain::TreeRoute; use sp_runtime::traits::{Block as BlockT, NumberFor}; /// The threshold since the last update where we will skip any maintenance for blocks. -/// -/// This includes tracking re-orgs and sending out certain notifications. In general this shouldn't happen -/// and may only happen when the node is doing a full sync. +/// +/// This includes tracking re-orgs and sending out certain notifications. In general this shouldn't +/// happen and may only happen when the node is doing a full sync. const SKIP_MAINTENANCE_THRESHOLD: u16 = 20; /// Helper struct for keeping track of the current state of processed new best