Skip to content

Commit

Permalink
Merge pull request #2266 from blockstack/fix/net-slow-lock
Browse files Browse the repository at this point in the history
Fix/net slow lock
  • Loading branch information
jcnelson authored Jan 5, 2021
2 parents 6343af7 + 1d977c1 commit 404f87d
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 207 deletions.
6 changes: 6 additions & 0 deletions src/chainstate/stacks/db/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5109,6 +5109,12 @@ impl StacksChainState {
tx_size,
)
})
.map_err(|_| {
MemPoolRejection::NoSuchChainTip(
current_consensus_hash.clone(),
current_block.clone(),
)
})?
.expect("BUG: do not have unconfirmed state, despite being Some(..)")
} else {
Err(MemPoolRejection::BadNonces(mismatch_error))
Expand Down
26 changes: 12 additions & 14 deletions src/chainstate/stacks/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1621,30 +1621,32 @@ impl StacksChainState {
&mut self,
burn_dbconn: &dyn BurnStateDB,
to_do: F,
) -> Option<R>
) -> Result<Option<R>, Error>
where
F: FnOnce(&mut ClarityReadOnlyConnection) -> R,
{
if let Some(ref unconfirmed) = self.unconfirmed_state.as_ref() {
if !unconfirmed.is_readable() {
return None;
return Ok(None);
}
}

let mut unconfirmed_state_opt = self.unconfirmed_state.take();
let res = if let Some(ref mut unconfirmed_state) = unconfirmed_state_opt {
let mut conn = unconfirmed_state.clarity_inst.read_only_connection(
&unconfirmed_state.unconfirmed_chain_tip,
self.db(),
burn_dbconn,
);
let mut conn = unconfirmed_state
.clarity_inst
.read_only_connection_checked(
&unconfirmed_state.unconfirmed_chain_tip,
self.db(),
burn_dbconn,
)?;
let result = to_do(&mut conn);
Some(result)
} else {
None
};
self.unconfirmed_state = unconfirmed_state_opt;
res
Ok(res)
}

/// Run to_do on the unconfirmed Clarity VM state if the tip refers to the unconfirmed state;
Expand All @@ -1655,7 +1657,7 @@ impl StacksChainState {
burn_dbconn: &dyn BurnStateDB,
parent_tip: &StacksBlockId,
to_do: F,
) -> Option<R>
) -> Result<Option<R>, Error>
where
F: FnOnce(&mut ClarityReadOnlyConnection) -> R,
{
Expand All @@ -1669,7 +1671,7 @@ impl StacksChainState {
if unconfirmed {
self.with_read_only_unconfirmed_clarity_tx(burn_dbconn, to_do)
} else {
self.with_read_only_clarity_tx(burn_dbconn, parent_tip, to_do)
Ok(self.with_read_only_clarity_tx(burn_dbconn, parent_tip, to_do))
}
}

Expand Down Expand Up @@ -1708,8 +1710,6 @@ impl StacksChainState {
}

/// Open a Clarity transaction against this chainstate's unconfirmed state, if it exists.
/// This marks the unconfirmed chainstate as "dirty" so that no future queries against it can
/// happen.
pub fn begin_unconfirmed<'a>(
&'a mut self,
burn_dbconn: &'a dyn BurnStateDB,
Expand All @@ -1721,8 +1721,6 @@ impl StacksChainState {
return None;
}

unconfirmed.set_dirty(true);

Some(StacksChainState::chainstate_begin_unconfirmed(
conf,
self.state_index.sqlite_conn(),
Expand Down
68 changes: 62 additions & 6 deletions src/chainstate/stacks/db/unconfirmed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ use vm::database::NULL_HEADER_DB;

use vm::costs::ExecutionCost;

pub type UnconfirmedTxMap = HashMap<Txid, (StacksTransaction, BlockHeaderHash, u16)>;

pub struct UnconfirmedState {
pub confirmed_chain_tip: StacksBlockId,
pub unconfirmed_chain_tip: StacksBlockId,
pub clarity_inst: ClarityInstance,
pub mined_txs: HashMap<Txid, (StacksTransaction, BlockHeaderHash, u16)>,
pub mined_txs: UnconfirmedTxMap,
pub cost_so_far: ExecutionCost,
pub bytes_so_far: u64,

pub last_mblock: Option<StacksMicroblockHeader>,
pub last_mblock_seq: u16,

readonly: bool,
dirty: bool,
}

Expand All @@ -68,13 +71,40 @@ impl UnconfirmedState {
confirmed_chain_tip: tip,
unconfirmed_chain_tip: unconfirmed_tip,
clarity_inst: clarity_instance,
mined_txs: HashMap::new(),
mined_txs: UnconfirmedTxMap::new(),
cost_so_far: ExecutionCost::zero(),
bytes_so_far: 0,

last_mblock: None,
last_mblock_seq: 0,

readonly: false,
dirty: false,
})
}

/// Make a new unconfirmed state, but don't do anything with it yet, and deny refreshes.
fn new_readonly(
chainstate: &StacksChainState,
tip: StacksBlockId,
) -> Result<UnconfirmedState, Error> {
let marf = MarfedKV::open_unconfirmed(&chainstate.clarity_state_index_root, None)?;

let clarity_instance = ClarityInstance::new(marf, chainstate.block_limit.clone());
let unconfirmed_tip = MARF::make_unconfirmed_chain_tip(&tip);

Ok(UnconfirmedState {
confirmed_chain_tip: tip,
unconfirmed_chain_tip: unconfirmed_tip,
clarity_inst: clarity_instance,
mined_txs: UnconfirmedTxMap::new(),
cost_so_far: ExecutionCost::zero(),
bytes_so_far: 0,

last_mblock: None,
last_mblock_seq: 0,

readonly: true,
dirty: false,
})
}
Expand All @@ -91,7 +121,7 @@ impl UnconfirmedState {
mblocks: Vec<StacksMicroblock>,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
if self.last_mblock_seq == u16::max_value() {
// drop them
// drop them -- nothing to do
return Ok((0, 0, vec![]));
}

Expand All @@ -108,7 +138,7 @@ impl UnconfirmedState {
let mut total_fees = 0;
let mut total_burns = 0;
let mut all_receipts = vec![];
let mut mined_txs = HashMap::new();
let mut mined_txs = UnconfirmedTxMap::new();
let new_cost;
let mut new_bytes = 0;

Expand Down Expand Up @@ -195,7 +225,7 @@ impl UnconfirmedState {
Ok((total_fees, total_burns, all_receipts))
}

/// Load up Stacks microblock stream to process
/// Load up the Stacks microblock stream to process, composed of only the new microblocks
fn load_child_microblocks(
&self,
chainstate: &StacksChainState,
Expand All @@ -222,6 +252,11 @@ impl UnconfirmedState {
chainstate: &StacksChainState,
burn_dbconn: &dyn BurnStateDB,
) -> Result<(u128, u128, Vec<StacksTransactionReceipt>), Error> {
assert!(
!self.readonly,
"BUG: code tried to write unconfirmed state to a read-only instance"
);

if self.last_mblock_seq == u16::max_value() {
// no-op
return Ok((0, 0, vec![]));
Expand All @@ -235,7 +270,7 @@ impl UnconfirmedState {

/// Is there any state to read?
pub fn is_readable(&self) -> bool {
self.has_data() && !self.dirty
(self.has_data() || self.readonly) && !self.dirty
}

/// Can we write to this unconfirmed state?
Expand Down Expand Up @@ -395,6 +430,25 @@ impl StacksChainState {
res
}

/// Instantiate a read-only view of unconfirmed state.
/// Use from a dedicated chainstate handle that will only do read-only operations on it (such
/// as the p2p network thread)
pub fn refresh_unconfirmed_readonly(
&mut self,
canonical_tip: StacksBlockId,
) -> Result<(), Error> {
if let Some(ref unconfirmed) = self.unconfirmed_state {
assert!(
unconfirmed.readonly,
"BUG: tried to replace a read/write unconfirmed state instance"
);
}

let unconfirmed = UnconfirmedState::new_readonly(self, canonical_tip)?;
self.unconfirmed_state = Some(unconfirmed);
Ok(())
}

pub fn set_unconfirmed_dirty(&mut self, dirty: bool) {
if let Some(ref mut unconfirmed) = self.unconfirmed_state.as_mut() {
unconfirmed.dirty = dirty;
Expand Down Expand Up @@ -632,6 +686,7 @@ mod test {
clarity_db.get_account_stx_balance(&recv_addr.into())
})
})
.unwrap()
.unwrap();
peer.sortdb = Some(sortdb);

Expand Down Expand Up @@ -862,6 +917,7 @@ mod test {
clarity_db.get_account_stx_balance(&recv_addr.into())
})
})
.unwrap()
.unwrap();
peer.sortdb = Some(sortdb);

Expand Down
4 changes: 2 additions & 2 deletions src/chainstate/stacks/index/marf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2653,7 +2653,7 @@ mod test {
m.commit().unwrap();
let flush_end_time = get_epoch_time_ms();

debug!(
eprintln!(
"Inserted {} in {} (1 insert = {} ms). Processed {} keys in {} ms (flush = {} ms)",
i,
end_time - start_time,
Expand Down Expand Up @@ -2703,7 +2703,7 @@ mod test {

end_time = get_epoch_time_ms();

debug!(
eprintln!(
"Got {} in {} (1 get = {} ms)",
i,
end_time - start_time,
Expand Down
48 changes: 6 additions & 42 deletions src/net/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3939,9 +3939,7 @@ impl PeerNetwork {
// update burnchain snapshot if we need to (careful -- it's expensive)
let sn = SortitionDB::get_canonical_burn_chain_tip(&sortdb.conn())?;
let mut ret: HashMap<NeighborKey, Vec<StacksMessage>> = HashMap::new();
if sn.block_height > self.chain_view.burn_block_height
|| sn.burn_header_hash != self.antientropy_last_burnchain_tip
{
if sn.block_height > self.chain_view.burn_block_height {
debug!(
"{:?}: load chain view for burn block {}",
&self.local_peer, sn.block_height
Expand All @@ -3955,7 +3953,9 @@ impl PeerNetwork {
self.hint_sync_invs();
self.hint_download_rescan();
self.chain_view = new_chain_view;
}

if sn.burn_header_hash != self.antientropy_last_burnchain_tip {
// try processing previously-buffered messages (best-effort)
let buffered_messages = mem::replace(&mut self.pending_messages, HashMap::new());
ret = self.handle_unsolicited_messages(sortdb, chainstate, buffered_messages, false)?;
Expand Down Expand Up @@ -4104,26 +4104,6 @@ impl PeerNetwork {
Ok(())
}

/// Set up the unconfirmed chain state off of the canonical chain tip.
pub fn setup_unconfirmed_state(
chainstate: &mut StacksChainState,
sortdb: &SortitionDB,
) -> Result<(), Error> {
let (canonical_consensus_hash, canonical_block_hash) =
SortitionDB::get_canonical_stacks_chain_tip_hash(sortdb.conn())?;
let canonical_tip = StacksBlockHeader::make_index_block_hash(
&canonical_consensus_hash,
&canonical_block_hash,
);
// setup unconfirmed state off of this tip
debug!(
"Reload unconfirmed state off of {}/{}",
&canonical_consensus_hash, &canonical_block_hash
);
chainstate.reload_unconfirmed_state(&sortdb.index_conn(), canonical_tip)?;
Ok(())
}

/// Store a single transaction
/// Return true if stored; false if it was a dup.
/// Has to be done here, since only the p2p network has the unconfirmed state.
Expand Down Expand Up @@ -4153,7 +4133,7 @@ impl PeerNetwork {

/// Store all inbound transactions, and return the ones that we actually stored so they can be
/// relayed.
fn store_transactions(
pub fn store_transactions(
mempool: &mut MemPoolDB,
chainstate: &mut StacksChainState,
sortdb: &SortitionDB,
Expand Down Expand Up @@ -4184,6 +4164,8 @@ impl PeerNetwork {
}
}

// (HTTP-uploaded transactions are already in the mempool)

network_result.pushed_transactions.extend(ret);
Ok(())
}
Expand Down Expand Up @@ -4269,24 +4251,6 @@ impl PeerNetwork {
p2p_poll_state,
)?;

if let Err(e) =
PeerNetwork::store_transactions(mempool, chainstate, sortdb, &mut network_result)
{
warn!("Failed to store transactions: {:?}", &e);
}

if let Err(e) = PeerNetwork::setup_unconfirmed_state(chainstate, sortdb) {
if let net_error::ChainstateError(ref err_msg) = e {
if err_msg == "Stacks chainstate error: NoSuchBlockError" {
trace!("Failed to instantiate unconfirmed state: {:?}", &e);
} else {
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
}
} else {
warn!("Failed to instantiate unconfirmed state: {:?}", &e);
}
}

debug!("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< End Network Dispatch <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
Ok(network_result)
}
Expand Down
Loading

0 comments on commit 404f87d

Please sign in to comment.