diff --git a/CHANGELOG.md b/CHANGELOG.md index 1675622984..45cb473eaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +### Features +* #4079, **tx-pol:** Implement `Replace-by-Fee(RBF)` for tx-pool (@chenyukang) +This feature enables users to replace a transaction with a higher fee rate, which is useful when the transaction is stuck in the tx-pool: + * Add `min_rbf_rate` in `ckb.toml` with default value `1500`, which means the minimum extra fee rate for RBF, the unit is `shannons/KB` + * Add fields `fee` and `min_replace_fee` in `get_transaction`, which means the the minimal fee need to pay for RBF for a specific transaction + * The replaced transaction will be removed from `tx-pool` and with the status `Rejected`. + +### Improvements +* #3993, **tx-pool:** Almost reimplemented `tx-pool` with `multi_index_map`, with the following improvements (@chenyukang): + * Sort txs in pool by `score` in `Pending` stage, `txs` with higher `score` be processed first + * Evict `txs` from pool with `descendants_count` and `fee_rate` + * Eliminate redundant code for clean and consistent code + # [v0.110.0](https://github.com/nervosnetwork/ckb/compare/v0.109.0...v0.110.0) (2023-05-15) ### Features diff --git a/Cargo.lock b/Cargo.lock index 8eeacd4a0d..f1d60565ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1436,9 +1436,9 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", - "ckb_multi_index_map", "hyper", "lru", + "multi_index_map", "rand 0.8.5", "rustc-hash", "sentry", @@ -1567,21 +1567,6 @@ dependencies = [ "paste", ] -[[package]] -name = "ckb_multi_index_map" -version = "0.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53c20823dfd9f9a8e30faa3b0bdcab4801fb2544957586fada3884c78dcdf38b" -dependencies = [ - "convert_case 0.6.0", - "proc-macro-error", - "proc-macro2", - "quote", - "rustc-hash", - "slab", - "syn", -] - [[package]] name = "clang-sys" version = "1.3.1" @@ -3157,6 +3142,30 @@ dependencies = [ "faster-hex", ] +[[package]] +name = "multi_index_map" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03f409d5f41e6b8a2faa0b363c4523742f0ef5e4015fd4e298a5c7dbb3a3e01c" +dependencies = [ + "multi_index_map_derive", + "rustc-hash", + "slab", +] + +[[package]] +name = "multi_index_map_derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98e81cd436463efbaa95a2d2bac3028e6998a4bb2ef8a82a661de3567bb79d5a" +dependencies = [ + "convert_case 0.6.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "native-tls" version = "0.2.11" diff --git a/pow/src/lib.rs b/pow/src/lib.rs index 4429f24b35..6ce8c1c163 100644 --- a/pow/src/lib.rs +++ b/pow/src/lib.rs @@ -27,10 +27,10 @@ pub enum Pow { /// Mocking dummy PoW engine Dummy, /// The Eaglesong PoW engine - /// Check details of Eaglesong from: https://github.com/nervosnetwork/rfcs/blob/master/rfcs/0010-eaglesong/0010-eaglesong.md + /// Check details of Eaglesong from: Eaglesong, /// The Eaglesong PoW engine, similar to `Eaglesong`, but using `blake2b` hash as the final output. - /// Check details of blake2b from: https://tools.ietf.org/html/rfc7693 and blake2b-rs from: https://github.com/nervosnetwork/blake2b-rs + /// Check details of blake2b from: and blake2b-rs from: EaglesongBlake2b, } diff --git a/resource/ckb.toml b/resource/ckb.toml index 89ec89f6fb..9f99d4bb98 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -134,6 +134,8 @@ enable_deprecated_rpc = false # {{ [tx_pool] max_tx_pool_size = 180_000_000 # 180mb min_fee_rate = 1_000 # Here fee_rate are calculated directly using size in units of shannons/KB +# min_rbf_rate > min_fee_rate means RBF is enabled +min_rbf_rate = 1_500 # Here fee_rate are calculated directly using size in units of shannons/KB max_tx_verify_cycles = 70_000_000 max_ancestors_count = 25 diff --git a/rpc/README.md b/rpc/README.md index b361043232..47a4ffc1c4 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -882,6 +882,8 @@ Response }, "cycles": "0x219", "time_added_to_pool" : "0x187b3d137a1", + "fee": "0x16923f7dcf", + "min_replace_fee": "0x16923f7f6a", "tx_status": { "block_hash": null, "status": "pending", @@ -4510,6 +4512,7 @@ Response "result": { "last_txs_updated_at": "0x0", "min_fee_rate": "0x3e8", + "min_rbf_rate": "0x5dc", "max_tx_pool_size": "0xaba9500", "orphan": "0x0", "pending": "0x1", @@ -5072,6 +5075,10 @@ For example, a cellbase transaction is not allowed in `send_transaction` RPC. (-1110): The transaction exceeded maximum size limit. +### Error `PoolRejectedRBF` + +(-1111): The transaction is rejected for RBF checking. + ### Error `Indexer` (-1200): The indexer error. @@ -6426,7 +6433,7 @@ TX reject message `PoolTransactionReject` is a JSON object with following fields. -* `type`: `"LowFeeRate" | "ExceededMaximumAncestorsCount" | "ExceededTransactionSizeLimit" | "Full" | "Duplicated" | "Malformed" | "DeclaredWrongCycles" | "Resolve" | "Verification" | "Expiry"` - Reject type. +* `type`: `"LowFeeRate" | "ExceededMaximumAncestorsCount" | "ExceededTransactionSizeLimit" | "Full" | "Duplicated" | "Malformed" | "DeclaredWrongCycles" | "Resolve" | "Verification" | "Expiry" | "RBFRejected"` - Reject type. * `description`: `string` - Detailed description about why the transaction is rejected. Different reject types: @@ -6441,6 +6448,7 @@ Different reject types: * `Resolve`: Resolve failed * `Verification`: Verification failed * `Expiry`: Transaction expired +* `RBFRejected`: RBF rejected ### Type `ProposalShortId` @@ -6951,6 +6959,10 @@ The JSON view of a transaction as well as its status. * `tx_status`: [`TxStatus`](#type-txstatus) - The Transaction status. +* `fee`: [`Capacity`](#type-capacity) `|` `null` - The transaction fee of the transaction + +* `min_replace_fee`: [`Capacity`](#type-capacity) `|` `null` - The minimal fee required to replace this transaction + ### Type `TxPoolEntries` @@ -7035,6 +7047,10 @@ Transaction pool information. The unit is Shannons per 1000 bytes transaction serialization size in the block. +* `min_rbf_rate`: [`Uint64`](#type-uint64) - RBF rate threshold. The pool reject to replace for transactions which fee rate is below this threshold. if min_rbf_rate > min_fee_rate then RBF is enabled on the node. + + The unit is Shannons per 1000 bytes transaction serialization size in the block. + * `last_txs_updated_at`: [`Timestamp`](#type-timestamp) - Last updated time. This is the Unix timestamp in milliseconds. * `tx_size_limit`: [`Uint64`](#type-uint64) - Limiting transactions to tx_size_limit diff --git a/rpc/src/error.rs b/rpc/src/error.rs index 1f86db9b72..e566e96eb2 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -112,6 +112,8 @@ pub enum RPCError { TransactionExpired = -1109, /// (-1110): The transaction exceeded maximum size limit. PoolRejectedTransactionBySizeLimit = -1110, + /// (-1111): The transaction is rejected for RBF checking. + PoolRejectedRBF = -1111, /// (-1200): The indexer error. Indexer = -1200, } @@ -173,6 +175,7 @@ impl RPCError { Reject::DeclaredWrongCycles(..) => RPCError::PoolRejectedMalformedTransaction, Reject::Resolve(_) => RPCError::TransactionFailedToResolve, Reject::Verification(_) => RPCError::TransactionFailedToVerify, + Reject::RBFRejected(_) => RPCError::PoolRejectedRBF, Reject::ExceededTransactionSizeLimit(_, _) => { RPCError::PoolRejectedTransactionBySizeLimit } diff --git a/rpc/src/module/chain.rs b/rpc/src/module/chain.rs index 08574e13bb..36e0f4238f 100644 --- a/rpc/src/module/chain.rs +++ b/rpc/src/module/chain.rs @@ -617,6 +617,8 @@ pub trait ChainRpc { /// }, /// "cycles": "0x219", /// "time_added_to_pool" : "0x187b3d137a1", + /// "fee": "0x16923f7dcf", + /// "min_replace_fee": "0x16923f7f6a", /// "tx_status": { /// "block_hash": null, /// "status": "pending", @@ -2130,11 +2132,11 @@ impl ChainRpcImpl { .and_then(|v| v.get(tx_info.index.saturating_sub(1)).copied()) }) }; - return Ok(TransactionWithStatus::with_committed( None, tx_info.block_hash.unpack(), cycles, + None, )); } @@ -2181,6 +2183,7 @@ impl ChainRpcImpl { Some(tx), tx_info.block_hash.unpack(), cycles, + None, )); } diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index dfd4d9531d..1fe3e45de6 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -166,6 +166,7 @@ pub trait PoolRpc { /// "result": { /// "last_txs_updated_at": "0x0", /// "min_fee_rate": "0x3e8", + /// "min_rbf_rate": "0x5dc", /// "max_tx_pool_size": "0xaba9500", /// "orphan": "0x0", /// "pending": "0x1", diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index b348226560..7f0a78c757 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -1,10 +1,10 @@ //! CKB node has initial block download phase (IBD mode) like Bitcoin: -//! https://btcinformation.org/en/glossary/initial-block-download +//! //! //! When CKB node is in IBD mode, it will respond `packed::InIBD` to `GetHeaders` and `GetBlocks` requests //! //! And CKB has a headers-first synchronization style like Bitcoin: -//! https://btcinformation.org/en/glossary/headers-first-sync +//! //! mod block_fetcher; mod block_process; diff --git a/test/src/main.rs b/test/src/main.rs index 02e09c809b..81da1d8a27 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -458,7 +458,16 @@ fn all_specs() -> Vec> { Box::new(RelayWithWrongTx::new()), Box::new(TxsRelayOrder), Box::new(SendTxChain), - Box::new(DifferentTxsWithSameInput), + Box::new(DifferentTxsWithSameInputWithOutRBF), + Box::new(RbfEnable), + Box::new(RbfBasic), + Box::new(RbfSameInput), + Box::new(RbfOnlyForResolveDead), + Box::new(RbfSameInputwithLessFee), + Box::new(RbfTooManyDescendants), + Box::new(RbfContainNewTx), + Box::new(RbfContainInvalidInput), + Box::new(RbfRejectReplaceProposed), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), Box::new(CompactBlockPrefilled), diff --git a/test/src/specs/tx_pool/descendant.rs b/test/src/specs/tx_pool/descendant.rs index 04b5a5bc4b..5fdb7a8bc0 100644 --- a/test/src/specs/tx_pool/descendant.rs +++ b/test/src/specs/tx_pool/descendant.rs @@ -1,3 +1,5 @@ +use ckb_jsonrpc_types::Status; + use crate::specs::tx_pool::utils::prepare_tx_family; use crate::utils::{blank, commit, propose}; use crate::{Node, Spec}; @@ -143,14 +145,27 @@ impl Spec for SubmitTransactionWhenItsParentInGap { // 2. Submit `tx_family.b` into pending-pool. Then we expect that miner propose it. node.submit_transaction(family.b()); - let block = node.new_block_with_blocking(|template| template.proposals.len() != 2); - assert!( - block - .union_proposal_ids() - .contains(&family.b().proposal_short_id()), - "Miner should propose tx_family.b since it has never been proposed, actual: {:?}", - block.union_proposal_ids(), - ); + + (0..=window.closest()).for_each(|_| { + node.submit_block(&blank(node)); + }); + + // commit `tx_family.a` + let block = node.new_block(None, None, None); + let trans = block.transactions(); + assert_eq!(trans.len(), 2); + assert_eq!(trans[1].proposal_short_id(), family.a().proposal_short_id()); + node.submit_block(&block); + + (0..=window.closest()).for_each(|_| { + node.submit_block(&blank(node)); + }); + + // commit `tx_family.b` + let block = node.new_block(None, None, None); + let trans = block.transactions(); + assert_eq!(trans.len(), 2); + assert_eq!(trans[1].proposal_short_id(), family.b().proposal_short_id()); node.submit_block(&block); } } @@ -166,21 +181,30 @@ impl Spec for SubmitTransactionWhenItsParentInProposed { // 1. Propose `tx_family.a` into proposed-pool. let family = prepare_tx_family(node); - node.submit_transaction(family.a()); - node.submit_block(&propose(node, &[family.a()])); + let tx_a = family.a(); + node.submit_transaction(tx_a); + node.submit_block(&propose(node, &[tx_a])); (0..=window.closest()).for_each(|_| { node.submit_block(&blank(node)); }); + // tx_a should in Proposed status + let tx_a_status = node.get_transaction(tx_a.hash()); + assert_eq!(tx_a_status.status, Status::Proposed); + // 2. Submit `tx_family.b` into pending-pool. Then we expect that miner propose it. node.submit_transaction(family.b()); let block = node.new_block_with_blocking(|template| template.proposals.is_empty()); + let union_proposal_ids = block.union_proposal_ids(); assert!( - block - .union_proposal_ids() - .contains(&family.b().proposal_short_id()), + union_proposal_ids.contains(&family.b().proposal_short_id()), "Miner should propose tx_family.b since it has never been proposed, actual: {:?}", - block.union_proposal_ids(), + union_proposal_ids, + ); + assert!( + !union_proposal_ids.contains(&tx_a.proposal_short_id()), + "Miner should not propose tx_family.a since it has been proposed, actual: {:?}", + union_proposal_ids, ); node.submit_block(&block); } @@ -201,7 +225,7 @@ impl Spec for ProposeTransactionButParentNot { node.submit_transaction(family.b()); // 2. Propose `tx_family.b`, but `tx_family.a` not, then continuously submit blank blocks. - // In the time, miner should not commit `tx_family.b` as its parent, `tx_family.a` has + // In the time, miner should not commit `tx_family.b` as its parent `tx_family.a` has // not been not proposed and committed yet. node.submit_block(&propose(node, &[family.b()])); (0..window.closest()).for_each(|_| { diff --git a/test/src/specs/tx_pool/different_txs_with_same_input.rs b/test/src/specs/tx_pool/different_txs_with_same_input.rs index a816bd2eb9..f590f7ae81 100644 --- a/test/src/specs/tx_pool/different_txs_with_same_input.rs +++ b/test/src/specs/tx_pool/different_txs_with_same_input.rs @@ -7,9 +7,9 @@ use ckb_types::{ prelude::*, }; -pub struct DifferentTxsWithSameInput; +pub struct DifferentTxsWithSameInputWithOutRBF; -impl Spec for DifferentTxsWithSameInput { +impl Spec for DifferentTxsWithSameInputWithOutRBF { fn run(&self, nodes: &mut Vec) { let node0 = &nodes[0]; @@ -19,6 +19,7 @@ impl Spec for DifferentTxsWithSameInput { info!("Generate 2 txs with same input"); let tx1 = node0.new_transaction(tx_hash_0.clone()); let tx2_temp = node0.new_transaction(tx_hash_0); + // Set tx2 fee to a higher value, tx1 capacity is 100, set tx2 capacity to 80 for +20 fee. let output = CellOutputBuilder::default() .capacity(capacity_bytes!(80).pack()) @@ -46,7 +47,7 @@ impl Spec for DifferentTxsWithSameInput { .map(TransactionView::hash) .collect(); - // RBF (Replace-By-Fees) is not implemented + // RBF (Replace-By-Fees) is not enabled assert!(commit_txs_hash.contains(&tx1.hash())); assert!(!commit_txs_hash.contains(&tx2.hash())); diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index bd8ffd3a87..925b5618fc 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -15,6 +15,7 @@ mod pool_resurrect; mod proposal_expire_rule; mod remove_tx; mod reorg_proposals; +mod replace; mod send_defected_binary; mod send_large_cycles_tx; mod send_low_fee_rate_tx; @@ -42,6 +43,7 @@ pub use pool_resurrect::*; pub use proposal_expire_rule::*; pub use remove_tx::*; pub use reorg_proposals::*; +pub use replace::*; pub use send_defected_binary::*; pub use send_large_cycles_tx::*; pub use send_low_fee_rate_tx::*; diff --git a/test/src/specs/tx_pool/replace.rs b/test/src/specs/tx_pool/replace.rs new file mode 100644 index 0000000000..90b972a640 --- /dev/null +++ b/test/src/specs/tx_pool/replace.rs @@ -0,0 +1,506 @@ +use crate::{Node, Spec}; +use ckb_jsonrpc_types::Status; +use ckb_logger::info; +use ckb_types::{ + core::{capacity_bytes, Capacity, TransactionView}, + packed::{Byte32, CellInput, CellOutputBuilder, OutPoint}, + prelude::*, +}; + +pub struct RbfEnable; +impl Spec for RbfEnable { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + let tx_hash_0 = node0.generate_transaction(); + let tx1 = node0.new_transaction(tx_hash_0); + + let output = CellOutputBuilder::default() + .capacity(capacity_bytes!(70).pack()) + .build(); + + let tx1 = tx1.as_advanced_builder().set_outputs(vec![output]).build(); + + node0.rpc_client().send_transaction(tx1.data().into()); + let ret = node0 + .rpc_client() + .get_transaction_with_verbosity(tx1.hash(), 2); + + assert_eq!(ret.min_replace_fee, None); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(100); + config.tx_pool.min_fee_rate = ckb_types::core::FeeRate(100); + } +} + +pub struct RbfBasic; +impl Spec for RbfBasic { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + let tx_hash_0 = node0.generate_transaction(); + info!("Generate 2 txs with same input"); + let tx1 = node0.new_transaction(tx_hash_0.clone()); + let tx2_temp = node0.new_transaction(tx_hash_0); + + // Set tx2 fee to a higher value, tx1 capacity is 100, set tx2 capacity to 80 for +20 fee. + let output = CellOutputBuilder::default() + .capacity(capacity_bytes!(80).pack()) + .build(); + + let tx2 = tx2_temp + .as_advanced_builder() + .set_outputs(vec![output]) + .build(); + + node0.rpc_client().send_transaction(tx1.data().into()); + let ret = node0 + .rpc_client() + .get_transaction_with_verbosity(tx1.hash(), 2); + // min_replace_fee is 363 + assert_eq!(ret.min_replace_fee.unwrap().to_string(), "0x16b"); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_ok(), "tx2 should replace old tx"); + + node0.mine_with_blocking(|template| template.proposals.len() != 2); + node0.mine_with_blocking(|template| template.number.value() != 14); + node0.mine_with_blocking(|template| template.transactions.len() != 2); + + let tip_block = node0.get_tip_block(); + let commit_txs_hash: Vec<_> = tip_block + .transactions() + .iter() + .map(TransactionView::hash) + .collect(); + + // RBF (Replace-By-Fees) is enabled + assert!(!commit_txs_hash.contains(&tx1.hash())); + assert!(commit_txs_hash.contains(&tx2.hash())); + + // when tx2 should be committed + let ret = node0.rpc_client().get_transaction(tx2.hash()); + assert!( + matches!(ret.tx_status.status, Status::Committed), + "tx2 should be committed" + ); + + // verbosity = 1 + let ret = node0 + .rpc_client() + .get_transaction_with_verbosity(tx1.hash(), 1); + assert!(ret.transaction.is_none()); + assert!(matches!(ret.tx_status.status, Status::Rejected)); + assert!(ret.tx_status.reason.unwrap().contains("RBFRejected")); + + // verbosity = 2 + let ret = node0 + .rpc_client() + .get_transaction_with_verbosity(tx2.hash(), 2); + assert!(ret.transaction.is_some()); + assert!(matches!(ret.tx_status.status, Status::Committed)); + + let ret = node0 + .rpc_client() + .get_transaction_with_verbosity(tx1.hash(), 2); + assert!(ret.transaction.is_none()); + assert!(matches!(ret.tx_status.status, Status::Rejected)); + assert!(ret.tx_status.reason.unwrap().contains("RBFRejected")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfSameInput; +impl Spec for RbfSameInput { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + let tx_hash_0 = node0.generate_transaction(); + info!("Generate 2 txs with same input"); + let tx1 = node0.new_transaction(tx_hash_0.clone()); + let tx2_temp = node0.new_transaction(tx_hash_0); + + let tx2 = tx2_temp.as_advanced_builder().build(); + + node0.rpc_client().send_transaction(tx1.data().into()); + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfOnlyForResolveDead; +impl Spec for RbfOnlyForResolveDead { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + + let tx_hash_0 = node0.generate_transaction(); + + let tx1 = node0.new_transaction(tx_hash_0); + + // This is an unknown input + let tx_hash_1 = Byte32::zero(); + let tx2 = tx1 + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(tx_hash_1, 0)) + .build() + }]) + .build(); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + let message = res.err().unwrap().to_string(); + assert!(message.contains("TransactionFailedToResolve: Resolve failed Unknown")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfSameInputwithLessFee; + +// RBF Rule #3, #4 +impl Spec for RbfSameInputwithLessFee { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + let tx_hash_0 = node0.generate_transaction(); + info!("Generate 2 txs with same input"); + let tx1 = node0.new_transaction(tx_hash_0.clone()); + let tx2_temp = node0.new_transaction(tx_hash_0); + + let output1 = CellOutputBuilder::default() + .capacity(capacity_bytes!(80).pack()) + .build(); + + let tx1 = tx1.as_advanced_builder().set_outputs(vec![output1]).build(); + + // Set tx2 fee to a lower value + let output2 = CellOutputBuilder::default() + .capacity(capacity_bytes!(90).pack()) + .build(); + + let tx2 = tx2_temp + .as_advanced_builder() + .set_outputs(vec![output2]) + .build(); + + node0.rpc_client().send_transaction(tx1.data().into()); + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + let message = res.err().unwrap().to_string(); + assert!(message.contains( + "Tx's current fee is 1000000000, expect it to >= 2000000363 to replace old txs" + )); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfTooManyDescendants; + +// RBF Rule #5 +impl Spec for RbfTooManyDescendants { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + + // build txs chain + let tx0 = node0.new_transaction_spend_tip_cellbase(); + let tx0_temp = tx0.clone(); + let mut txs = vec![tx0]; + let max_count = 101; + while txs.len() <= max_count { + let parent = txs.last().unwrap(); + let child = parent + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(parent.hash(), 0)) + .build() + }]) + .set_outputs(vec![parent.output(0).unwrap()]) + .build(); + txs.push(child); + } + assert_eq!(txs.len(), max_count + 1); + // send tx chain + for tx in txs[..=max_count - 1].iter() { + let ret = node0.rpc_client().send_transaction_result(tx.data().into()); + assert!(ret.is_ok()); + } + + // Set tx2 fee to a higher value + let output2 = CellOutputBuilder::default() + .capacity(capacity_bytes!(70).pack()) + .build(); + + let tx2 = tx0_temp + .as_advanced_builder() + .set_outputs(vec![output2]) + .build(); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + assert!(res + .err() + .unwrap() + .to_string() + .contains("Tx conflict too many txs")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfContainNewTx; + +// RBF Rule #2 +impl Spec for RbfContainNewTx { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + + // build txs chain + let tx0 = node0.new_transaction_spend_tip_cellbase(); + let mut txs = vec![tx0]; + let max_count = 5; + while txs.len() <= max_count { + let parent = txs.last().unwrap(); + let child = parent + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(parent.hash(), 0)) + .build() + }]) + .set_outputs(vec![parent.output(0).unwrap()]) + .build(); + txs.push(child); + } + assert_eq!(txs.len(), max_count + 1); + // send tx chain + for tx in txs[..=max_count - 1].iter() { + let ret = node0.rpc_client().send_transaction_result(tx.data().into()); + assert!(ret.is_ok()); + } + + let clone_tx = txs[2].clone(); + // Set tx2 fee to a higher value + let output2 = CellOutputBuilder::default() + .capacity(capacity_bytes!(70).pack()) + .build(); + + let tx2 = clone_tx + .as_advanced_builder() + .set_inputs(vec![ + { + CellInput::new_builder() + .previous_output(OutPoint::new(txs[1].hash(), 0)) + .build() + }, + { + CellInput::new_builder() + .previous_output(OutPoint::new(txs[4].hash(), 0)) + .build() + }, + ]) + .set_outputs(vec![output2]) + .build(); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + assert!(res + .err() + .unwrap() + .to_string() + .contains("new Tx contains unconfirmed inputs")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfContainInvalidInput; + +// RBF Rule #2 +impl Spec for RbfContainInvalidInput { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + + // build txs chain + let tx0 = node0.new_transaction_spend_tip_cellbase(); + let mut txs = vec![tx0]; + let max_count = 5; + while txs.len() <= max_count { + let parent = txs.last().unwrap(); + let child = parent + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(parent.hash(), 0)) + .build() + }]) + .set_outputs(vec![parent.output(0).unwrap()]) + .build(); + txs.push(child); + } + assert_eq!(txs.len(), max_count + 1); + // send Tx chain + for tx in txs[..=max_count - 1].iter() { + let ret = node0.rpc_client().send_transaction_result(tx.data().into()); + assert!(ret.is_ok()); + } + + let clone_tx = txs[2].clone(); + // Set tx2 fee to a higher value + let output2 = CellOutputBuilder::default() + .capacity(capacity_bytes!(70).pack()) + .build(); + + let tx2 = clone_tx + .as_advanced_builder() + .set_inputs(vec![ + { + CellInput::new_builder() + .previous_output(OutPoint::new(txs[1].hash(), 0)) + .build() + }, + { + CellInput::new_builder() + .previous_output(OutPoint::new(txs[3].hash(), 0)) + .build() + }, + ]) + .set_outputs(vec![output2]) + .build(); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + assert!(res + .err() + .unwrap() + .to_string() + .contains("new Tx contains inputs in descendants of to be replaced Tx")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + +pub struct RbfRejectReplaceProposed; + +// RBF Rule #6 +impl Spec for RbfRejectReplaceProposed { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + + // build txs chain + let tx0 = node0.new_transaction_spend_tip_cellbase(); + let mut txs = vec![tx0]; + let max_count = 5; + while txs.len() <= max_count { + let parent = txs.last().unwrap(); + let child = parent + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(parent.hash(), 0)) + .build() + }]) + .set_outputs(vec![parent.output(0).unwrap()]) + .build(); + txs.push(child); + } + assert_eq!(txs.len(), max_count + 1); + // send Tx chain + for tx in txs[..=max_count - 1].iter() { + let ret = node0.rpc_client().send_transaction_result(tx.data().into()); + assert!(ret.is_ok()); + } + + node0.mine_with_blocking(|template| template.proposals.len() != max_count); + let ret = node0.rpc_client().get_transaction(txs[2].hash()); + assert!( + matches!(ret.tx_status.status, Status::Pending), + "tx1 should be pending" + ); + node0.mine(1); + let ret = node0.rpc_client().get_transaction(txs[2].hash()); + assert!( + matches!(ret.tx_status.status, Status::Proposed), + "tx1 should be proposed" + ); + + let clone_tx = txs[2].clone(); + // Set tx2 fee to a higher value + let output2 = CellOutputBuilder::default() + .capacity(capacity_bytes!(70).pack()) + .build(); + + let tx2 = clone_tx + .as_advanced_builder() + .set_outputs(vec![output2]) + .build(); + + let res = node0 + .rpc_client() + .send_transaction_result(tx2.data().into()); + assert!(res.is_err(), "tx2 should be rejected"); + assert!(res + .err() + .unwrap() + .to_string() + .contains("all conflict Txs should be in Pending status")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} diff --git a/test/template/ckb.toml b/test/template/ckb.toml index c722fa1e55..0eea3eb7b1 100644 --- a/test/template/ckb.toml +++ b/test/template/ckb.toml @@ -79,6 +79,7 @@ enable_deprecated_rpc = true [tx_pool] max_tx_pool_size = 180_000_000 # 180mb min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB +min_rbf_rate = 0 # Here rbf_rate are calculated directly using size in units of shannons/KB max_tx_verify_cycles = 70_000_000 max_ancestors_count = 25 diff --git a/tx-pool/Cargo.toml b/tx-pool/Cargo.toml index 04e465561a..c2c6c2ec67 100644 --- a/tx-pool/Cargo.toml +++ b/tx-pool/Cargo.toml @@ -36,7 +36,7 @@ sentry = { version = "0.26.0", optional = true } serde_json = "1.0" rand = "0.8.4" hyper = { version = "0.14", features = ["http1", "client", "tcp"] } -ckb_multi_index_map = "0.0.2" # ckb team fork crate +multi_index_map = "0.6.0" slab = "0.4" rustc-hash = "1.1" tokio-util = "0.7.8" diff --git a/tx-pool/src/chunk_process.rs b/tx-pool/src/chunk_process.rs index 73e4f246eb..51ebdec07d 100644 --- a/tx-pool/src/chunk_process.rs +++ b/tx-pool/src/chunk_process.rs @@ -226,7 +226,8 @@ impl ChunkProcess { let tx_hash = tx.hash(); let (ret, snapshot) = self.service.pre_check(&tx).await; - let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot); + let (tip_hash, rtx, status, fee, tx_size, conflicts) = + try_or_return_with_snapshot!(ret, snapshot); let cached = self.service.fetch_tx_verify_cache(&tx_hash).await; @@ -251,8 +252,10 @@ impl ChunkProcess { let completed = try_or_return_with_snapshot!(ret, snapshot); let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, submit_snapshot) = - self.service.submit_entry(tip_hash, entry, status).await; + let (ret, submit_snapshot) = self + .service + .submit_entry(tip_hash, entry, status, conflicts) + .await; try_or_return_with_snapshot!(ret, submit_snapshot); self.service .after_process(tx, remote, &submit_snapshot, &Ok(completed)) @@ -321,7 +324,10 @@ impl ChunkProcess { } let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, submit_snapshot) = self.service.submit_entry(tip_hash, entry, status).await; + let (ret, submit_snapshot) = self + .service + .submit_entry(tip_hash, entry, status, conflicts) + .await; try_or_return_with_snapshot!(ret, snapshot); self.service.notify_block_assembler(status).await; diff --git a/tx-pool/src/component/commit_txs_scanner.rs b/tx-pool/src/component/commit_txs_scanner.rs index c2058fdba6..7795a711c4 100644 --- a/tx-pool/src/component/commit_txs_scanner.rs +++ b/tx-pool/src/component/commit_txs_scanner.rs @@ -1,44 +1,45 @@ +extern crate slab; use crate::component::pool_map::PoolMap; -use crate::component::{entry::TxEntry, score_key::AncestorsScoreSortKey}; +use crate::component::{entry::TxEntry, sort_key::AncestorsScoreSortKey}; use ckb_types::{core::Cycle, packed::ProposalShortId}; use ckb_util::LinkedHashMap; -use std::collections::{BTreeSet, HashMap, HashSet}; +use multi_index_map::MultiIndexMap; +use std::collections::HashSet; // A template data struct used to store modified entries when package txs -#[derive(Default)] -pub struct TxModifiedEntries { - entries: HashMap, - sorted_index: BTreeSet, +#[derive(MultiIndexMap, Clone)] +pub struct ModifiedTx { + #[multi_index(hashed_unique)] + pub id: ProposalShortId, + #[multi_index(ordered_non_unique)] + pub score: AncestorsScoreSortKey, + pub inner: TxEntry, } -impl TxModifiedEntries { +impl MultiIndexModifiedTxMap { pub fn next_best_entry(&self) -> Option<&TxEntry> { - self.sorted_index - .iter() - .max() - .map(|key| self.entries.get(&key.id).expect("consistent")) + self.iter_by_score().last().map(|x| &x.inner) } pub fn get(&self, id: &ProposalShortId) -> Option<&TxEntry> { - self.entries.get(id) + self.get_by_id(id).map(|x| &x.inner) } pub fn contains_key(&self, id: &ProposalShortId) -> bool { - self.entries.contains_key(id) + self.get_by_id(id).is_some() } - pub fn insert(&mut self, entry: TxEntry) { - let key = AncestorsScoreSortKey::from(&entry); - let short_id = entry.proposal_short_id(); - self.entries.insert(short_id, entry); - self.sorted_index.insert(key); + pub fn insert_entry(&mut self, entry: TxEntry) { + let score = AncestorsScoreSortKey::from(&entry); + self.insert(ModifiedTx { + id: entry.proposal_short_id(), + score, + inner: entry, + }); } pub fn remove(&mut self, id: &ProposalShortId) -> Option { - self.entries.remove(id).map(|entry| { - self.sorted_index.remove(&(&entry).into()); - entry - }) + self.remove_by_id(id).map(|x| x.inner) } } @@ -53,7 +54,7 @@ pub struct CommitTxsScanner<'a> { entries: Vec, // modified_entries will store sorted packages after they are modified // because some of their txs are already in the block - modified_entries: TxModifiedEntries, + modified_entries: MultiIndexModifiedTxMap, // txs that packaged in block fetched_txs: HashSet, // Keep track of entries that failed inclusion, to avoid duplicate work @@ -65,7 +66,7 @@ impl<'a> CommitTxsScanner<'a> { CommitTxsScanner { entries: Vec::new(), pool_map, - modified_entries: TxModifiedEntries::default(), + modified_entries: MultiIndexModifiedTxMap::default(), fetched_txs: HashSet::default(), failed_txs: HashSet::default(), } @@ -210,7 +211,7 @@ impl<'a> CommitTxsScanner<'a> { .or_else(|| self.pool_map.get(desc_id).cloned()) { desc.sub_ancestor_weight(entry); - self.modified_entries.insert(desc); + self.modified_entries.insert_entry(desc); } } } diff --git a/tx-pool/src/component/entry.rs b/tx-pool/src/component/entry.rs index 2f8fdf95ef..f45d4feace 100644 --- a/tx-pool/src/component/entry.rs +++ b/tx-pool/src/component/entry.rs @@ -1,4 +1,4 @@ -use crate::component::score_key::AncestorsScoreSortKey; +use crate::component::sort_key::{AncestorsScoreSortKey, EvictKey}; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ core::{ @@ -194,21 +194,6 @@ impl TxEntry { } } -impl From<&TxEntry> for AncestorsScoreSortKey { - fn from(entry: &TxEntry) -> Self { - let weight = get_transaction_weight(entry.size, entry.cycles); - let ancestors_weight = get_transaction_weight(entry.ancestors_size, entry.ancestors_cycles); - AncestorsScoreSortKey { - fee: entry.fee, - weight, - id: entry.proposal_short_id(), - ancestors_fee: entry.ancestors_fee, - ancestors_weight, - //timestamp: entry.timestamp, - } - } -} - impl Hash for TxEntry { fn hash(&self, state: &mut H) { Hash::hash(self.transaction(), state); @@ -233,14 +218,17 @@ impl Ord for TxEntry { } } -/// First compare fee_rate, select the smallest fee_rate, -/// and then select the latest timestamp, for eviction, -/// the latest timestamp which also means that the fewer descendants may exist. -#[derive(Eq, PartialEq, Clone, Debug)] -pub struct EvictKey { - pub fee_rate: FeeRate, - pub timestamp: u64, - pub descendants_count: usize, +impl From<&TxEntry> for AncestorsScoreSortKey { + fn from(entry: &TxEntry) -> Self { + let weight = get_transaction_weight(entry.size, entry.cycles); + let ancestors_weight = get_transaction_weight(entry.ancestors_size, entry.ancestors_cycles); + AncestorsScoreSortKey { + fee: entry.fee, + weight, + ancestors_fee: entry.ancestors_fee, + ancestors_weight, + } + } } impl From<&TxEntry> for EvictKey { @@ -258,23 +246,3 @@ impl From<&TxEntry> for EvictKey { } } } - -impl PartialOrd for EvictKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for EvictKey { - fn cmp(&self, other: &Self) -> Ordering { - if self.fee_rate == other.fee_rate { - if self.descendants_count == other.descendants_count { - self.timestamp.cmp(&other.timestamp) - } else { - self.descendants_count.cmp(&other.descendants_count) - } - } else { - self.fee_rate.cmp(&other.fee_rate) - } - } -} diff --git a/tx-pool/src/component/mod.rs b/tx-pool/src/component/mod.rs index 7f325424ba..e5b8ab3cfc 100644 --- a/tx-pool/src/component/mod.rs +++ b/tx-pool/src/component/mod.rs @@ -7,7 +7,7 @@ pub(crate) mod links; pub(crate) mod orphan; pub(crate) mod pool_map; pub(crate) mod recent_reject; -pub(crate) mod score_key; +pub(crate) mod sort_key; #[cfg(test)] mod tests; diff --git a/tx-pool/src/component/pool_map.rs b/tx-pool/src/component/pool_map.rs index 238c63ad83..a0830a42d4 100644 --- a/tx-pool/src/component/pool_map.rs +++ b/tx-pool/src/component/pool_map.rs @@ -2,24 +2,21 @@ extern crate rustc_hash; extern crate slab; use crate::component::edges::Edges; -use crate::component::entry::EvictKey; use crate::component::links::{Relation, TxLinksMap}; -use crate::component::score_key::AncestorsScoreSortKey; +use crate::component::sort_key::{AncestorsScoreSortKey, EvictKey}; use crate::error::Reject; use crate::TxEntry; -use ckb_logger::{debug, trace}; -use ckb_multi_index_map::MultiIndexMap; + +use ckb_logger::trace; use ckb_types::core::error::OutPointError; use ckb_types::packed::OutPoint; +use ckb_types::prelude::*; use ckb_types::{ bytes::Bytes, - core::{cell::CellChecker, TransactionView}, + core::TransactionView, packed::{Byte32, CellOutput, ProposalShortId}, }; -use ckb_types::{ - core::cell::{CellMetaBuilder, CellProvider, CellStatus}, - prelude::*, -}; +use multi_index_map::MultiIndexMap; use std::collections::HashSet; use super::links::TxLinks; @@ -114,6 +111,10 @@ impl PoolMap { self.entries.get_by_id(id) } + fn get_by_id_checked(&self, id: &ProposalShortId) -> &PoolEntry { + self.get_by_id(id).expect("unconsistent pool") + } + pub(crate) fn get_by_status(&self, status: Status) -> Vec<&PoolEntry> { self.entries.get_by_status(&status) } @@ -171,9 +172,9 @@ impl PoolMap { return Ok(false); } trace!("pool_map.add_{:?} {}", status, entry.transaction().hash()); - self.check_record_ancestors(&mut entry)?; + self.check_and_record_ancestors(&mut entry)?; self.insert_entry(&entry, status); - self.record_entry_deps(&entry); + self.record_entry_edges(&entry); self.record_entry_descendants(&entry); Ok(true) } @@ -191,7 +192,6 @@ impl PoolMap { self.entries.remove_by_id(id).map(|entry| { self.update_ancestors_index_key(&entry.inner, EntryOp::Remove); self.update_descendants_index_key(&entry.inner, EntryOp::Remove); - self.remove_entry_deps(&entry.inner); self.remove_entry_edges(&entry.inner); self.remove_entry_links(id); entry.inner @@ -240,11 +240,16 @@ impl PoolMap { conflicts } + pub(crate) fn find_conflict_tx(&self, tx: &TransactionView) -> HashSet { + tx.input_pts_iter() + .filter_map(|out_point| self.edges.get_input_ref(&out_point).cloned()) + .collect() + } + pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec { - let inputs = tx.input_pts_iter(); let mut conflicts = Vec::new(); - for i in inputs { + for i in tx.input_pts_iter() { if let Some(id) = self.edges.remove_input(&i) { let entries = self.remove_entry_and_descendants(&id); if !entries.is_empty() { @@ -361,7 +366,7 @@ impl PoolMap { } } - fn record_entry_deps(&mut self, entry: &TxEntry) { + fn record_entry_edges(&mut self, entry: &TxEntry) { let tx_short_id: ProposalShortId = entry.proposal_short_id(); let header_deps = entry.transaction().header_deps(); let related_dep_out_points: Vec<_> = entry.related_dep_out_points().cloned().collect(); @@ -414,7 +419,7 @@ impl PoolMap { } /// Check ancestors and record for entry - fn check_record_ancestors(&mut self, entry: &mut TxEntry) -> Result { + fn check_and_record_ancestors(&mut self, entry: &mut TxEntry) -> Result { let mut parents: HashSet = HashSet::with_capacity( entry.transaction().inputs().len() + entry.transaction().cell_deps().len(), ); @@ -446,14 +451,10 @@ impl PoolMap { // update parents references for ancestor_id in &ancestors { - let ancestor = self - .entries - .get_by_id(ancestor_id) - .expect("pool consistent"); + let ancestor = self.get_by_id_checked(ancestor_id); entry.add_ancestor_weight(&ancestor.inner); } if entry.ancestors_count > self.max_ancestors_count { - debug!("debug: exceeded maximum ancestors count"); return Err(Reject::ExceededMaximumAncestorsCount); } @@ -476,9 +477,6 @@ impl PoolMap { // release input record self.edges.remove_input(&i); } - } - - fn remove_entry_deps(&mut self, entry: &TxEntry) { let id = entry.proposal_short_id(); for d in entry.related_dep_out_points().cloned() { self.edges.delete_txid_by_dep(d, &id); @@ -500,31 +498,3 @@ impl PoolMap { }); } } - -impl CellProvider for PoolMap { - fn cell(&self, out_point: &OutPoint, _eager_load: bool) -> CellStatus { - if self.edges.get_input_ref(out_point).is_some() { - return CellStatus::Dead; - } - if let Some((output, data)) = self.get_output_with_data(out_point) { - let cell_meta = CellMetaBuilder::from_cell_output(output, data) - .out_point(out_point.to_owned()) - .build(); - CellStatus::live_cell(cell_meta) - } else { - CellStatus::Unknown - } - } -} - -impl CellChecker for PoolMap { - fn is_live(&self, out_point: &OutPoint) -> Option { - if self.edges.get_input_ref(out_point).is_some() { - return Some(false); - } - if self.get_output_with_data(out_point).is_some() { - return Some(true); - } - None - } -} diff --git a/tx-pool/src/component/sort_key.rs b/tx-pool/src/component/sort_key.rs new file mode 100644 index 0000000000..ceeab649bc --- /dev/null +++ b/tx-pool/src/component/sort_key.rs @@ -0,0 +1,78 @@ +use ckb_types::core::{Capacity, FeeRate}; +use std::cmp::Ordering; + +/// A struct to use as a sorted key +#[derive(Eq, PartialEq, Clone, Debug)] +pub struct AncestorsScoreSortKey { + pub fee: Capacity, + pub weight: u64, + pub ancestors_fee: Capacity, + pub ancestors_weight: u64, +} + +impl AncestorsScoreSortKey { + /// compare tx fee rate with ancestors fee rate and return the min one + pub(crate) fn min_fee_and_weight(&self) -> (Capacity, u64) { + // avoid division a_fee/a_weight > b_fee/b_weight + let tx_weight = u128::from(self.fee.as_u64()) * u128::from(self.ancestors_weight); + let ancestors_weight = u128::from(self.ancestors_fee.as_u64()) * u128::from(self.weight); + + if tx_weight < ancestors_weight { + (self.fee, self.weight) + } else { + (self.ancestors_fee, self.ancestors_weight) + } + } +} + +impl PartialOrd for AncestorsScoreSortKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for AncestorsScoreSortKey { + fn cmp(&self, other: &Self) -> Ordering { + // avoid division a_fee/a_weight > b_fee/b_weight + let (fee, weight) = self.min_fee_and_weight(); + let (other_fee, other_weight) = other.min_fee_and_weight(); + let self_weight = u128::from(fee.as_u64()) * u128::from(other_weight); + let other_weight = u128::from(other_fee.as_u64()) * u128::from(weight); + if self_weight == other_weight { + // if fee rate weight is same, then compare with ancestor weight + self.ancestors_weight.cmp(&other.ancestors_weight) + } else { + self_weight.cmp(&other_weight) + } + } +} + +/// First compare fee_rate, select the smallest fee_rate, +/// and then select the latest timestamp, for eviction, +/// the latest timestamp which also means that the fewer descendants may exist. +#[derive(Eq, PartialEq, Clone, Debug)] +pub struct EvictKey { + pub fee_rate: FeeRate, + pub timestamp: u64, + pub descendants_count: usize, +} + +impl PartialOrd for EvictKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for EvictKey { + fn cmp(&self, other: &Self) -> Ordering { + if self.fee_rate == other.fee_rate { + if self.descendants_count == other.descendants_count { + self.timestamp.cmp(&other.timestamp) + } else { + self.descendants_count.cmp(&other.descendants_count) + } + } else { + self.fee_rate.cmp(&other.fee_rate) + } + } +} diff --git a/tx-pool/src/component/tests/entry.rs b/tx-pool/src/component/tests/entry.rs index 8aa7edf3ff..6ae6708af2 100644 --- a/tx-pool/src/component/tests/entry.rs +++ b/tx-pool/src/component/tests/entry.rs @@ -1,7 +1,6 @@ +use crate::component::sort_key::EvictKey; use ckb_types::core::{Capacity, FeeRate}; -use crate::component::entry::EvictKey; - #[test] fn test_min_fee_and_weight_evict() { let mut result = vec![(500, 10, 30), (10, 10, 31), (100, 10, 32)] diff --git a/tx-pool/src/component/tests/score_key.rs b/tx-pool/src/component/tests/score_key.rs index 09475f3d19..c12cc7426c 100644 --- a/tx-pool/src/component/tests/score_key.rs +++ b/tx-pool/src/component/tests/score_key.rs @@ -1,12 +1,11 @@ use ckb_types::{ bytes::Bytes, core::{Capacity, TransactionBuilder}, - packed::{CellInput, OutPoint, ProposalShortId}, + packed::{CellInput, OutPoint}, prelude::*, }; -use std::mem::size_of; -use crate::component::{entry::TxEntry, pool_map::PoolMap, score_key::AncestorsScoreSortKey}; +use crate::component::{entry::TxEntry, pool_map::PoolMap, sort_key::AncestorsScoreSortKey}; const DEFAULT_MAX_ANCESTORS_COUNT: usize = 125; @@ -27,7 +26,6 @@ fn test_min_fee_and_weight() { let key = AncestorsScoreSortKey { fee: Capacity::shannons(fee), weight, - id: ProposalShortId::new([0u8; 10]), ancestors_fee: Capacity::shannons(ancestors_fee), ancestors_weight, }; @@ -51,7 +49,7 @@ fn test_min_fee_and_weight() { #[test] fn test_ancestors_sorted_key_order() { - let mut keys = vec![ + let table = vec![ (0, 0, 0, 0), (1, 0, 1, 0), (500, 10, 1000, 30), @@ -62,33 +60,39 @@ fn test_ancestors_sorted_key_order() { (std::u64::MAX, 0, std::u64::MAX, 0), (std::u64::MAX, 100, std::u64::MAX, 2000), (std::u64::MAX, std::u64::MAX, std::u64::MAX, std::u64::MAX), - ] - .into_iter() - .enumerate() - .map(|(i, (fee, weight, ancestors_fee, ancestors_weight))| { - let mut id = [0u8; 10]; - id[..size_of::()].copy_from_slice(&(i as u32).to_be_bytes()); - AncestorsScoreSortKey { - fee: Capacity::shannons(fee), - weight, - id: ProposalShortId::new(id), - ancestors_fee: Capacity::shannons(ancestors_fee), - ancestors_weight, - } - }) - .collect::>(); + ]; + let mut keys = table + .clone() + .into_iter() + .enumerate() + .map( + |(_i, (fee, weight, ancestors_fee, ancestors_weight))| AncestorsScoreSortKey { + fee: Capacity::shannons(fee), + weight, + ancestors_fee: Capacity::shannons(ancestors_fee), + ancestors_weight, + }, + ) + .collect::>(); keys.sort(); - assert_eq!( - keys.into_iter().map(|k| k.id).collect::>(), - [0, 3, 5, 9, 2, 4, 6, 8, 1, 7] - .iter() - .map(|&i| { - let mut id = [0u8; 10]; - id[..size_of::()].copy_from_slice(&(i as u32).to_be_bytes()); - ProposalShortId::new(id) - }) - .collect::>() - ); + let now = keys + .into_iter() + .map(|k| (k.fee, k.weight, k.ancestors_fee, k.ancestors_weight)) + .collect::>(); + let expect = [0, 3, 5, 9, 2, 4, 6, 8, 1, 7] + .iter() + .map(|&i| { + let key = table[i as usize]; + ( + Capacity::shannons(key.0), + key.1, + Capacity::shannons(key.2), + key.3, + ) + }) + .collect::>(); + + assert_eq!(now, expect); } #[test] diff --git a/tx-pool/src/lib.rs b/tx-pool/src/lib.rs index 48a2157679..d122177c01 100644 --- a/tx-pool/src/lib.rs +++ b/tx-pool/src/lib.rs @@ -8,6 +8,7 @@ mod component; pub mod error; mod persisted; pub mod pool; +mod pool_cell; mod process; pub mod service; mod util; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index ae101e68a5..b7952f4838 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -6,15 +6,17 @@ use crate::callback::Callbacks; use crate::component::pool_map::{PoolEntry, PoolMap, Status}; use crate::component::recent_reject::RecentReject; use crate::error::Reject; +use crate::pool_cell::PoolCell; use ckb_app_config::TxPoolConfig; use ckb_logger::{debug, error, warn}; use ckb_snapshot::Snapshot; use ckb_store::ChainStore; +use ckb_types::core::CapacityError; use ckb_types::{ core::{ cell::{resolve_transaction, OverlayCellChecker, OverlayCellProvider, ResolvedTransaction}, tx_pool::{TxPoolEntryInfo, TxPoolIds}, - Cycle, TransactionView, UncleBlockView, + Capacity, Cycle, TransactionView, UncleBlockView, }, packed::{Byte32, ProposalShortId}, }; @@ -23,6 +25,7 @@ use std::collections::HashSet; use std::sync::Arc; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; +const MAX_REPLACEMENT_CANDIDATES: usize = 100; /// Tx-pool implementation pub struct TxPool { @@ -84,6 +87,44 @@ impl TxPool { self.total_tx_cycles += cycles; } + /// Check whether tx-pool enable RBF + pub fn enable_rbf(&self) -> bool { + self.config.min_rbf_rate > self.config.min_fee_rate + } + + /// The least required fee rate to allow tx to be replaced + pub fn min_replace_fee(&self, tx: &TxEntry) -> Option { + if !self.enable_rbf() { + return None; + } + let entry = vec![self.get_pool_entry(&tx.proposal_short_id()).unwrap()]; + self.calculate_min_replace_fee(&entry, tx.size) + } + + /// min_replace_fee = sum(replaced_txs.fee) + extra_rbf_fee + fn calculate_min_replace_fee(&self, conflicts: &[&PoolEntry], size: usize) -> Option { + let extra_rbf_fee = self.config.min_rbf_rate.fee(size as u64); + let replaced_sum_fee = conflicts + .iter() + .map(|c| c.inner.fee) + .try_fold(Capacity::zero(), |acc, x| acc.safe_add(x)); + let res = replaced_sum_fee.map_or(Err(CapacityError::Overflow), |sum| { + sum.safe_add(extra_rbf_fee) + }); + if let Ok(res) = res { + Some(res) + } else { + let fees = conflicts.iter().map(|c| c.inner.fee).collect::>(); + error!( + "conflicts: {:?} replaced_sum_fee {:?} overflow by add {}", + conflicts.iter().map(|e| e.id.clone()).collect::>(), + fees, + extra_rbf_fee + ); + None + } + } + /// Update size and cycles statics for remove tx /// cycles overflow is possible, currently obtaining cycles is not accurate pub fn update_statics_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) { @@ -287,7 +328,8 @@ impl TxPool { pub(crate) fn check_rtx_from_pool(&self, rtx: &ResolvedTransaction) -> Result<(), Reject> { let snapshot = self.snapshot(); - let checker = OverlayCellChecker::new(&self.pool_map, snapshot); + let pool_cell = PoolCell::new(&self.pool_map, false); + let checker = OverlayCellChecker::new(&pool_cell, snapshot); let mut seen_inputs = HashSet::new(); rtx.check(&mut seen_inputs, &checker, snapshot) .map_err(Reject::Resolve) @@ -296,9 +338,11 @@ impl TxPool { pub(crate) fn resolve_tx_from_pool( &self, tx: TransactionView, + rbf: bool, ) -> Result, Reject> { let snapshot = self.snapshot(); - let provider = OverlayCellProvider::new(&self.pool_map, snapshot); + let pool_cell = PoolCell::new(&self.pool_map, rbf); + let provider = OverlayCellProvider::new(&pool_cell, snapshot); let mut seen_inputs = HashSet::new(); resolve_transaction(tx, &mut seen_inputs, &provider, snapshot) .map(Arc::new) @@ -346,8 +390,6 @@ impl TxPool { let mut proposals = HashSet::with_capacity(limit); self.pool_map .fill_proposals(limit, exclusion, &mut proposals, Status::Pending); - self.pool_map - .fill_proposals(limit, exclusion, &mut proposals, Status::Gap); proposals } @@ -465,6 +507,110 @@ impl TxPool { (entries, size, cycles) } + pub(crate) fn check_rbf( + &self, + snapshot: &Snapshot, + rtx: &ResolvedTransaction, + conflict_ids: &HashSet, + fee: Capacity, + tx_size: usize, + ) -> Result<(), Reject> { + assert!(self.enable_rbf()); + assert!(!conflict_ids.is_empty()); + + let conflicts = conflict_ids + .iter() + .filter_map(|id| self.get_pool_entry(id)) + .collect::>(); + assert!(conflicts.len() == conflict_ids.len()); + + let short_id = rtx.transaction.proposal_short_id(); + // Rule #4, new tx's fee need to higher than min_rbf_fee computed from the tx_pool configuration + // Rule #3, new tx's fee need to higher than conflicts, here we only check the root tx + if let Some(min_replace_fee) = self.calculate_min_replace_fee(&conflicts, tx_size) { + if fee < min_replace_fee { + return Err(Reject::RBFRejected(format!( + "Tx's current fee is {}, expect it to >= {} to replace old txs", + fee, min_replace_fee, + ))); + } + } else { + return Err(Reject::RBFRejected( + "calculate_min_replace_fee failed".to_string(), + )); + } + + // Rule #2, new tx don't contain any new unconfirmed inputs + let mut inputs = HashSet::new(); + for c in conflicts.iter() { + inputs.extend(c.inner.transaction().input_pts_iter()); + } + + if rtx + .transaction + .input_pts_iter() + .any(|pt| !inputs.contains(&pt) && !snapshot.transaction_exists(&pt.tx_hash())) + { + return Err(Reject::RBFRejected( + "new Tx contains unconfirmed inputs".to_string(), + )); + } + + // Rule #5, the replaced tx's descendants can not more than 100 + // and the ancestor of the new tx don't have common set with the replaced tx's descendants + let mut replace_count: usize = 0; + let ancestors = self.pool_map.calc_ancestors(&short_id); + for conflict in conflicts.iter() { + let descendants = self.pool_map.calc_descendants(&conflict.id); + replace_count += descendants.len() + 1; + if replace_count > MAX_REPLACEMENT_CANDIDATES { + return Err(Reject::RBFRejected(format!( + "Tx conflict too many txs, conflict txs count: {}", + replace_count, + ))); + } + + if !descendants.is_disjoint(&ancestors) { + return Err(Reject::RBFRejected( + "Tx ancestors have common with conflict Tx descendants".to_string(), + )); + } + + let entries = descendants + .iter() + .filter_map(|id| self.get_pool_entry(id)) + .collect::>(); + + for entry in entries.iter() { + let hash = entry.inner.transaction().hash(); + if rtx + .transaction + .input_pts_iter() + .any(|pt| pt.tx_hash() == hash) + { + return Err(Reject::RBFRejected( + "new Tx contains inputs in descendants of to be replaced Tx".to_string(), + )); + } + } + + let mut entries_status = entries.iter().map(|e| e.status).collect::>(); + entries_status.push(conflict.status); + // Rule #6, all conflict Txs should be in `Pending` or `Gap` status + if entries_status + .iter() + .any(|s| ![Status::Pending, Status::Gap].contains(s)) + { + // Here we only refer to `Pending` status, since `Gap` is an internal status + return Err(Reject::RBFRejected( + "all conflict Txs should be in Pending status".to_string(), + )); + } + } + + Ok(()) + } + fn build_recent_reject(config: &TxPoolConfig) -> Option { if !config.recent_reject.as_os_str().is_empty() { let recent_reject_ttl = diff --git a/tx-pool/src/pool_cell.rs b/tx-pool/src/pool_cell.rs new file mode 100644 index 0000000000..4e70d44c82 --- /dev/null +++ b/tx-pool/src/pool_cell.rs @@ -0,0 +1,44 @@ +extern crate rustc_hash; +extern crate slab; +use crate::component::pool_map::PoolMap; +use ckb_types::core::cell::{CellChecker, CellMetaBuilder, CellProvider, CellStatus}; +use ckb_types::packed::OutPoint; + +pub(crate) struct PoolCell<'a> { + pub pool_map: &'a PoolMap, + pub rbf: bool, +} + +impl<'a> PoolCell<'a> { + pub fn new(pool_map: &'a PoolMap, rbf: bool) -> Self { + PoolCell { pool_map, rbf } + } +} + +impl<'a> CellProvider for PoolCell<'a> { + fn cell(&self, out_point: &OutPoint, _eager_load: bool) -> CellStatus { + if !self.rbf && self.pool_map.edges.get_input_ref(out_point).is_some() { + return CellStatus::Dead; + } + if let Some((output, data)) = self.pool_map.get_output_with_data(out_point) { + let cell_meta = CellMetaBuilder::from_cell_output(output, data) + .out_point(out_point.to_owned()) + .build(); + CellStatus::live_cell(cell_meta) + } else { + CellStatus::Unknown + } + } +} + +impl<'a> CellChecker for PoolCell<'a> { + fn is_live(&self, out_point: &OutPoint) -> Option { + if !self.rbf && self.pool_map.edges.get_input_ref(out_point).is_some() { + return Some(false); + } + if self.pool_map.get_output_with_data(out_point).is_some() { + return Some(true); + } + None + } +} diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index e921203aff..c386bd0524 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -19,6 +19,7 @@ use ckb_network::PeerIndex; use ckb_snapshot::Snapshot; use ckb_store::data_loader_wrapper::AsDataLoader; use ckb_store::ChainStore; +use ckb_types::core::error::OutPointError; use ckb_types::{ core::{cell::ResolvedTransaction, BlockView, Capacity, Cycle, HeaderView, TransactionView}, packed::{Byte32, ProposalShortId}, @@ -101,6 +102,7 @@ impl TxPoolService { pre_resolve_tip: Byte32, entry: TxEntry, mut status: TxStatus, + conflicts: HashSet, ) -> (Result<(), Reject>, Arc) { let (ret, snapshot) = self .with_tx_pool_write_lock(move |tx_pool, snapshot| { @@ -123,6 +125,19 @@ impl TxPoolService { time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?; } + // try to remove conflicted tx here + for id in conflicts.iter() { + let removed = tx_pool.pool_map.remove_entry_and_descendants(id); + for old in removed { + let reject = Reject::RBFRejected(format!( + "replaced by {}", + entry.proposal_short_id() + )); + // remove old tx from tx_pool, not happened in service so we didn't call reject callbacks + // here we call them manually + self.callbacks.call_reject(tx_pool, &old, reject) + } + } _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?; Ok(()) }) @@ -202,13 +217,36 @@ impl TxPoolService { .with_tx_pool_read_lock(|tx_pool, snapshot| { let tip_hash = snapshot.tip_hash(); + // Same txid means exactly the same transaction, including inputs, outputs, witnesses, etc. + // It's also not possible for RBF, reject it directly check_txid_collision(tx_pool, tx)?; - let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone())?; - - let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; - - Ok((tip_hash, rtx, status, fee, tx_size)) + // Try normal path first, if double-spending check success we don't need RBF check + // this make sure RBF won't introduce extra performance cost for hot path + let res = resolve_tx(tx_pool, &snapshot, tx.clone(), false); + match res { + Ok((rtx, status)) => { + let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; + Ok((tip_hash, rtx, status, fee, tx_size, HashSet::new())) + } + Err(err) => { + if tx_pool.enable_rbf() + && matches!(err, Reject::Resolve(OutPointError::Dead(_))) + { + // Try RBF check + let conflicts = tx_pool.pool_map.find_conflict_tx(tx); + if conflicts.is_empty() { + return Err(err); + } + let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?; + let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; + tx_pool.check_rbf(&snapshot, &rtx, &conflicts, fee, tx_size)?; + Ok((tip_hash, rtx, status, fee, tx_size, conflicts)) + } else { + Err(err) + } + } + } }) .await; @@ -372,7 +410,12 @@ impl TxPoolService { }); } - if matches!(reject, Reject::Resolve(..) | Reject::Verification(..)) { + if matches!( + reject, + Reject::Resolve(..) + | Reject::Verification(..) + | Reject::RBFRejected(..) + ) { self.put_recent_reject(&tx_hash, reject).await; } } @@ -397,7 +440,12 @@ impl TxPoolService { }); } Err(reject) => { - if matches!(reject, Reject::Resolve(..) | Reject::Verification(..)) { + if matches!( + reject, + Reject::Resolve(..) + | Reject::Verification(..) + | Reject::RBFRejected(..) + ) { self.put_recent_reject(&tx_hash, reject).await; } } @@ -497,8 +545,12 @@ impl TxPoolService { tx_hash: orphan.tx.hash(), }); } - if matches!(reject, Reject::Resolve(..) | Reject::Verification(..)) - { + if matches!( + reject, + Reject::Resolve(..) + | Reject::Verification(..) + | Reject::RBFRejected(..) + ) { self.put_recent_reject(&orphan.tx.hash(), &reject).await; } } @@ -550,7 +602,8 @@ impl TxPoolService { let tx_hash = tx.hash(); let (ret, snapshot) = self.pre_check(&tx).await; - let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot); + let (tip_hash, rtx, status, fee, tx_size, conflicts) = + try_or_return_with_snapshot!(ret, snapshot); if self.is_in_delay_window(&snapshot) { let mut delay = self.delay.write().await; @@ -634,7 +687,7 @@ impl TxPoolService { let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await; + let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status, conflicts).await; try_or_return_with_snapshot!(ret, submit_snapshot); self.notify_block_assembler(status).await; @@ -679,7 +732,8 @@ impl TxPoolService { let (ret, snapshot) = self.pre_check(&tx).await; - let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot); + let (tip_hash, rtx, status, fee, tx_size, conflicts) = + try_or_return_with_snapshot!(ret, snapshot); if self.is_in_delay_window(&snapshot) { let mut delay = self.delay.write().await; @@ -715,7 +769,7 @@ impl TxPoolService { let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size); - let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status).await; + let (ret, submit_snapshot) = self.submit_entry(tip_hash, entry, status, conflicts).await; try_or_return_with_snapshot!(ret, submit_snapshot); self.notify_block_assembler(status).await; @@ -872,7 +926,7 @@ impl TxPoolService { for tx in txs { let tx_size = tx.data().serialized_size_in_block(); let tx_hash = tx.hash(); - if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx) { + if let Ok((rtx, status)) = resolve_tx(tx_pool, tx_pool.snapshot(), tx, false) { if let Ok(fee) = check_tx_fee(tx_pool, tx_pool.snapshot(), &rtx, tx_size) { let verify_cache = fetched_cache.get(&tx_hash).cloned(); let snapshot = tx_pool.cloned_snapshot(); @@ -949,36 +1003,50 @@ impl TxPoolService { } } -type PreCheckedTx = (Byte32, Arc, TxStatus, Capacity, usize); +type PreCheckedTx = ( + Byte32, // tip_hash + Arc, // rtx + TxStatus, // status + Capacity, // tx fee + usize, // tx size + // the conflicted txs, used for latter `check_rbf` + // the root txs for removing from `tx-pool` when RBF is checked + HashSet, +); type ResolveResult = Result<(Arc, TxStatus), Reject>; +fn get_tx_status(snapshot: &Snapshot, short_id: &ProposalShortId) -> TxStatus { + if snapshot.proposals().contains_proposed(short_id) { + TxStatus::Proposed + } else if snapshot.proposals().contains_gap(short_id) { + TxStatus::Gap + } else { + TxStatus::Fresh + } +} + fn check_rtx( tx_pool: &TxPool, snapshot: &Snapshot, rtx: &ResolvedTransaction, ) -> Result { let short_id = rtx.transaction.proposal_short_id(); - let tx_status = if snapshot.proposals().contains_proposed(&short_id) { - TxStatus::Proposed - } else if snapshot.proposals().contains_gap(&short_id) { - TxStatus::Gap - } else { - TxStatus::Fresh - }; + let tx_status = get_tx_status(snapshot, &short_id); tx_pool.check_rtx_from_pool(rtx).map(|_| tx_status) } -fn resolve_tx(tx_pool: &TxPool, snapshot: &Snapshot, tx: TransactionView) -> ResolveResult { +fn resolve_tx( + tx_pool: &TxPool, + snapshot: &Snapshot, + tx: TransactionView, + rbf: bool, +) -> ResolveResult { let short_id = tx.proposal_short_id(); - let tx_status = if snapshot.proposals().contains_proposed(&short_id) { - TxStatus::Proposed - } else if snapshot.proposals().contains_gap(&short_id) { - TxStatus::Gap - } else { - TxStatus::Fresh - }; - tx_pool.resolve_tx_from_pool(tx).map(|rtx| (rtx, tx_status)) + let tx_status = get_tx_status(snapshot, &short_id); + tx_pool + .resolve_tx_from_pool(tx, rbf) + .map(|rtx| (rtx, tx_status)) } fn _submit_entry( diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 1a187615c8..9df681f071 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -779,26 +779,24 @@ async fn process(mut service: TxPoolService, message: Message) { .. }) = tx_pool.pool_map.get_by_id(&id) { - let trans_status = if status == &Status::Proposed { - TransactionWithStatus::with_proposed + let (tx_status, min_replace_fee) = if status == &Status::Proposed { + (TxStatus::Proposed, None) } else { - TransactionWithStatus::with_pending + (TxStatus::Pending, tx_pool.min_replace_fee(entry)) }; - Ok(trans_status( + Ok(TransactionWithStatus::with_status( Some(entry.transaction().clone()), entry.cycles, entry.timestamp, + tx_status, + Some(entry.fee), + min_replace_fee, )) } else if let Some(ref recent_reject_db) = tx_pool.recent_reject { - let recent_reject_result = recent_reject_db.get(&hash); - if let Ok(recent_reject) = recent_reject_result { - if let Some(record) = recent_reject { - Ok(TransactionWithStatus::with_rejected(record)) - } else { - Ok(TransactionWithStatus::with_unknown()) - } - } else { - Err(recent_reject_result.unwrap_err()) + match recent_reject_db.get(&hash) { + Ok(Some(record)) => Ok(TransactionWithStatus::with_rejected(record)), + Ok(_) => Ok(TransactionWithStatus::with_unknown()), + Err(err) => Err(err), } } else { Ok(TransactionWithStatus::with_unknown()) @@ -919,6 +917,7 @@ impl TxPoolService { total_tx_size: tx_pool.total_tx_size, total_tx_cycles: tx_pool.total_tx_cycles, min_fee_rate: self.tx_pool_config.min_fee_rate, + min_rbf_rate: self.tx_pool_config.min_rbf_rate, last_txs_updated_at: 0, tx_size_limit: TRANSACTION_SIZE_LIMIT, max_tx_pool_size: self.tx_pool_config.max_tx_pool_size as u64, diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index b71223ef7e..a24c7938de 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -14,6 +14,9 @@ pub struct TxPoolConfig { /// txs with lower fee rate than this will not be relayed or be mined #[serde(with = "FeeRateDef")] pub min_fee_rate: FeeRate, + /// txs need to pay larger fee rate than this for RBF + #[serde(with = "FeeRateDef")] + pub min_rbf_rate: FeeRate, /// tx pool rejects txs that cycles greater than max_tx_verify_cycles pub max_tx_verify_cycles: Cycle, /// max ancestors size limit for a single tx diff --git a/util/app-config/src/legacy/tx_pool.rs b/util/app-config/src/legacy/tx_pool.rs index bf82ecc1af..562f3ac6a6 100644 --- a/util/app-config/src/legacy/tx_pool.rs +++ b/util/app-config/src/legacy/tx_pool.rs @@ -7,6 +7,8 @@ use std::path::PathBuf; // default min fee rate, 1000 shannons per kilobyte const DEFAULT_MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1000); +// default min rbf rate, 1500 shannons per kilobyte +const DEFAULT_MIN_RBF_RATE: FeeRate = FeeRate::from_u64(1500); // default max tx verify cycles const DEFAULT_MAX_TX_VERIFY_CYCLES: Cycle = TWO_IN_TWO_OUT_CYCLES * 20; // default max ancestors count @@ -33,6 +35,8 @@ pub(crate) struct TxPoolConfig { keep_rejected_tx_hashes_count: u64, #[serde(with = "FeeRateDef")] min_fee_rate: FeeRate, + #[serde(with = "FeeRateDef", default = "default_min_rbf_rate")] + min_rbf_rate: FeeRate, max_tx_verify_cycles: Cycle, max_ancestors_count: usize, #[serde(default)] @@ -59,6 +63,10 @@ fn default_max_tx_pool_size() -> usize { DEFAULT_MAX_TX_POOL_SIZE } +fn default_min_rbf_rate() -> FeeRate { + DEFAULT_MIN_RBF_RATE +} + impl Default for crate::TxPoolConfig { fn default() -> Self { TxPoolConfig::default().into() @@ -77,6 +85,7 @@ impl Default for TxPoolConfig { keep_rejected_tx_hashes_days: default_keep_rejected_tx_hashes_days(), keep_rejected_tx_hashes_count: default_keep_rejected_tx_hashes_count(), min_fee_rate: DEFAULT_MIN_FEE_RATE, + min_rbf_rate: DEFAULT_MIN_RBF_RATE, max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, persisted_data: Default::default(), @@ -98,6 +107,7 @@ impl From for crate::TxPoolConfig { keep_rejected_tx_hashes_days, keep_rejected_tx_hashes_count, min_fee_rate, + min_rbf_rate, max_tx_verify_cycles, max_ancestors_count, persisted_data, @@ -108,6 +118,7 @@ impl From for crate::TxPoolConfig { Self { max_tx_pool_size, min_fee_rate, + min_rbf_rate, max_tx_verify_cycles, max_ancestors_count: cmp::max(DEFAULT_MAX_ANCESTORS_COUNT, max_ancestors_count), keep_rejected_tx_hashes_days, diff --git a/util/jsonrpc-types/src/blockchain.rs b/util/jsonrpc-types/src/blockchain.rs index bf40d24f8e..3700c01cc1 100644 --- a/util/jsonrpc-types/src/blockchain.rs +++ b/util/jsonrpc-types/src/blockchain.rs @@ -540,6 +540,10 @@ pub struct TransactionWithStatusResponse { pub time_added_to_pool: Option, /// The Transaction status. pub tx_status: TxStatus, + /// The transaction fee of the transaction + pub fee: Option, + /// The minimal fee required to replace this transaction + pub min_replace_fee: Option, } impl TransactionWithStatusResponse { @@ -554,6 +558,8 @@ impl TransactionWithStatusResponse { tx_status: t.tx_status.into(), cycles: t.cycles.map(Into::into), time_added_to_pool: t.time_added_to_pool.map(Into::into), + fee: t.fee.map(Into::into), + min_replace_fee: t.min_replace_fee.map(Into::into), }, ResponseFormatInnerType::Json => TransactionWithStatusResponse { transaction: t @@ -562,6 +568,8 @@ impl TransactionWithStatusResponse { tx_status: t.tx_status.into(), cycles: t.cycles.map(Into::into), time_added_to_pool: t.time_added_to_pool.map(Into::into), + fee: t.fee.map(Into::into), + min_replace_fee: t.min_replace_fee.map(Into::into), }, } } @@ -602,8 +610,8 @@ impl From for TxStatus { tx_pool::TxStatus::Pending => TxStatus::pending(), tx_pool::TxStatus::Proposed => TxStatus::proposed(), tx_pool::TxStatus::Committed(hash) => TxStatus::committed(hash), - tx_pool::TxStatus::Unknown => TxStatus::unknown(), tx_pool::TxStatus::Rejected(reason) => TxStatus::rejected(reason), + tx_pool::TxStatus::Unknown => TxStatus::unknown(), } } } diff --git a/util/jsonrpc-types/src/pool.rs b/util/jsonrpc-types/src/pool.rs index 4a18c73e7c..e13918e857 100644 --- a/util/jsonrpc-types/src/pool.rs +++ b/util/jsonrpc-types/src/pool.rs @@ -40,6 +40,11 @@ pub struct TxPoolInfo { /// /// The unit is Shannons per 1000 bytes transaction serialization size in the block. pub min_fee_rate: Uint64, + /// RBF rate threshold. The pool reject to replace for transactions which fee rate is below this threshold. + /// if min_rbf_rate > min_fee_rate then RBF is enabled on the node. + /// + /// The unit is Shannons per 1000 bytes transaction serialization size in the block. + pub min_rbf_rate: Uint64, /// Last updated time. This is the Unix timestamp in milliseconds. pub last_txs_updated_at: Timestamp, /// Limiting transactions to tx_size_limit @@ -63,6 +68,7 @@ impl From for TxPoolInfo { total_tx_size: (tx_pool_info.total_tx_size as u64).into(), total_tx_cycles: tx_pool_info.total_tx_cycles.into(), min_fee_rate: tx_pool_info.min_fee_rate.as_u64().into(), + min_rbf_rate: tx_pool_info.min_rbf_rate.as_u64().into(), last_txs_updated_at: tx_pool_info.last_txs_updated_at.into(), tx_size_limit: tx_pool_info.tx_size_limit.into(), max_tx_pool_size: tx_pool_info.max_tx_pool_size.into(), @@ -241,6 +247,9 @@ pub enum PoolTransactionReject { /// Transaction expired Expiry(String), + + /// RBF rejected + RBFRejected(String), } impl From for PoolTransactionReject { @@ -260,6 +269,7 @@ impl From for PoolTransactionReject { Reject::Resolve(_) => Self::Resolve(format!("{reject}")), Reject::Verification(_) => Self::Verification(format!("{reject}")), Reject::Expiry(_) => Self::Expiry(format!("{reject}")), + Reject::RBFRejected(_) => Self::RBFRejected(format!("{reject}")), } } } diff --git a/util/launcher/src/shared_builder.rs b/util/launcher/src/shared_builder.rs index 96d750a198..e1ad368a24 100644 --- a/util/launcher/src/shared_builder.rs +++ b/util/launcher/src/shared_builder.rs @@ -463,7 +463,7 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify: let tx_hash = entry.transaction().hash(); // record recent reject - if matches!(reject, Reject::Resolve(..)) { + if matches!(reject, Reject::Resolve(..) | Reject::RBFRejected(..)) { if let Some(ref mut recent_reject) = tx_pool.recent_reject { if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) { error!("record recent_reject failed {} {} {}", tx_hash, reject, e); diff --git a/util/types/src/core/tx_pool.rs b/util/types/src/core/tx_pool.rs index 479e4c373e..88977331d0 100644 --- a/util/types/src/core/tx_pool.rs +++ b/util/types/src/core/tx_pool.rs @@ -55,6 +55,10 @@ pub enum Reject { /// Expired #[error("Expiry transaction, timestamp {0}")] Expiry(u64), + + /// RBF rejected + #[error("RBF rejected: {0}")] + RBFRejected(String), } fn is_malformed_from_verification(error: &Error) -> bool { @@ -168,33 +172,28 @@ pub struct TransactionWithStatus { pub tx_status: TxStatus, /// The transaction verification consumed cycles pub cycles: Option, + /// The transaction fee of the transaction + pub fee: Option, + /// The minimal fee required to replace this transaction + pub min_replace_fee: Option, /// If the transaction is in tx-pool, `time_added_to_pool` represent when it enters the tx-pool. unit: Millisecond pub time_added_to_pool: Option, } impl TransactionWithStatus { - /// Build with pending status - pub fn with_pending( + /// Build with tx status + pub fn with_status( tx: Option, cycles: core::Cycle, time_added_to_pool: u64, + tx_status: TxStatus, + fee: Option, + min_replace_fee: Option, ) -> Self { Self { - tx_status: TxStatus::Pending, - transaction: tx, - cycles: Some(cycles), - time_added_to_pool: Some(time_added_to_pool), - } - } - - /// Build with proposed status - pub fn with_proposed( - tx: Option, - cycles: core::Cycle, - time_added_to_pool: u64, - ) -> Self { - Self { - tx_status: TxStatus::Proposed, + tx_status, + fee, + min_replace_fee, transaction: tx, cycles: Some(cycles), time_added_to_pool: Some(time_added_to_pool), @@ -206,11 +205,14 @@ impl TransactionWithStatus { tx: Option, hash: H256, cycles: Option, + fee: Option, ) -> Self { Self { tx_status: TxStatus::Committed(hash), transaction: tx, cycles, + fee, + min_replace_fee: None, time_added_to_pool: None, } } @@ -221,6 +223,8 @@ impl TransactionWithStatus { tx_status: TxStatus::Rejected(reason), transaction: None, cycles: None, + fee: None, + min_replace_fee: None, time_added_to_pool: None, } } @@ -231,6 +235,8 @@ impl TransactionWithStatus { tx_status: TxStatus::Unknown, transaction: None, cycles: None, + fee: None, + min_replace_fee: None, time_added_to_pool: None, } } @@ -241,6 +247,8 @@ impl TransactionWithStatus { tx_status, transaction: None, cycles, + fee: None, + min_replace_fee: None, time_added_to_pool: None, } } @@ -318,6 +326,13 @@ pub struct TxPoolInfo { /// /// The unit is Shannons per 1000 bytes transaction serialization size in the block. pub min_fee_rate: FeeRate, + + /// Min RBF rate threshold. The pool reject RBF transactions which fee rate is below this threshold. + /// if min_rbf_rate > min_fee_rate then RBF is enabled on the node. + /// + /// The unit is Shannons per 1000 bytes transaction serialization size in the block. + pub min_rbf_rate: FeeRate, + /// Last updated time. This is the Unix timestamp in milliseconds. pub last_txs_updated_at: u64, /// Limiting transactions to tx_size_limit