diff --git a/rpc/README.md b/rpc/README.md index 315b5eeb40..932b5bac31 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -7139,6 +7139,8 @@ Tx-pool entries object * `proposed`: `{ [ key:` [`H256`](#type-h256) `]: ` [`TxPoolEntry`](#type-txpoolentry) `}` - Proposed tx verbose info +* `conflicted`: `Array<` [`H256`](#type-h256) `>` - Conflicted tx hash vec + ### Type `TxPoolEntry` diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index e3daee1615..2f1ab65b7d 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -278,6 +278,12 @@ impl PoolMap { .collect() } + pub(crate) fn find_conflict_outpoint(&self, tx: &TransactionView) -> HashSet { + tx.input_pts_iter() + .filter_map(|out_point| self.edges.get_input_ref(&out_point).map(|_| out_point)) + .collect() + } + pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec { let mut conflicts = Vec::new(); diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index ac358d2611..ea3776c21b 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -427,7 +427,16 @@ impl TxPool { .map(|entry| (entry.transaction().hash(), entry.to_info())) .collect(); - TxPoolEntryInfo { pending, proposed } + let conflicted = self + .conflicts_cache + .iter() + .map(|(_id, tx)| tx.hash()) + .collect(); + TxPoolEntryInfo { + pending, + proposed, + conflicted, + } } pub(crate) fn drain_all_transactions(&mut self) -> Vec { diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index bc4736eed9..a8bd190b0d 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -105,14 +105,13 @@ impl TxPoolService { ) -> (Result<(), Reject>, Arc) { let (ret, snapshot) = self .with_tx_pool_write_lock(move |tx_pool, snapshot| { - // here we double confirm RBF rules before insert entry // check_rbf must be invoked in `write` lock to avoid concurrent issues. let conflicts = if tx_pool.enable_rbf() { + // here we double confirm RBF rules before insert entry let rbf_res = tx_pool.check_rbf(&snapshot, &entry); if rbf_res.is_err() { // here if RBF is enabled, but `check_rbf` returned an Err, // means RBF check failed with conflicts, we put old entry into conflicts before return Err - // if RBF is disabled and there is conflict happened, `pre_check` will handle it tx_pool.record_conflict( entry.proposal_short_id(), entry.transaction().clone(), @@ -120,6 +119,18 @@ impl TxPoolService { } rbf_res? } else { + // RBF is disabled, but we found conflicts, we put old entry into conflicts before return Err + let conflicted_outpoints = + tx_pool.pool_map.find_conflict_outpoint(entry.transaction()); + if !conflicted_outpoints.is_empty() { + tx_pool.record_conflict( + entry.proposal_short_id(), + entry.transaction().clone(), + ); + return Err(Reject::Resolve(OutPointError::Dead( + conflicted_outpoints.into_iter().next().unwrap(), + ))); + } HashSet::new() }; @@ -234,7 +245,6 @@ impl TxPoolService { ) -> (Result, Arc) { // Acquire read lock for cheap check let tx_size = tx.data().serialized_size_in_block(); - let mut conflicted = None; let (ret, snapshot) = self .with_tx_pool_read_lock(|tx_pool, snapshot| { @@ -252,42 +262,31 @@ impl TxPoolService { let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; Ok((tip_hash, rtx, status, fee, tx_size)) } - Err(err) => { - if matches!(err, Reject::Resolve(OutPointError::Dead(_))) { - let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?; - let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; - // Try an RBF cheap check, here if the tx is resolved as Dead, - // we assume there must be conflicted happened in txpool now, - // if there is no conflicted transactions reject it - let conflicts = tx_pool.pool_map.find_conflict_tx(&rtx.transaction); - if conflicts.is_empty() { - error!( - "{} is resolved as Dead, but there is no conflicted tx", - rtx.transaction.proposal_short_id() - ); - return Err(err); - } - if tx_pool.enable_rbf() { - Ok((tip_hash, rtx, status, fee, tx_size)) - } else { - conflicted = Some(rtx.transaction.clone()); - Err(err) - } - } else { - Err(err) + Err(Reject::Resolve(OutPointError::Dead(out))) => { + let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?; + let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; + // Try an RBF cheap check, here if the tx is resolved as Dead, + // we assume there must be conflicted happened in tx_pool now, + // if there is no conflicted transactions reject it + let conflicts = tx_pool.pool_map.find_conflict_tx(&rtx.transaction); + if conflicts.is_empty() { + // this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool + // we should reject it directly and we don't need to put it into conflicts pool + error!( + "{} is resolved as Dead, but there is no conflicted tx", + rtx.transaction.proposal_short_id() + ); + return Err(Reject::Resolve(OutPointError::Dead(out.clone()))); } + // we also return Ok here, so that the entry will be continue to be verified before submit + // we only want to put it into conflicts pool after the verification stage passed + // then we will handle the conflicted txs in `submit_entry` + Ok((tip_hash, rtx, status, fee, tx_size)) } + Err(err) => Err(err), } }) .await; - if let Some(transaction) = conflicted { - // If RBF is disabled, but there is a double-spending tx in txpool - // we reject the new tx directly but record it in conflicts pool - self.with_tx_pool_write_lock(|tx_pool, _snapshot| { - tx_pool.record_conflict(transaction.proposal_short_id(), transaction.clone()); - }) - .await; - } (ret, snapshot) } diff --git a/util/jsonrpc-types/src/pool.rs b/util/jsonrpc-types/src/pool.rs index 90e1351c73..07ef88000e 100644 --- a/util/jsonrpc-types/src/pool.rs +++ b/util/jsonrpc-types/src/pool.rs @@ -181,11 +181,17 @@ pub struct TxPoolEntries { pub pending: HashMap, /// Proposed tx verbose info pub proposed: HashMap, + /// Conflicted tx hash vec + pub conflicted: Vec, } impl From for TxPoolEntries { fn from(info: TxPoolEntryInfo) -> Self { - let TxPoolEntryInfo { pending, proposed } = info; + let TxPoolEntryInfo { + pending, + proposed, + conflicted, + } = info; TxPoolEntries { pending: pending @@ -196,6 +202,7 @@ impl From for TxPoolEntries { .into_iter() .map(|(hash, entry)| (hash.unpack(), entry.into())) .collect(), + conflicted: conflicted.iter().map(Unpack::unpack).collect(), } } } diff --git a/util/types/src/core/tx_pool.rs b/util/types/src/core/tx_pool.rs index c588dec140..5d0fa788d5 100644 --- a/util/types/src/core/tx_pool.rs +++ b/util/types/src/core/tx_pool.rs @@ -161,6 +161,8 @@ pub struct TxPoolEntryInfo { pub pending: HashMap, /// Proposed transaction entry info pub proposed: HashMap, + /// Conflicted transaction hash vec + pub conflicted: Vec, } /// The JSON view of a transaction as well as its status.