Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop calculating transaction hashes twice in the checkpoint verifier #2696

Merged
merged 3 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions zebra-consensus/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use zebra_chain::{
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
work::equihash,
};
use zebra_state as zs;
use zebra_state::{self as zs, FinalizedBlock};

use crate::{block::VerifyBlockError, error::BlockError, BoxError};

Expand All @@ -49,14 +49,21 @@ use types::{TargetHeight, TargetHeight::*};
/// An unverified block, which is in the queue for checkpoint verification.
#[derive(Debug)]
struct QueuedBlock {
/// The block data.
block: Arc<Block>,
/// `block`'s cached header hash.
hash: block::Hash,
/// The block, with additional precalculated data.
block: FinalizedBlock,
/// The transmitting end of the oneshot channel for this block's result.
tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
}

/// The unverified block, with a receiver for the [`QueuedBlock`]'s result.
#[derive(Debug)]
struct RequestBlock {
/// The block, with additional precalculated data.
block: FinalizedBlock,
/// The receiving end of the oneshot channel for this block's result.
rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
}

/// A list of unverified blocks at a particular height.
///
/// Typically contains a single block, but might contain more if a peer
Expand Down Expand Up @@ -467,6 +474,8 @@ where

/// Check that the block height, proof of work, and Merkle root are valid.
///
/// Returns a [`FinalizedBlock`] with precalculated block data.
///
/// ## Security
///
/// Checking the proof of work makes resource exhaustion attacks harder to
Expand All @@ -475,7 +484,7 @@ where
/// Checking the Merkle root ensures that the block hash binds the block
/// contents. To prevent malleability (CVE-2012-2459), we also need to check
/// whether the transaction hashes are unique.
fn check_block(&self, block: &Block) -> Result<block::Height, VerifyCheckpointError> {
fn check_block(&self, block: Arc<Block>) -> Result<FinalizedBlock, VerifyCheckpointError> {
let hash = block.hash();
let height = block
.coinbase_height()
Expand All @@ -485,40 +494,42 @@ where
crate::block::check::difficulty_is_valid(&block.header, self.network, &height, &hash)?;
crate::block::check::equihash_solution_is_valid(&block.header)?;

let transaction_hashes = block
.transactions
.iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
// don't do precalculation until the block passes basic difficulty checks
let block = FinalizedBlock::with_hash_and_height(block, hash, height);

crate::block::check::merkle_root_validity(self.network, block, &transaction_hashes)?;
crate::block::check::merkle_root_validity(
self.network,
&block.block,
&block.transaction_hashes,
)?;

Ok(height)
Ok(block)
}

/// Queue `block` for verification, and return the `Receiver` for the
/// block's verification result.
/// Queue `block` for verification.
///
/// On success, returns a [`RequestBlock`] containing the block,
/// precalculated request data, and the queued result receiver.
///
/// Verification will finish when the chain to the next checkpoint is
/// complete, and the caller will be notified via the channel.
///
/// If the block does not have a coinbase height, sends an error on `tx`,
/// and does not queue the block.
fn queue_block(
&mut self,
block: Arc<Block>,
) -> oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>> {
/// If the block does not pass basic validity checks,
/// returns an error immediately.
fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
// Set up a oneshot channel to send results
let (tx, rx) = oneshot::channel();

// Check that the height and Merkle roots are valid.
let height = match self.check_block(&block) {
Ok(height) => height,
Err(error) => {
tx.send(Err(error)).expect("rx has not been dropped yet");
return rx;
}
let block = self.check_block(block)?;
let height = block.height;
let hash = block.hash;

let new_qblock = QueuedBlock {
block: block.clone(),
tx,
};
let req_block = RequestBlock { block, rx };

// Since we're using Arc<Block>, each entry is a single pointer to the
// Arc. But there are a lot of QueuedBlockLists in the queue, so we keep
Expand All @@ -528,29 +539,25 @@ where
.entry(height)
.or_insert_with(|| QueuedBlockList::with_capacity(1));

let hash = block.hash();

// Replace older requests by newer ones by swapping the oneshot.
for qb in qblocks.iter_mut() {
if qb.hash == hash {
if qb.block.hash == hash {
let e = VerifyCheckpointError::NewerRequest { height, hash };
tracing::trace!(?e, "failing older of duplicate requests");
let old_tx = std::mem::replace(&mut qb.tx, tx);
let old_tx = std::mem::replace(&mut qb.tx, new_qblock.tx);
let _ = old_tx.send(Err(e));
return rx;
return Ok(req_block);
}
}

// Memory DoS resistance: limit the queued blocks at each height
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
let e = VerifyCheckpointError::QueuedLimit;
tracing::warn!(?e);
let _ = tx.send(Err(e));
return rx;
return Err(e);
}

// Add the block to the list of queued blocks at this height
let new_qblock = QueuedBlock { block, hash, tx };
// This is a no-op for the first block in each QueuedBlockList.
qblocks.reserve_exact(1);
qblocks.push(new_qblock);
Expand All @@ -567,7 +574,7 @@ where
let is_checkpoint = self.checkpoint_list.contains(height);
tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");

rx
Ok(req_block)
}

/// During checkpoint range processing, process all the blocks at `height`.
Expand Down Expand Up @@ -606,21 +613,21 @@ where
// If there are any side-chain blocks, they fail validation.
let mut valid_qblock = None;
for qblock in qblocks.drain(..) {
if qblock.hash == expected_hash {
if qblock.block.hash == expected_hash {
if valid_qblock.is_none() {
// The first valid block at the current height
valid_qblock = Some(qblock);
} else {
unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
height, qblock.hash);
height, qblock.block.hash);
}
} else {
tracing::info!(?height, ?qblock.hash, ?expected_hash,
tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
"Side chain hash at height in CheckpointVerifier");
let _ = qblock
.tx
.send(Err(VerifyCheckpointError::UnexpectedSideChain {
found: qblock.hash,
found: qblock.block.hash,
expected: expected_hash,
}));
}
Expand Down Expand Up @@ -693,7 +700,7 @@ where
for current_height in range_heights {
let valid_qblock = self.process_height(current_height, expected_hash);
if let Some(qblock) = valid_qblock {
expected_hash = qblock.block.header.previous_block_hash;
expected_hash = qblock.block.block.header.previous_block_hash;
// Add the block to the end of the pending block list
// (since we're walking the chain backwards, the list is
// in reverse chain order)
Expand All @@ -714,11 +721,10 @@ where
// The order here shouldn't matter, but add the blocks in
// height order, for consistency.
for vblock in rev_valid_blocks.drain(..).rev() {
let height = vblock
.block
.coinbase_height()
.expect("queued blocks have a block height");
self.queued.entry(height).or_default().push(vblock);
self.queued
.entry(vblock.block.height)
.or_default()
.push(vblock);
}

// Make sure the current progress hasn't changed
Expand Down Expand Up @@ -757,7 +763,7 @@ where
// in height order.
for qblock in rev_valid_blocks.drain(..).rev() {
// Sending can fail, but there's nothing we can do about it.
let _ = qblock.tx.send(Ok(qblock.hash));
let _ = qblock.tx.send(Ok(qblock.block.hash));
}

// Finally, update the checkpoint bounds
Expand Down Expand Up @@ -916,7 +922,11 @@ where
return async { Err(VerifyCheckpointError::Finished) }.boxed();
}

let rx = self.queue_block(block.clone());
let req_block = match self.queue_block(block) {
Ok(req_block) => req_block,
Err(e) => return async { Err(e) }.boxed(),
};

self.process_checkpoint_range();

metrics::gauge!("checkpoint.queued_slots", self.queued.len() as f64);
Expand All @@ -929,6 +939,8 @@ where
// verifier to reject blocks not already in the state as
// already-verified.
//
// # Dropped Receivers
//
// To commit blocks transactionally on a per-checkpoint basis, we must
// commit all verified blocks in a checkpoint range, regardless of
// whether or not the response futures for each block were dropped.
Expand All @@ -937,9 +949,16 @@ where
// commit-if-verified logic. This task will always execute, except if
// the program is interrupted, in which case there is no longer a
// checkpoint verifier to keep in sync with the state.
//
// # State Commit Failures
//
// If the state commit fails due to corrupt block data,
// we don't reject the entire checkpoint.
// Instead, we reset the verifier to the successfully committed state tip.
let state_service = self.state_service.clone();
let commit_finalized_block = tokio::spawn(async move {
let hash = rx
let hash = req_block
.rx
.await
.map_err(Into::into)
.map_err(VerifyCheckpointError::CommitFinalized)
Expand All @@ -948,7 +967,7 @@ where
// We use a `ServiceExt::oneshot`, so that every state service
// `poll_ready` has a corresponding `call`. See #1593.
match state_service
.oneshot(zs::Request::CommitFinalizedBlock(block.into()))
.oneshot(zs::Request::CommitFinalizedBlock(req_block.block))
.map_err(VerifyCheckpointError::CommitFinalized)
.await?
{
Expand Down
66 changes: 50 additions & 16 deletions zebra-state/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ pub struct PreparedBlock {
/// be unspent, since a later transaction in a block can spend outputs of an
/// earlier transaction.
pub new_outputs: HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
/// A precomputed list of the hashes of the transactions in this block.
/// A precomputed list of the hashes of the transactions in this block,
/// in the same order as `block.transactions`.
pub transaction_hashes: Arc<[transaction::Hash]>,
}

// Some fields are pub(crate), so we can add whatever db-format-dependent
// precomputation we want here without leaking internal details.

/// A contextually validated block, ready to be committed directly to the finalized state with
/// no checks, if it becomes the root of the best non-finalized chain.
///
Expand All @@ -104,13 +108,26 @@ pub struct ContextuallyValidBlock {
/// This is exposed for use in checkpointing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FinalizedBlock {
// These are pub(crate) so we can add whatever db-format-dependent
// precomputation we want here without leaking internal details.
pub(crate) block: Arc<Block>,
pub(crate) hash: block::Hash,
pub(crate) height: block::Height,
/// The block to commit to the state.
pub block: Arc<Block>,
/// The hash of the block.
pub hash: block::Hash,
/// The height of the block.
pub height: block::Height,
/// New transparent outputs created in this block, indexed by
/// [`Outpoint`](transparent::Outpoint).
///
/// Each output is tagged with its transaction index in the block.
/// (The outputs of earlier transactions in a block can be spent by later
/// transactions.)
///
/// Note: although these transparent outputs are newly created, they may not
/// be unspent, since a later transaction in a block can spend outputs of an
/// earlier transaction.
pub(crate) new_outputs: HashMap<transparent::OutPoint, transparent::Utxo>,
pub(crate) transaction_hashes: Arc<[transaction::Hash]>,
/// A precomputed list of the hashes of the transactions in this block,
/// in the same order as `block.transactions`.
pub transaction_hashes: Arc<[transaction::Hash]>,
}

impl From<&PreparedBlock> for PreparedBlock {
Expand All @@ -119,6 +136,10 @@ impl From<&PreparedBlock> for PreparedBlock {
}
}

// Doing precomputation in these impls means that it will be done in
// the *service caller*'s task, not inside the service call itself.
// This allows moving work out of the single-threaded state service.

impl ContextuallyValidBlock {
/// Create a block that's ready for non-finalized [`Chain`] contextual validation,
/// using a [`PreparedBlock`] and the UTXOs it spends.
Expand Down Expand Up @@ -156,15 +177,17 @@ impl ContextuallyValidBlock {
}
}

// Doing precomputation in this From impl means that it will be done in
// the *service caller*'s task, not inside the service call itself.
// This allows moving work out of the single-threaded state service.
impl From<Arc<Block>> for FinalizedBlock {
fn from(block: Arc<Block>) -> Self {
let height = block
.coinbase_height()
.expect("finalized blocks must have a valid coinbase height");
let hash = block.hash();
impl FinalizedBlock {
/// Create a block that's ready to be committed to the finalized state,
/// using a precalculated [`block::Hash`] and [`block::Height`].
///
/// Note: a [`FinalizedBlock`] isn't actually finalized
/// until [`Request::CommitFinalizedBlock`] returns success.
pub fn with_hash_and_height(
block: Arc<Block>,
hash: block::Hash,
height: block::Height,
) -> Self {
let transaction_hashes: Arc<[_]> = block.transactions.iter().map(|tx| tx.hash()).collect();
let new_outputs = transparent::new_outputs(&block, &transaction_hashes);

Expand All @@ -178,6 +201,17 @@ impl From<Arc<Block>> for FinalizedBlock {
}
}

impl From<Arc<Block>> for FinalizedBlock {
fn from(block: Arc<Block>) -> Self {
let hash = block.hash();
let height = block
.coinbase_height()
.expect("finalized blocks must have a valid coinbase height");

FinalizedBlock::with_hash_and_height(block, hash, height)
}
}

impl From<ContextuallyValidBlock> for FinalizedBlock {
fn from(contextually_valid: ContextuallyValidBlock) -> Self {
let ContextuallyValidBlock {
Expand Down