Skip to content

Commit

Permalink
fix issues in conflicts pool
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Feb 5, 2024
1 parent 7da53f3 commit fad9bd3
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 36 deletions.
2 changes: 2 additions & 0 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7139,6 +7139,8 @@ Tx-pool entries object

* `proposed`: `{ [ key:` [`H256`](#type-h256) `]: ` [`TxPoolEntry`](#type-txpoolentry) `}` - Proposed tx verbose info

* `conflicted`: `Array<` [`H256`](#type-h256) `>` - Conflicted tx hash vec


### Type `TxPoolEntry`

Expand Down
6 changes: 6 additions & 0 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ impl PoolMap {
.collect()
}

pub(crate) fn find_conflict_outpoint(&self, tx: &TransactionView) -> HashSet<OutPoint> {
tx.input_pts_iter()
.filter_map(|out_point| self.edges.get_input_ref(&out_point).map(|_| out_point))
.collect()
}

pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let mut conflicts = Vec::new();

Expand Down
11 changes: 10 additions & 1 deletion tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,16 @@ impl TxPool {
.map(|entry| (entry.transaction().hash(), entry.to_info()))
.collect();

TxPoolEntryInfo { pending, proposed }
let conflicted = self
.conflicts_cache
.iter()
.map(|(_id, tx)| tx.hash())
.collect();
TxPoolEntryInfo {
pending,
proposed,
conflicted,
}
}

pub(crate) fn drain_all_transactions(&mut self) -> Vec<TransactionView> {
Expand Down
64 changes: 30 additions & 34 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,32 @@ impl TxPoolService {
) -> (Result<(), Reject>, Arc<Snapshot>) {
let (ret, snapshot) = self
.with_tx_pool_write_lock(move |tx_pool, snapshot| {
// here we double confirm RBF rules before insert entry
// check_rbf must be invoked in `write` lock to avoid concurrent issues.
let conflicts = if tx_pool.enable_rbf() {
// here we double confirm RBF rules before insert entry
let rbf_res = tx_pool.check_rbf(&snapshot, &entry);
if rbf_res.is_err() {
// here if RBF is enabled, but `check_rbf` returned an Err,
// means RBF check failed with conflicts, we put old entry into conflicts before return Err
// if RBF is disabled and there is conflict happened, `pre_check` will handle it
tx_pool.record_conflict(
entry.proposal_short_id(),
entry.transaction().clone(),
);
}
rbf_res?
} else {
// RBF is disabled, but we found conflicts, we put old entry into conflicts before return Err
let conflicted_outpoints =
tx_pool.pool_map.find_conflict_outpoint(entry.transaction());
if !conflicted_outpoints.is_empty() {
tx_pool.record_conflict(
entry.proposal_short_id(),
entry.transaction().clone(),
);
return Err(Reject::Resolve(OutPointError::Dead(
conflicted_outpoints.into_iter().next().unwrap(),
)));
}
HashSet::new()
};

Expand Down Expand Up @@ -234,7 +245,6 @@ impl TxPoolService {
) -> (Result<PreCheckedTx, Reject>, Arc<Snapshot>) {
// Acquire read lock for cheap check
let tx_size = tx.data().serialized_size_in_block();
let mut conflicted = None;

let (ret, snapshot) = self
.with_tx_pool_read_lock(|tx_pool, snapshot| {
Expand All @@ -252,42 +262,28 @@ impl TxPoolService {
let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
Ok((tip_hash, rtx, status, fee, tx_size))
}
Err(err) => {
if matches!(err, Reject::Resolve(OutPointError::Dead(_))) {
let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
// Try an RBF cheap check, here if the tx is resolved as Dead,
// we assume there must be conflicted happened in txpool now,
// if there is no conflicted transactions reject it
let conflicts = tx_pool.pool_map.find_conflict_tx(&rtx.transaction);
if conflicts.is_empty() {
error!(
"{} is resolved as Dead, but there is no conflicted tx",
rtx.transaction.proposal_short_id()
);
return Err(err);
}
if tx_pool.enable_rbf() {
Ok((tip_hash, rtx, status, fee, tx_size))
} else {
conflicted = Some(rtx.transaction.clone());
Err(err)
}
} else {
Err(err)
Err(Reject::Resolve(OutPointError::Dead(out))) => {
let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
let conflicts = tx_pool.pool_map.find_conflict_tx(&rtx.transaction);
if conflicts.is_empty() {
// this mean one input's outpoint is dead, but there is no direct conflicted tx in tx_pool
// we should reject it directly and we don't need to put it into conflicts pool
error!(
"{} is resolved as Dead, but there is no conflicted tx",
rtx.transaction.proposal_short_id()
);
return Err(Reject::Resolve(OutPointError::Dead(out.clone())));
}
// we also return Ok here, so that the entry will be continue to be verified before submit
// we only want to put it into conflicts pool after the verification stage passed
// then we will handle the conflicted txs in `submit_entry`
Ok((tip_hash, rtx, status, fee, tx_size))
}
Err(err) => Err(err),
}
})
.await;
if let Some(transaction) = conflicted {
// If RBF is disabled, but there is a double-spending tx in txpool
// we reject the new tx directly but record it in conflicts pool
self.with_tx_pool_write_lock(|tx_pool, _snapshot| {
tx_pool.record_conflict(transaction.proposal_short_id(), transaction.clone());
})
.await;
}
(ret, snapshot)
}

Expand Down
9 changes: 8 additions & 1 deletion util/jsonrpc-types/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,17 @@ pub struct TxPoolEntries {
pub pending: HashMap<H256, TxPoolEntry>,
/// Proposed tx verbose info
pub proposed: HashMap<H256, TxPoolEntry>,
/// Conflicted tx hash vec
pub conflicted: Vec<H256>,
}

impl From<TxPoolEntryInfo> for TxPoolEntries {
fn from(info: TxPoolEntryInfo) -> Self {
let TxPoolEntryInfo { pending, proposed } = info;
let TxPoolEntryInfo {
pending,
proposed,
conflicted,
} = info;

TxPoolEntries {
pending: pending
Expand All @@ -196,6 +202,7 @@ impl From<TxPoolEntryInfo> for TxPoolEntries {
.into_iter()
.map(|(hash, entry)| (hash.unpack(), entry.into()))
.collect(),
conflicted: conflicted.iter().map(Unpack::unpack).collect(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions util/types/src/core/tx_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ pub struct TxPoolEntryInfo {
pub pending: HashMap<Byte32, TxEntryInfo>,
/// Proposed transaction entry info
pub proposed: HashMap<Byte32, TxEntryInfo>,
/// Conflicted transaction hash vec
pub conflicted: Vec<Byte32>,
}

/// The JSON view of a transaction as well as its status.
Expand Down

0 comments on commit fad9bd3

Please sign in to comment.