Skip to content

Commit

Permalink
fatp: pending actions now support removals
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 28, 2024
1 parent 3ca1e57 commit eecc849
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use crate::{
api::FullChainApi,
common::log_xt::log_xt_trace,
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
fork_aware_txpool::{
dropped_watcher::{DroppedReason, DroppedTransaction},
revalidation_worker,
},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
Expand Down Expand Up @@ -1325,6 +1328,10 @@ where
self.mempool.try_replace_transaction(xt, source, watched, worst_tx_hash)?;

// 5. notify listner
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_tx_hash));

// 6. remove transaction from the view_store
self.view_store.remove_transaction_subtree(
worst_tx_hash,
Expand Down
166 changes: 133 additions & 33 deletions substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,13 @@ use std::{
time::Instant,
};

/// Helper struct to keep the context for transaction replacements.
/// Helper struct to maintain the context for pending transaction submission, executed for
/// newly inserted views.
#[derive(Clone)]
struct PendingTxReplacement<ChainApi>
struct PendingTxSubmission<ChainApi>
where
ChainApi: graph::ChainApi,
{
/// Indicates if the new transaction was already submitted to all the views in the view_store.
/// If true, it can be removed after inserting any new view.
processed: bool,
/// New transaction replacing the old one.
xt: ExtrinsicFor<ChainApi>,
/// Source of the transaction.
Expand All @@ -60,13 +58,84 @@ where
watched: bool,
}

impl<ChainApi> PendingTxReplacement<ChainApi>
/// Helper type representing the callback allowing to trigger per-transaction events on
/// `ValidatedPool`'s listener.
type RemovalListener<ChainApi> =
Arc<dyn Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Send + Sync>;

/// Helper struct to maintain the context for pending transaction removal, executed for
/// newly inserted views.
struct PendingTxRemoval<ChainApi>
where
ChainApi: graph::ChainApi,
{
/// Hash of the transaction that will be removed,
xt_hash: ExtrinsicHash<ChainApi>,
/// Action that shall be executed on underlying `ValidatedPool`'s listener.
listener_action: RemovalListener<ChainApi>,
}

/// This enum represents an action that should be executed on the newly built
/// view before this view is inserted into the view store.
enum PreInsertAction<ChainApi>
where
ChainApi: graph::ChainApi,
{
/// Represents the action of submitting a new transaction. Intended to use to handle usurped
/// transactions.
SubmitTx(PendingTxSubmission<ChainApi>),

/// Represents the action of removing a subtree of transactions.
RemoveSubtree(PendingTxRemoval<ChainApi>),
}

/// Represents a task awaiting execution, to be performed immediately prior to the view insertion
/// into the view store.
struct PendingPreInsertTask<ChainApi>
where
ChainApi: graph::ChainApi,
{
/// Creates new unprocessed instance of pending transaction replacement.
fn new(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource, watched: bool) -> Self {
Self { processed: false, xt, source, watched }
/// The action to be applied when inserting a new view.
action: PreInsertAction<ChainApi>,
/// Indicates if the action was already applied to all the views in the view_store.
/// If true, it can be removed after inserting any new view.
processed: bool,
}

impl<ChainApi> PendingPreInsertTask<ChainApi>
where
ChainApi: graph::ChainApi,
{
/// Creates new unprocessed instance of pending transaction submission.
fn new_submission_action(
xt: ExtrinsicFor<ChainApi>,
source: TimedTransactionSource,
watched: bool,
) -> Self {
Self {
processed: false,
action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }),
}
}

/// Creates new processed instance of pending transaction removal.
fn new_removal_action(
xt_hash: ExtrinsicHash<ChainApi>,
listener: RemovalListener<ChainApi>,
) -> Self {
Self {
processed: false,
action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
xt_hash,
listener_action: listener,
}),
}
}

/// Marks a task as done for every view present in view store. Basically means that can be
/// removed on new view insertion.
fn mark_processed(&mut self) {
self.processed = true;
}
}

Expand Down Expand Up @@ -100,9 +169,8 @@ where
/// notifcication threads. It is meant to assure that replaced transaction is also removed from
/// newly built views in maintain process.
///
/// The map's key is hash of replaced extrinsic.
pending_txs_replacements:
RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingTxReplacement<ChainApi>>>,
/// The map's key is hash of actionable extrinsic (to avoid duplicated entries).
pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
}

impl<ChainApi, Block> ViewStore<ChainApi, Block>
Expand All @@ -124,7 +192,7 @@ where
listener,
most_recent_view: RwLock::from(None),
dropped_stream_controller,
pending_txs_replacements: Default::default(),
pending_txs_tasks: Default::default(),
}
}

Expand Down Expand Up @@ -575,8 +643,12 @@ where
replaced: ExtrinsicHash<ChainApi>,
watched: bool,
) {
if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) {
entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched));
if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
entry.insert(PendingPreInsertTask::new_submission_action(
xt.clone(),
source.clone(),
watched,
));
} else {
return
};
Expand All @@ -586,8 +658,8 @@ where

self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await;

if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) {
replacement.processed = true;
if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
replacement.mark_processed();
}
}

Expand All @@ -596,18 +668,25 @@ where
/// After application, all already processed replacements are removed.
async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
let mut futures = vec![];
for replacement in self.pending_txs_replacements.read().values() {
let xt_hash = self.api.hash_and_length(&replacement.xt).0;
futures.push(self.replace_transaction_in_view(
view.clone(),
replacement.source.clone(),
replacement.xt.clone(),
xt_hash,
replacement.watched,
));
for replacement in self.pending_txs_tasks.read().values() {
match replacement.action {
PreInsertAction::SubmitTx(ref submission) => {
let xt_hash = self.api.hash_and_length(&submission.xt).0;
futures.push(self.replace_transaction_in_view(
view.clone(),
submission.source.clone(),
submission.xt.clone(),
xt_hash,
submission.watched,
));
},
PreInsertAction::RemoveSubtree(ref removal) => {
view.remove_subtree(removal.xt_hash, &*removal.listener_action);
},
}
}
let _results = futures::future::join_all(futures).await;
self.pending_txs_replacements.write().retain(|_, r| r.processed);
self.pending_txs_tasks.write().retain(|_, r| r.processed);
}

/// Submits `xt` to the given view.
Expand Down Expand Up @@ -702,6 +781,9 @@ where
/// allows to trigger the required events. Note listener may be called multiple times for the
/// same hash.
///
/// Function will also schedule view pre-insertion actions to ensure that transactions will be
/// removed from newly created view.
///
/// Returns a vector containing the hashes of all removed transactions, including the root
/// transaction specified by `tx_hash`. Vector contains only unique hashes.
pub(super) fn remove_transaction_subtree<F>(
Expand All @@ -710,18 +792,36 @@ where
listener_action: F,
) -> Vec<ExtrinsicHash<ChainApi>>
where
F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>),
F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>)
+ Clone
+ Send
+ Sync
+ 'static,
{
if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
entry.insert(PendingPreInsertTask::new_removal_action(
xt_hash,
Arc::from(listener_action.clone()),
));
};

let mut seen = HashSet::new();
let active_views = self.active_views.read();
let inactive_views = self.inactive_views.read();
active_views

let removed = self
.active_views
.read()
.iter()
.chain(inactive_views.iter())
.chain(self.inactive_views.read().iter())
.filter(|(_, view)| view.is_imported(&xt_hash))
.map(|(_, view)| view.remove_subtree(xt_hash, &listener_action))
.flatten()
.filter(|xt_hash| seen.insert(*xt_hash))
.collect()
.collect();

if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
removal_action.mark_processed();
}

removed
}
}

0 comments on commit eecc849

Please sign in to comment.