Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conflicts cache for tx_pool to record conflicted transactions #4339

Merged
merged 21 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4562,6 +4562,7 @@ Response
"timestamp": "0x17c983e6e44"
}
},
"conflicted": [],
"proposed": {}
}
}
Expand Down Expand Up @@ -6702,6 +6703,8 @@ Tx-pool entries object

`TxPoolEntries` is a JSON object with the following fields.

* `conflicted`: `Array<` [`H256`](#type-h256) `>` - Conflicted tx hash vec

* `pending`: - Pending tx verbose info

* `proposed`: - Proposed tx verbose info
Expand Down
1 change: 1 addition & 0 deletions rpc/src/module/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ pub trait PoolRpc {
/// "timestamp": "0x17c983e6e44"
/// }
/// },
/// "conflicted": [],
/// "proposed": {}
/// }
/// }
Expand Down
1 change: 0 additions & 1 deletion sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ impl Relayer {

if !short_ids_set.is_empty() {
let tx_pool = self.shared.shared().tx_pool_controller();

let fetch_txs = tx_pool.fetch_txs(short_ids_set);
if let Err(e) = fetch_txs {
return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
Expand Down
2 changes: 2 additions & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(LongForks),
Box::new(ForksContainSameTransactions),
Box::new(ForksContainSameUncle),
Box::new(SendConflictTxToRelay),
Box::new(SendConflictTxToRelayRBF),
Box::new(WithdrawDAO),
Box::new(WithdrawDAOWithOverflowCapacity),
Box::new(DAOWithSatoshiCellOccupied),
Expand Down
163 changes: 155 additions & 8 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::{
rpc::RpcClient,
util::{
cell::gen_spendable,
transaction::{always_success_transaction, always_success_transactions},
transaction::{
always_success_transaction, always_success_transactions, get_tx_pool_conflicts,
},
},
utils::wait_until,
Node, Spec,
Expand Down Expand Up @@ -85,7 +87,8 @@ impl Spec for RbfBasic {
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_ok(), "tx2 should replace old tx");
assert!(res.is_ok(), "tx2 should replace with old tx");
assert_eq!(get_tx_pool_conflicts(node0), vec![tx1.hash().unpack()]);

let ret = node0
.rpc_client()
Expand Down Expand Up @@ -138,6 +141,7 @@ impl Spec for RbfBasic {
assert!(ret.transaction.is_none());
assert!(matches!(ret.tx_status.status, Status::Rejected));
assert!(ret.tx_status.reason.unwrap().contains("RBFRejected"));
assert_eq!(get_tx_pool_conflicts(node0), vec![tx1.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -165,6 +169,9 @@ impl Spec for RbfSameInput {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
let message = res.err().unwrap().to_string();
assert!(message.contains("PoolRejectedDuplicatedTransaction"));
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -200,6 +207,7 @@ impl Spec for RbfOnlyForResolveDead {
.send_transaction_result(tx2.data().into());
let message = res.err().unwrap().to_string();
assert!(message.contains("TransactionFailedToResolve: Resolve failed Unknown"));
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -246,6 +254,9 @@ impl Spec for RbfSameInputwithLessFee {
assert!(message.contains(
"Tx's current fee is 1000000000, expect it to >= 2000000363 to replace old txs"
));

// local submit tx RBF check failed, will be added into conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -306,6 +317,9 @@ impl Spec for RbfTooManyDescendants {
.unwrap()
.to_string()
.contains("Tx conflict with too many txs"));

// local submit tx RBF check failed, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -378,6 +392,9 @@ impl Spec for RbfContainNewTx {
.unwrap()
.to_string()
.contains("new Tx contains unconfirmed inputs"));

// local submit tx RBF check failed, will be in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -450,6 +467,9 @@ impl Spec for RbfContainInvalidInput {
.unwrap()
.to_string()
.contains("new Tx contains inputs in descendants of to be replaced Tx"));

// local submit tx RBF check failed, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![tx2.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -502,12 +522,12 @@ impl Spec for RbfChildPayForParent {
}

let clone_tx = txs[2].clone();
// Set tx2 fee to a higher value, but not enough to pay for tx5
// Set tx2 fee to a higher value, but not enough to pay for tx4
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(70).pack())
.build();

let tx2 = clone_tx
let new_tx = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
Expand All @@ -519,19 +539,22 @@ impl Spec for RbfChildPayForParent {

let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
.send_transaction_result(new_tx.data().into());
assert!(res.is_err(), "tx2 should be rejected");
assert!(res
.err()
.unwrap()
.to_string()
.contains("RBF rejected: Tx's current fee is 3000000000, expect it to >= 5000000363 to replace old txs"));

// local submit tx RBF check failed, will be in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![new_tx.hash().unpack()]);

// let's try a new transaction with new higher fee
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(45).pack())
.build();
let tx2 = clone_tx
let new_tx_ok = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
Expand All @@ -542,8 +565,19 @@ impl Spec for RbfChildPayForParent {
.build();
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
.send_transaction_result(new_tx_ok.data().into());
assert!(res.is_ok());

// replaced txs are in conflicts pool
// tx2 tx3 tx4 is replaced
let mut expected: Vec<ckb_types::H256> = txs[2..=max_count - 1]
.iter()
.map(|tx| tx.hash().unpack())
.collect::<Vec<_>>();
expected.push(new_tx.hash().unpack());
expected.sort_unstable();
let conflicts = get_tx_pool_conflicts(node0);
assert_eq!(conflicts, expected);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -592,6 +626,8 @@ impl Spec for RbfContainInvalidCells {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
// script verification failed because of invalid cell dep, will not in conflicts pool
assert_eq!(get_tx_pool_conflicts(node0), vec![]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -806,6 +842,14 @@ impl Spec for RbfReplaceProposedSuccess {
let tx1_status = node0.rpc_client().get_transaction(txs[2].hash()).tx_status;
assert_eq!(tx1_status.status, Status::Rejected);

let mut expected = [
txs[2].hash().unpack(),
txs[3].hash().unpack(),
txs[4].hash().unpack(),
];
expected.sort_unstable();
assert_eq!(get_tx_pool_conflicts(node0), expected);

let window_count = node0.consensus().tx_proposal_window().closest();
node0.mine(window_count);
// since old tx is already in BlockAssembler,
Expand Down Expand Up @@ -840,7 +884,7 @@ impl Spec for RbfConcurrency {
let tx1 = node0.new_transaction(tx_hash_0.clone());

let mut conflicts = vec![tx1];
// tx1 capacity is 100, set other txs to higer fee
// tx1 capacity is 100, set other txs to higher fee
let fees = [
capacity_bytes!(83),
capacity_bytes!(82),
Expand Down Expand Up @@ -886,6 +930,14 @@ impl Spec for RbfConcurrency {
for s in status.iter().take(4) {
assert_eq!(*s, Status::Rejected);
}

let mut expected: Vec<ckb_types::H256> = conflicts
.iter()
.take(4)
.map(|x| x.hash().unpack())
.collect();
expected.sort_unstable();
assert_eq!(get_tx_pool_conflicts(node0), expected);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -952,9 +1004,104 @@ impl Spec for RbfCellDepsCheck {
.unwrap()
.to_string()
.contains("new Tx contains cell deps from conflicts"));
assert_eq!(get_tx_pool_conflicts(node0), vec![new_tx.hash().unpack()]);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

fn run_spec_send_conflict_relay(nodes: &mut [Node]) {
let node0 = &nodes[0];
let node1 = &nodes[1];

node1.mine_until_out_bootstrap_period();
node0.connect(node1);
info!("Generate large cycles tx");

node0.new_block_with_blocking(|template| template.number.value() != 13);
let tx_hash_0 = node0.generate_transaction();
info!("Generate 2 txs with same input");
let tx1 = node0.new_transaction(tx_hash_0.clone());

let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(90).pack())
.build();

let tx1 = tx1.as_advanced_builder().set_outputs(vec![output]).build();
node0.rpc_client().send_transaction(tx1.data().into());

let result = wait_until(60, || {
node1.get_tip_block_number() == node0.get_tip_block_number()
});
assert!(result, "node0 can't sync with node1");

let result = wait_until(60, || {
node1
.rpc_client()
.get_transaction(tx1.hash())
.transaction
.is_some()
});
assert!(result, "Node0 should accept tx");
// node0 remove tx1 from tx_pool
node0.remove_transaction(tx1.hash());

// a new tx with same input and lower fee
// node0 will accept it and node1 will reject it and put it in conflicts pool
let tx2_temp = node0.new_transaction(tx_hash_0);
let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(95).pack())
.build();

let tx2 = tx2_temp
.as_advanced_builder()
.set_outputs(vec![output])
.build();
let res = node0
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_ok(), "tx2 should be accepted by node0");

let _ = wait_until(60, || {
node1.get_tip_block_number() == node0.get_tip_block_number()
});

let _result = wait_until(60, || get_tx_pool_conflicts(node1).len() == 1);

let res = node1.get_transaction(tx2.hash());
assert_eq!(res.status, Status::Rejected);
let res = node1.get_transaction(tx1.hash());
assert_eq!(res.status, Status::Pending);
assert_eq!(get_tx_pool_conflicts(node1), vec![tx2.hash().unpack()]);
}

pub struct SendConflictTxToRelay;
impl Spec for SendConflictTxToRelay {
crate::setup!(num_nodes: 2, retry_failed: 5);

fn run(&self, nodes: &mut Vec<Node>) {
run_spec_send_conflict_relay(nodes);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.network.connect_outbound_interval_secs = 0;
config.tx_pool.min_fee_rate = ckb_types::core::FeeRate(1500);
}
}

pub struct SendConflictTxToRelayRBF;
impl Spec for SendConflictTxToRelayRBF {
crate::setup!(num_nodes: 2, retry_failed: 5);

fn run(&self, nodes: &mut Vec<Node>) {
run_spec_send_conflict_relay(nodes);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.network.connect_outbound_interval_secs = 0;
config.tx_pool.min_fee_rate = ckb_types::core::FeeRate(1000);
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}
14 changes: 14 additions & 0 deletions test/src/util/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::util::cell::{as_input, as_inputs, as_output, as_outputs};
use crate::{Net, Node};
use ckb_jsonrpc_types::{RawTxPool, TxPoolEntries};
use ckb_network::SupportProtocols;
use ckb_types::{
bytes::Bytes,
Expand Down Expand Up @@ -81,3 +82,16 @@ pub fn relay_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
.build();
net.send(node, SupportProtocols::RelayV3, tx_msg.as_bytes());
}

pub fn get_tx_pool_conflicts(node: &Node) -> Vec<ckb_types::H256> {
let tx_pool_raw = node.rpc_client().get_raw_tx_pool(Some(true));
match tx_pool_raw {
RawTxPool::Verbose(TxPoolEntries { mut conflicted, .. }) => {
conflicted.sort_unstable();
conflicted
}
_ => {
panic!("tx_pool_raw is None");
}
}
}
5 changes: 5 additions & 0 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ impl PoolMap {
.collect()
}

pub(crate) fn find_conflict_outpoint(&self, tx: &TransactionView) -> Option<OutPoint> {
tx.input_pts_iter()
.find_map(|out_point| self.edges.get_input_ref(&out_point).map(|_| out_point))
}

pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let mut conflicts = Vec::new();

Expand Down
Loading
Loading