diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 9ff256af5a..dd297a25a0 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -11,7 +11,7 @@ use ckb_shared::error::SharedError; use ckb_shared::index::ChainIndex; use ckb_shared::shared::{ChainProvider, ChainState, Shared}; use ckb_shared::txo_set::TxoSetDiff; -use ckb_verification::{verify_transactions, BlockVerifier, Verifier}; +use ckb_verification::{BlockVerifier, TransactionsVerifier, Verifier}; use crossbeam_channel::{self, select, Receiver, Sender}; use faketime::unix_time_as_millis; use fnv::{FnvHashMap, FnvHashSet}; @@ -455,37 +455,38 @@ impl ChainService { push_new(b, &mut new_inputs, &mut new_outputs); } - let max_cycles = self.shared.consensus().max_block_cycles(); - let mut txs_cycles = self.shared.txs_cycles().write(); + let mut txs_cache = self.shared.txs_verify_cache().write(); // The verify function - let mut verify = - |b, new_inputs: &FnvHashSet, new_outputs: &FnvHashMap| -> bool { - verify_transactions(b, max_cycles, &mut *txs_cycles, |op| { - self.shared.cell_at(op, |op| { - if new_inputs.contains(op) { - Some(true) - } else if let Some(x) = new_outputs.get(&op.hash) { - if op.index < (*x as u32) { - Some(false) - } else { - Some(true) - } - } else if old_outputs.contains(&op.hash) { - None - } else { - chain_state - .is_spent(op) - .map(|x| x && !old_inputs.contains(op)) - } - }) - }) - .is_ok() - }; + let txs_verifier = TransactionsVerifier::new(self.shared.consensus().max_block_cycles()); let mut found_error = false; // verify transaction for (ext, b) in fork.open_exts.iter_mut().zip(fork.new_blocks.iter()).rev() { - if !found_error || skip_verify || verify(b, &new_inputs, &new_outputs) { + let cell_resolver = |op: &OutPoint| { + self.shared.cell_at(op, |op| { + if new_inputs.contains(op) { + Some(true) + } else if let Some(x) = new_outputs.get(&op.hash) { + if op.index < (*x as u32) { + Some(false) + } else { + Some(true) + } + } else if old_outputs.contains(&op.hash) { + None + } else { + chain_state + .is_spent(op) + .map(|x| x && !old_inputs.contains(op)) + } + }) + }; + if !found_error + || skip_verify + || txs_verifier + .verify(&mut *txs_cache, b, cell_resolver) + .is_ok() + { push_new(b, &mut new_inputs, &mut new_outputs); ext.valid = Some(true); } else { diff --git a/miner/src/block_assembler.rs b/miner/src/block_assembler.rs index 69f9c1c1d2..44465ee6f5 100644 --- a/miner/src/block_assembler.rs +++ b/miner/src/block_assembler.rs @@ -217,7 +217,7 @@ impl BlockAssembler { TransactionTemplate { hash: tx.transaction.hash(), required, - cycles: Some(tx.cycles), + cycles: tx.cycles, depends, data: (&tx.transaction).into(), } diff --git a/nodes_template/default.json b/nodes_template/default.json index 8279eafc04..3e8f65076c 100644 --- a/nodes_template/default.json +++ b/nodes_template/default.json @@ -49,5 +49,5 @@ "block_assembler": { "type_hash": "0x0da2fe99fe549e082d4ed483c2e968a89ea8d11aabf5d79e5cbf06522de6e674" }, - "cycles_cache_size": 100000 + "txs_verify_cache_size": 100000 } diff --git a/pool/src/tests/pool.rs b/pool/src/tests/pool.rs index e216b6272d..204a4fba3e 100644 --- a/pool/src/tests/pool.rs +++ b/pool/src/tests/pool.rs @@ -9,7 +9,6 @@ use ckb_core::extras::BlockExt; use ckb_core::header::HeaderBuilder; use ckb_core::script::Script; use ckb_core::transaction::*; -use ckb_core::Cycle; use ckb_db::memorydb::MemoryKeyValueDB; use ckb_notify::{ForkBlocks, MsgSwitchFork, NotifyService}; use ckb_shared::index::ChainIndex; @@ -208,10 +207,7 @@ pub fn test_cellbase_spent() { .output(CellOutput::new(50000, Vec::new(), H256::default(), None)) .build(); - match pool - .service - .add_to_pool(PoolEntry::new(valid_tx, 0, Cycle::default())) - { + match pool.service.add_to_pool(PoolEntry::new(valid_tx, 0, None)) { Ok(_) => {} Err(err) => panic!( "Unexpected error while adding a valid transaction: {:?}", @@ -767,7 +763,7 @@ fn test_transaction_with_capacity( .outputs(outputs) .build(); - PoolEntry::new(tx, 0, Cycle::default()) + PoolEntry::new(tx, 0, None) } // Since the main point here is to test pool functionality, not scripting diff --git a/pool/src/txs_pool/pool.rs b/pool/src/txs_pool/pool.rs index 75b15186fa..f788a63dea 100644 --- a/pool/src/txs_pool/pool.rs +++ b/pool/src/txs_pool/pool.rs @@ -361,25 +361,17 @@ where } //readd txs - let mut txs_cycles = self.shared.txs_cycles().write(); + let mut txs_cache = self.shared.txs_verify_cache().write(); for tx in b.commit_transactions().iter().rev() { if tx.is_cellbase() { continue; } - let tx_hash = tx.hash(); - let cycles = match txs_cycles.get(&tx_hash).cloned() { - Some(cycles) => cycles, - None => { - let rtx = self.resolve_transaction(&tx); - // TODO: remove unwrap, remove transactions that depend on it. - let cycles = TransactionVerifier::new(&rtx) - .verify(self.shared.consensus().max_block_cycles()) - .map_err(PoolError::InvalidTx) - .unwrap(); - txs_cycles.insert(tx_hash, cycles); - cycles - } - }; + let rtx = self.resolve_transaction(&tx); + // TODO: remove unwrap, remove transactions that depend on it. + let cycles = self + .verify_transaction(&rtx, &mut txs_cache) + .map_err(PoolError::InvalidTx) + .unwrap(); self.pool.readd_transaction(tx, cycles); } } @@ -448,7 +440,7 @@ where &mut self, tx: Transaction, ) -> Result { - let tx = PoolEntry::new(tx, 0, Cycle::default()); + let tx = PoolEntry::new(tx, 0, None); match { self.proposed.insert(tx) } { TxStage::Mineable(x) => self.add_to_pool(x), TxStage::Unknown(x) => { @@ -464,7 +456,7 @@ where tx: Transaction, ) -> Result { let tx_hash = tx.hash(); - let tx = PoolEntry::new(tx, 0, Cycle::default()); + let tx = PoolEntry::new(tx, 0, None); match { self.proposed.insert(tx) } { TxStage::Mineable(x) => self.add_to_pool(x), TxStage::Unknown(x) => { @@ -510,15 +502,24 @@ where self.pool.get_mineable_transactions(self.pool.size()) } - fn verify_transaction(&self, rtx: &ResolvedTransaction) -> Result { - let mut txs_cycles = self.shared.txs_cycles().write(); + fn verify_transaction( + &self, + rtx: &ResolvedTransaction, + txs_cache: &mut Option>, + ) -> Result { let tx_hash = rtx.transaction.hash(); - match txs_cycles.get(&tx_hash).cloned() { + match txs_cache + .as_ref() + .and_then(|cache| cache.get(&tx_hash).cloned()) + { Some(cycles) => Ok(cycles), None => { let cycles = TransactionVerifier::new(&rtx) .verify(self.shared.consensus().max_block_cycles())?; - txs_cycles.insert(tx_hash, cycles); + // write cache + txs_cache + .as_mut() + .and_then(|cache| cache.insert(tx_hash, cycles)); Ok(cycles) } } @@ -570,12 +571,14 @@ where } } - if unknowns.is_empty() && pe.cycles != Cycle::default() { + if unknowns.is_empty() && pe.cycles.is_none() { // TODO: Parallel + + let mut txs_cache = self.shared.txs_verify_cache().write(); let cycles = self - .verify_transaction(&rtx) + .verify_transaction(&rtx, &mut txs_cache) .map_err(PoolError::InvalidTx)?; - pe.cycles = cycles; + pe.cycles = Some(cycles); } } @@ -604,12 +607,14 @@ where pub(crate) fn reconcile_orphan(&mut self, tx: &Transaction) { let pes = self.orphan.reconcile_transaction(tx); + let mut txs_cache = self.shared.txs_verify_cache().write(); for mut pe in pes { - let rs = if pe.cycles == Cycle::default() { - let rtx = self.resolve_transaction(&pe.transaction); - self.verify_transaction(&rtx) - } else { - Ok(pe.cycles) + let verify_result = match pe.cycles { + Some(cycles) => Ok(cycles), + None => { + let rtx = self.resolve_transaction(&pe.transaction); + self.verify_transaction(&rtx, &mut txs_cache) + } }; if self.config.trace_enable() { @@ -617,18 +622,22 @@ where &tx.hash(), format!( "removed from orphan, prepare add to commit, verify result {:?}", - rs + verify_result ), ); } - if let Ok(cycles) = rs { - pe.cycles = cycles; - self.last_txs_updated_at - .store(unix_time_as_millis() as usize, Ordering::SeqCst); - self.pool.add_transaction(pe); - } else if rs == Err(TransactionError::DoubleSpent) { - self.cache.insert(pe.transaction.proposal_short_id(), pe); + match verify_result { + Ok(cycles) => { + pe.cycles = Some(cycles); + self.last_txs_updated_at + .store(unix_time_as_millis() as usize, Ordering::SeqCst); + self.pool.add_transaction(pe); + } + Err(TransactionError::DoubleSpent) => { + self.cache.insert(pe.transaction.proposal_short_id(), pe); + } + _ => (), } } } diff --git a/pool/src/txs_pool/types.rs b/pool/src/txs_pool/types.rs index df969b9c14..836c6599cd 100644 --- a/pool/src/txs_pool/types.rs +++ b/pool/src/txs_pool/types.rs @@ -103,12 +103,12 @@ pub struct PoolEntry { /// Bytes size pub bytes_size: usize, /// Cycles - pub cycles: Cycle, + pub cycles: Option, } impl PoolEntry { /// Create new transaction pool entry - pub fn new(tx: Transaction, count: usize, cycles: Cycle) -> PoolEntry { + pub fn new(tx: Transaction, count: usize, cycles: Option) -> PoolEntry { PoolEntry { bytes_size: tx.occupied_capacity(), transaction: tx, @@ -353,7 +353,7 @@ impl Pool { self.vertices.insert_front( tx.proposal_short_id(), - PoolEntry::new(tx.clone(), 0, cycles), + PoolEntry::new(tx.clone(), 0, Some(cycles)), ); for i in inputs { @@ -863,7 +863,7 @@ mod tests { ) .build(); - PoolEntry::new(tx, 0, Cycle::default()) + PoolEntry::new(tx, 0, None) } #[test] diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 7b741ef97b..6e8220992c 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -32,4 +32,3 @@ pub const COLUMN_EXT: Col = Some(7); pub const COLUMN_BLOCK_TRANSACTION_ADDRESSES: Col = Some(9); pub const COLUMN_BLOCK_TRANSACTION_IDS: Col = Some(10); pub const COLUMN_BLOCK_PROPOSAL_IDS: Col = Some(11); -pub const COLUMN_TRANSACTION_CYCLES: Col = Some(12); diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 880187acf3..a933bf9ae4 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -77,7 +77,7 @@ impl ChainState { pub struct Shared { store: Arc, chain_state: Arc>, - txs_cycles: Arc>>, + txs_verify_cache: Arc>>>, consensus: Arc, } @@ -87,14 +87,18 @@ impl ::std::clone::Clone for Shared { Shared { store: Arc::clone(&self.store), chain_state: Arc::clone(&self.chain_state), - txs_cycles: Arc::clone(&self.txs_cycles), + txs_verify_cache: Arc::clone(&self.txs_verify_cache), consensus: Arc::clone(&self.consensus), } } } impl Shared { - pub fn new(store: CI, consensus: Consensus, cache_size: usize) -> Self { + pub fn new( + store: CI, + consensus: Consensus, + txs_verify_cache: Arc>>>, + ) -> Self { let chain_state = { // check head in store or save the genesis block as head let header = { @@ -122,12 +126,10 @@ impl Shared { ))) }; - let txs_cycles = Arc::new(RwLock::new(LruCache::new(cache_size))); - Shared { store: Arc::new(store), chain_state, - txs_cycles, + txs_verify_cache, consensus: Arc::new(consensus), } } @@ -140,8 +142,8 @@ impl Shared { &self.store } - pub fn txs_cycles(&self) -> &RwLock> { - &self.txs_cycles + pub fn txs_verify_cache(&self) -> &RwLock>> { + &self.txs_verify_cache } pub fn init_txo_set(store: &CI, number: u64) -> TxoSet { @@ -446,7 +448,7 @@ impl BlockMedianTimeContext for Shared { pub struct SharedBuilder { db: Option, consensus: Option, - cycles_cache_size: usize, + txs_verify_cache_size: usize, } impl Default for SharedBuilder { @@ -454,7 +456,7 @@ impl Default for SharedBuilder { SharedBuilder { db: None, consensus: None, - cycles_cache_size: 100_000, + txs_verify_cache_size: 100_000, } } } @@ -464,7 +466,7 @@ impl SharedBuilder { SharedBuilder { db: Some(MemoryKeyValueDB::open(COLUMNS as usize)), consensus: None, - cycles_cache_size: 100_000, + txs_verify_cache_size: 100_000, } } } @@ -489,14 +491,18 @@ impl SharedBuilder { self } - pub fn cycles_cache_size(mut self, value: usize) -> Self { - self.cycles_cache_size = value; + pub fn txs_verify_cache_size(mut self, value: usize) -> Self { + self.txs_verify_cache_size = value; self } pub fn build(self) -> Shared> { let store = ChainKVStore::new(self.db.unwrap()); let consensus = self.consensus.unwrap_or_else(Consensus::default); - Shared::new(store, consensus, self.cycles_cache_size) + Shared::new( + store, + consensus, + Arc::new(RwLock::new(Some(LruCache::new(self.txs_verify_cache_size)))), + ) } } diff --git a/src/cli/run_impl.rs b/src/cli/run_impl.rs index 4807d9886d..d1afdb99a8 100644 --- a/src/cli/run_impl.rs +++ b/src/cli/run_impl.rs @@ -29,7 +29,7 @@ pub fn run(setup: Setup) { let shared = SharedBuilder::>::default() .consensus(consensus) .db(&setup.configs.db) - .cycles_cache_size(setup.configs.cycles_cache_size) + .txs_verify_cache_size(setup.configs.txs_verify_cache_size) .build(); let notify = NotifyService::default().start(Some("notify")); diff --git a/src/setup.rs b/src/setup.rs index e294f30749..3d3126f91c 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -39,7 +39,7 @@ pub struct Configs { pub block_assembler: BlockAssemblerConfig, pub sync: SyncConfig, pub pool: PoolConfig, - pub cycles_cache_size: usize, + pub txs_verify_cache_size: usize, } pub fn get_config_path(matches: &ArgMatches) -> PathBuf { diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 347742e3c0..be81d87df6 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -1212,7 +1212,7 @@ mod tests { fetched_blocks.push(shared2.block(block_hash).unwrap()); } - let fork_receiver = notify.subscribe_switch_fork("for_receiver"); + let fork_receiver = notify.subscribe_switch_fork("fork_receiver"); for block in &fetched_blocks { let fbb = &mut FlatBufferBuilder::new(); diff --git a/verification/src/block_verifier.rs b/verification/src/block_verifier.rs index 924b679d4c..b93a6b1c2b 100644 --- a/verification/src/block_verifier.rs +++ b/verification/src/block_verifier.rs @@ -388,72 +388,89 @@ impl UnclesVerifier { } } -pub fn verify_transactions CellStatus>( - block: &Block, +#[derive(Clone)] +pub struct TransactionsVerifier { max_cycles: Cycle, - txs_cycles: &mut LruCache, - cell: F, -) -> Result<(), Error> { - let mut output_indexs = FnvHashMap::default(); - let mut seen_inputs = FnvHashSet::default(); - - for (i, tx) in block.commit_transactions().iter().enumerate() { - output_indexs.insert(tx.hash(), i); +} + +impl TransactionsVerifier { + pub fn new(max_cycles: Cycle) -> Self { + TransactionsVerifier { max_cycles } } - // skip first tx, assume the first is cellbase, other verifier will verify cellbase - let resolved: Vec = block - .commit_transactions() - .iter() - .skip(1) - .map(|x| { - resolve_transaction(x, &mut seen_inputs, |o| { - if let Some(i) = output_indexs.get(&o.hash) { - match block.commit_transactions()[*i] - .outputs() - .get(o.index as usize) - { - Some(x) => CellStatus::Live(x.clone()), - None => CellStatus::Unknown, + pub fn verify CellStatus>( + &self, + txs_verify_cache: &mut Option>, + block: &Block, + cell_resolver: F, + ) -> Result<(), Error> { + let mut output_indexs = FnvHashMap::default(); + let mut seen_inputs = FnvHashSet::default(); + + for (i, tx) in block.commit_transactions().iter().enumerate() { + output_indexs.insert(tx.hash(), i); + } + + // skip first tx, assume the first is cellbase, other verifier will verify cellbase + let resolved: Vec = block + .commit_transactions() + .iter() + .skip(1) + .map(|x| { + resolve_transaction(x, &mut seen_inputs, |o| { + if let Some(i) = output_indexs.get(&o.hash) { + match block.commit_transactions()[*i] + .outputs() + .get(o.index as usize) + { + Some(x) => CellStatus::Live(x.clone()), + None => CellStatus::Unknown, + } + } else { + (cell_resolver)(o) } + }) + }) + .collect(); + + // make verifiers orthogonal + // + let cycles_set = resolved + .par_iter() + .enumerate() + .map(|(index, tx)| { + if let Some(cycles) = txs_verify_cache + .as_ref() + .and_then(|cache| cache.get(&tx.transaction.hash())) + { + InputVerifier::new(&tx) + .verify() + .map_err(|e| Error::Transactions((index, e))) + .map(|_| (None, *cycles)) } else { - cell(o) + TransactionVerifier::new(&tx) + .verify(self.max_cycles) + .map_err(|e| Error::Transactions((index, e))) + .map(|cycles| (Some(tx.transaction.hash()), cycles)) } }) - }) - .collect(); - - // make verifiers orthogonal - let cycles_set = resolved - .par_iter() - .enumerate() - .map(|(index, tx)| { - if let Some(cycles) = txs_cycles.get(&tx.transaction.hash()) { - InputVerifier::new(&tx) - .verify() - .map_err(|e| Error::Transactions((index, e))) - .map(|_| (None, *cycles)) - } else { - TransactionVerifier::new(&tx) - .verify(max_cycles) - .map_err(|e| Error::Transactions((index, e))) - .map(|cycles| (Some(tx.transaction.hash()), cycles)) - } - }) - .collect::, _>>()?; + .collect::, _>>()?; - let sum: Cycle = cycles_set.iter().map(|(_, cycles)| cycles).sum(); + let sum: Cycle = cycles_set.iter().map(|(_, cycles)| cycles).sum(); - for (hash, cycles) in cycles_set { - if let Some(h) = hash { - txs_cycles.insert(h, cycles); + for (hash, cycles) in cycles_set { + if let Some(h) = hash { + txs_verify_cache + .as_mut() + .map(|cache| cache.insert(h, cycles)); + } } - } - if sum > max_cycles { - Err(Error::ExceededMaximumCycles) - } else { - Ok(()) + if sum > self.max_cycles { + Err(Error::ExceededMaximumCycles) + } else { + Ok(()) + } } } diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 66e294d246..cce0ef1359 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -7,7 +7,7 @@ mod transaction_verifier; #[cfg(test)] mod tests; -pub use crate::block_verifier::{verify_transactions, BlockVerifier, HeaderResolverWrapper}; +pub use crate::block_verifier::{BlockVerifier, HeaderResolverWrapper, TransactionsVerifier}; pub use crate::error::{Error, TransactionError}; pub use crate::header_verifier::{HeaderResolver, HeaderVerifier}; pub use crate::transaction_verifier::{InputVerifier, TransactionVerifier};