From eecc8491ccf58990697f4c700a1191f56937884a Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 28 Nov 2024 15:40:08 +0100 Subject: [PATCH] fatp: pending actions now support removals --- .../fork_aware_txpool/fork_aware_txpool.rs | 9 +- .../src/fork_aware_txpool/view_store.rs | 166 ++++++++++++++---- 2 files changed, 141 insertions(+), 34 deletions(-) diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 58a73f1c2259..5ed241796171 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -31,7 +31,10 @@ use crate::{ api::FullChainApi, common::log_xt::log_xt_trace, enactment_state::{EnactmentAction, EnactmentState}, - fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker}, + fork_aware_txpool::{ + dropped_watcher::{DroppedReason, DroppedTransaction}, + revalidation_worker, + }, graph::{ self, base_pool::{TimedTransactionSource, Transaction}, @@ -1325,6 +1328,10 @@ where self.mempool.try_replace_transaction(xt, source, watched, worst_tx_hash)?; // 5. notify listner + self.view_store + .listener + .transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_tx_hash)); + // 6. remove transaction from the view_store self.view_store.remove_transaction_subtree( worst_tx_hash, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 3c095ae2a599..9de3e7422981 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -43,15 +43,13 @@ use std::{ time::Instant, }; -/// Helper struct to keep the context for transaction replacements. +/// Helper struct to maintain the context for pending transaction submission, executed for +/// newly inserted views. #[derive(Clone)] -struct PendingTxReplacement +struct PendingTxSubmission where ChainApi: graph::ChainApi, { - /// Indicates if the new transaction was already submitted to all the views in the view_store. - /// If true, it can be removed after inserting any new view. - processed: bool, /// New transaction replacing the old one. xt: ExtrinsicFor, /// Source of the transaction. @@ -60,13 +58,84 @@ where watched: bool, } -impl PendingTxReplacement +/// Helper type representing the callback allowing to trigger per-transaction events on +/// `ValidatedPool`'s listener. +type RemovalListener = + Arc, ExtrinsicHash) + Send + Sync>; + +/// Helper struct to maintain the context for pending transaction removal, executed for +/// newly inserted views. +struct PendingTxRemoval +where + ChainApi: graph::ChainApi, +{ + /// Hash of the transaction that will be removed, + xt_hash: ExtrinsicHash, + /// Action that shall be executed on underlying `ValidatedPool`'s listener. + listener_action: RemovalListener, +} + +/// This enum represents an action that should be executed on the newly built +/// view before this view is inserted into the view store. +enum PreInsertAction +where + ChainApi: graph::ChainApi, +{ + /// Represents the action of submitting a new transaction. Intended to use to handle usurped + /// transactions. + SubmitTx(PendingTxSubmission), + + /// Represents the action of removing a subtree of transactions. + RemoveSubtree(PendingTxRemoval), +} + +/// Represents a task awaiting execution, to be performed immediately prior to the view insertion +/// into the view store. +struct PendingPreInsertTask where ChainApi: graph::ChainApi, { - /// Creates new unprocessed instance of pending transaction replacement. - fn new(xt: ExtrinsicFor, source: TimedTransactionSource, watched: bool) -> Self { - Self { processed: false, xt, source, watched } + /// The action to be applied when inserting a new view. + action: PreInsertAction, + /// Indicates if the action was already applied to all the views in the view_store. + /// If true, it can be removed after inserting any new view. + processed: bool, +} + +impl PendingPreInsertTask +where + ChainApi: graph::ChainApi, +{ + /// Creates new unprocessed instance of pending transaction submission. + fn new_submission_action( + xt: ExtrinsicFor, + source: TimedTransactionSource, + watched: bool, + ) -> Self { + Self { + processed: false, + action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }), + } + } + + /// Creates new processed instance of pending transaction removal. + fn new_removal_action( + xt_hash: ExtrinsicHash, + listener: RemovalListener, + ) -> Self { + Self { + processed: false, + action: PreInsertAction::RemoveSubtree(PendingTxRemoval { + xt_hash, + listener_action: listener, + }), + } + } + + /// Marks a task as done for every view present in view store. Basically means that can be + /// removed on new view insertion. + fn mark_processed(&mut self) { + self.processed = true; } } @@ -100,9 +169,8 @@ where /// notifcication threads. It is meant to assure that replaced transaction is also removed from /// newly built views in maintain process. /// - /// The map's key is hash of replaced extrinsic. - pending_txs_replacements: - RwLock, PendingTxReplacement>>, + /// The map's key is hash of actionable extrinsic (to avoid duplicated entries). + pending_txs_tasks: RwLock, PendingPreInsertTask>>, } impl ViewStore @@ -124,7 +192,7 @@ where listener, most_recent_view: RwLock::from(None), dropped_stream_controller, - pending_txs_replacements: Default::default(), + pending_txs_tasks: Default::default(), } } @@ -575,8 +643,12 @@ where replaced: ExtrinsicHash, watched: bool, ) { - if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) { - entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched)); + if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) { + entry.insert(PendingPreInsertTask::new_submission_action( + xt.clone(), + source.clone(), + watched, + )); } else { return }; @@ -586,8 +658,8 @@ where self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await; - if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) { - replacement.processed = true; + if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) { + replacement.mark_processed(); } } @@ -596,18 +668,25 @@ where /// After application, all already processed replacements are removed. async fn apply_pending_tx_replacements(&self, view: Arc>) { let mut futures = vec![]; - for replacement in self.pending_txs_replacements.read().values() { - let xt_hash = self.api.hash_and_length(&replacement.xt).0; - futures.push(self.replace_transaction_in_view( - view.clone(), - replacement.source.clone(), - replacement.xt.clone(), - xt_hash, - replacement.watched, - )); + for replacement in self.pending_txs_tasks.read().values() { + match replacement.action { + PreInsertAction::SubmitTx(ref submission) => { + let xt_hash = self.api.hash_and_length(&submission.xt).0; + futures.push(self.replace_transaction_in_view( + view.clone(), + submission.source.clone(), + submission.xt.clone(), + xt_hash, + submission.watched, + )); + }, + PreInsertAction::RemoveSubtree(ref removal) => { + view.remove_subtree(removal.xt_hash, &*removal.listener_action); + }, + } } let _results = futures::future::join_all(futures).await; - self.pending_txs_replacements.write().retain(|_, r| r.processed); + self.pending_txs_tasks.write().retain(|_, r| r.processed); } /// Submits `xt` to the given view. @@ -702,6 +781,9 @@ where /// allows to trigger the required events. Note listener may be called multiple times for the /// same hash. /// + /// Function will also schedule view pre-insertion actions to ensure that transactions will be + /// removed from newly created view. + /// /// Returns a vector containing the hashes of all removed transactions, including the root /// transaction specified by `tx_hash`. Vector contains only unique hashes. pub(super) fn remove_transaction_subtree( @@ -710,18 +792,36 @@ where listener_action: F, ) -> Vec> where - F: Fn(&mut crate::graph::Listener, ExtrinsicHash), + F: Fn(&mut crate::graph::Listener, ExtrinsicHash) + + Clone + + Send + + Sync + + 'static, { + if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) { + entry.insert(PendingPreInsertTask::new_removal_action( + xt_hash, + Arc::from(listener_action.clone()), + )); + }; + let mut seen = HashSet::new(); - let active_views = self.active_views.read(); - let inactive_views = self.inactive_views.read(); - active_views + + let removed = self + .active_views + .read() .iter() - .chain(inactive_views.iter()) + .chain(self.inactive_views.read().iter()) .filter(|(_, view)| view.is_imported(&xt_hash)) .map(|(_, view)| view.remove_subtree(xt_hash, &listener_action)) .flatten() .filter(|xt_hash| seen.insert(*xt_hash)) - .collect() + .collect(); + + if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) { + removal_action.mark_processed(); + } + + removed } }