Skip to content

Commit

Permalink
refactor: avoid multiple lock
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Mar 8, 2019
1 parent 79cec0a commit d51c197
Show file tree
Hide file tree
Showing 26 changed files with 506 additions and 515 deletions.
46 changes: 36 additions & 10 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ckb_core::block::Block;
use ckb_core::cell::CellProvider;
use ckb_core::cell::{resolve_transaction, CellProvider, CellStatus, ResolvedTransaction};
use ckb_core::extras::BlockExt;
use ckb_core::service::{Request, DEFAULT_CHANNEL_SIZE, SIGNAL_CHANNEL_SIZE};
use ckb_core::transaction::{OutPoint, ProposalShortId};
Expand Down Expand Up @@ -177,7 +177,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {

let mut txo_set_diff = TxoSetDiff::default();
let mut fork = ForkChanges::default();
let mut chain_state = self.shared.chain_state().write();
let mut chain_state = self.shared.chain_state().lock();
let tip_number = chain_state.tip_number();
let tip_hash = chain_state.tip_hash();
let parent_ext = self
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
);

self.find_fork(&mut fork, tip_number, &block, ext);
txo_set_diff = self.reconcile_main_chain(batch, &mut fork, &chain_state)?;
txo_set_diff = self.reconcile_main_chain(batch, &mut fork, &mut chain_state)?;
self.update_index(batch, &fork.detached_blocks, &fork.attached_blocks);
self.update_proposal_ids(&mut chain_state, &fork);
self.shared
Expand All @@ -244,11 +244,11 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
let detached_proposal_id = chain_state.reconstruct_proposal_ids(tip_header.number());
fork.detached_proposal_id = detached_proposal_id;
chain_state.update_tip(tip_header, total_difficulty, txo_set_diff);
self.shared.reconcile_tx_pool(
&chain_state,
chain_state.update_tx_pool_for_reorg(
fork.detached_blocks(),
fork.attached_blocks(),
fork.detached_proposal_id(),
self.shared.consensus().max_block_cycles(),
);
if log_enabled!(target: "chain", log::Level::Debug) {
self.print_chain(&chain_state, 10);
Expand All @@ -260,7 +260,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
Ok(())
}

pub(crate) fn update_proposal_ids(&self, chain_state: &mut ChainState, fork: &ForkChanges) {
pub(crate) fn update_proposal_ids(&self, chain_state: &mut ChainState<CI>, fork: &ForkChanges) {
for blk in fork.attached_blocks() {
chain_state.update_proposal_ids(&blk);
}
Expand Down Expand Up @@ -399,7 +399,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
&self,
batch: &mut Batch,
fork: &mut ForkChanges,
chain_state: &ChainState,
chain_state: &mut ChainState<CI>,
) -> Result<TxoSetDiff, FailureError> {
let skip_verify = !self.verification;

Expand Down Expand Up @@ -440,7 +440,6 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
push_new(b, &mut new_inputs, &mut new_outputs);
}

let mut txs_cache = self.shared.txs_verify_cache().write();
// The verify function
let txs_verifier = TransactionsVerifier::new(self.shared.consensus().max_block_cycles());

Expand Down Expand Up @@ -471,10 +470,37 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
}
})
};

let mut output_indexs = FnvHashMap::default();
let mut seen_inputs = FnvHashSet::default();

for (i, tx) in b.commit_transactions().iter().enumerate() {
output_indexs.insert(tx.hash(), i);
}

// cellbase verified
let resolved: Vec<ResolvedTransaction> = b
.commit_transactions()
.iter()
.skip(1)
.map(|x| {
resolve_transaction(x, &mut seen_inputs, |o| {
if let Some(i) = output_indexs.get(&o.hash) {
match b.commit_transactions()[*i].outputs().get(o.index as usize) {
Some(x) => CellStatus::Live(x.clone()),
None => CellStatus::Unknown,
}
} else {
cell_resolver(o)
}
})
})
.collect();

if !found_error
|| skip_verify
|| txs_verifier
.verify(&mut *txs_cache, b, cell_resolver)
.verify(chain_state.mut_txs_verify_cache(), &resolved)
.is_ok()
{
push_new(b, &mut new_inputs, &mut new_outputs);
Expand Down Expand Up @@ -514,7 +540,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
})
}

fn print_chain(&self, chain_state: &ChainState, len: u64) {
fn print_chain(&self, chain_state: &ChainState<CI>, len: u64) {
debug!(target: "chain", "Chain {{");

let tip = chain_state.tip_number();
Expand Down
6 changes: 3 additions & 3 deletions chain/src/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ fn test_calculate_difficulty() {
chain2.push(new_block.clone());
parent = new_block.header().clone();
}
let tip = shared.chain_state().read().tip_header().clone();
let tip = shared.chain_state().lock().tip_header().clone();
let total_uncles_count = shared.block_ext(&tip.hash()).unwrap().total_uncles_count;
assert_eq!(total_uncles_count, 25);
let difficulty = shared.calculate_difficulty(&tip).unwrap();
Expand Down Expand Up @@ -319,7 +319,7 @@ fn test_calculate_difficulty() {
chain2.push(new_block.clone());
parent = new_block.header().clone();
}
let tip = shared.chain_state().read().tip_header().clone();
let tip = shared.chain_state().lock().tip_header().clone();
let total_uncles_count = shared.block_ext(&tip.hash()).unwrap().total_uncles_count;
assert_eq!(total_uncles_count, 10);
let difficulty = shared.calculate_difficulty(&tip).unwrap();
Expand Down Expand Up @@ -349,7 +349,7 @@ fn test_calculate_difficulty() {
chain2.push(new_block.clone());
parent = new_block.header().clone();
}
let tip = shared.chain_state().read().tip_header().clone();
let tip = shared.chain_state().lock().tip_header().clone();
let total_uncles_count = shared.block_ext(&tip.hash()).unwrap().total_uncles_count;
assert_eq!(total_uncles_count, 150);
let difficulty = shared.calculate_difficulty(&tip).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions chain/src/tests/find_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn test_find_fork_case1() {
chain_service.process_block(Arc::new(blk.clone())).unwrap();
}

let tip_number = { shared.chain_state().read().tip_number() };
let tip_number = { shared.chain_state().lock().tip_number() };

let new_block = gen_block(&parent, 100, U256::from(200u64), vec![], vec![]);
fork2.push(new_block.clone());
Expand Down Expand Up @@ -122,7 +122,7 @@ fn test_find_fork_case2() {
chain_service.process_block(Arc::new(blk.clone())).unwrap();
}

let tip_number = { shared.chain_state().read().tip_number() };
let tip_number = { shared.chain_state().lock().tip_number() };

let difficulty = parent.difficulty().clone();
let new_block = gen_block(
Expand Down Expand Up @@ -198,7 +198,7 @@ fn test_find_fork_case3() {
chain_service.process_block(Arc::new(blk.clone())).unwrap();
}

let tip_number = { shared.chain_state().read().tip_number() };
let tip_number = { shared.chain_state().lock().tip_number() };

println!("case3 tip{}", tip_number);

Expand Down Expand Up @@ -268,7 +268,7 @@ fn test_find_fork_case4() {
chain_service.process_block(Arc::new(blk.clone())).unwrap();
}

let tip_number = { shared.chain_state().read().tip_number() };
let tip_number = { shared.chain_state().lock().tip_number() };

println!("case3 tip{}", tip_number);

Expand Down
6 changes: 3 additions & 3 deletions miner/src/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
let uncles_count_limit = self.shared.consensus().max_uncles_num() as u32;

let last_uncles_updated_at = self.last_uncles_updated_at.load(Ordering::SeqCst) as u64;
let last_txs_updated_at = self.shared.get_last_txs_updated_at();
let chain_state = self.shared.chain_state().lock();
let last_txs_updated_at = chain_state.get_last_txs_updated_at();

let chain_state = self.shared.chain_state().read();
let header = chain_state.tip_header();
let number = chain_state.tip_number() + 1;
let current_time = cmp::max(unix_time_as_millis(), header.timestamp() + 1);
Expand All @@ -257,7 +257,7 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
.expect("get difficulty");

let (proposal_transactions, commit_transactions) =
self.shared.get_proposal_commit_txs(10000, 10000);
chain_state.get_proposal_and_staging_txs(10000, 10000);

let (uncles, bad_uncles) = self.prepare_uncles(&header, &difficulty);
if !bad_uncles.is_empty() {
Expand Down
6 changes: 3 additions & 3 deletions rpc/src/module/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<CI: ChainIndex + 'static> ChainRpc for ChainRpcImpl<CI> {
}

fn get_tip_header(&self) -> Result<Header> {
Ok(self.shared.chain_state().read().tip_header().into())
Ok(self.shared.chain_state().lock().tip_header().into())
}

// TODO: we need to build a proper index instead of scanning every time
Expand All @@ -65,7 +65,7 @@ impl<CI: ChainIndex + 'static> ChainRpc for ChainRpcImpl<CI> {
to: BlockNumber,
) -> Result<Vec<CellOutputWithOutPoint>> {
let mut result = Vec::new();
let chain_state = self.shared.chain_state().read();
let chain_state = self.shared.chain_state().lock();
for block_number in from..=to {
if let Some(block_hash) = self.shared.block_hash(block_number) {
let block = self
Expand Down Expand Up @@ -100,6 +100,6 @@ impl<CI: ChainIndex + 'static> ChainRpc for ChainRpcImpl<CI> {
}

fn get_tip_block_number(&self) -> Result<BlockNumber> {
Ok(self.shared.chain_state().read().tip_number())
Ok(self.shared.chain_state().lock().tip_number())
}
}
4 changes: 3 additions & 1 deletion rpc/src/module/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ impl<CI: ChainIndex + 'static> PoolRpc for PoolRpcImpl<CI> {
fn send_transaction(&self, tx: Transaction) -> Result<H256> {
let tx: CoreTransaction = tx.into();
let tx_hash = tx.hash().clone();
self.shared.tx_pool().write().enqueue_tx(tx.clone());
let mut chain_state = self.shared.chain_state().lock();
let tx_pool = chain_state.mut_tx_pool();
tx_pool.enqueue_tx(tx.clone());

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
Expand Down
8 changes: 6 additions & 2 deletions rpc/src/module/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ impl<CI: ChainIndex + 'static> TraceRpc for TraceRpcImpl<CI> {
fn trace_transaction(&self, tx: Transaction) -> Result<H256> {
let tx: CoreTransaction = tx.into();
let tx_hash = tx.hash().clone();
self.shared.tx_pool().write().trace_tx(tx.clone());
let mut chain_state = self.shared.chain_state().lock();
let tx_pool = chain_state.mut_tx_pool();
tx_pool.trace_tx(tx.clone());

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
Expand All @@ -47,6 +49,8 @@ impl<CI: ChainIndex + 'static> TraceRpc for TraceRpcImpl<CI> {
}

fn get_transaction_trace(&self, hash: H256) -> Result<Option<Vec<TxTrace>>> {
Ok(self.shared.tx_pool().read().get_tx_traces(&hash).cloned())
let chain_state = self.shared.chain_state().lock();
let tx_pool = chain_state.tx_pool();
Ok(tx_pool.get_tx_traces(&hash).cloned())
}
}
Loading

0 comments on commit d51c197

Please sign in to comment.