Skip to content

Commit

Permalink
perf(provider): compute hashes and trie updates before opening write …
Browse files Browse the repository at this point in the history
…tx (#5505)
  • Loading branch information
rkrasiuk authored Nov 28, 2023
1 parent 98250f8 commit 608f100
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 30 deletions.
17 changes: 16 additions & 1 deletion bin/reth/src/debug_cmd/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,26 @@ impl Command {
let state = executor.take_output_state();
debug!(target: "reth::cli", ?state, "Executed block");

let hashed_state = state.hash_state_slow();
let (state_root, trie_updates) = state
.state_root_calculator(provider_factory.provider()?.tx_ref(), &hashed_state)
.root_with_updates()?;

if state_root != block_with_senders.state_root {
eyre::bail!(
"state root mismatch. expected: {}. got: {}",
block_with_senders.state_root,
state_root
);
}

// Attempt to insert new block without committing
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_bundle_state(
provider_rw.append_blocks_with_state(
Vec::from([block_with_senders]),
state,
hashed_state,
trie_updates,
None,
)?;
info!(target: "reth::cli", "Successfully appended built block");
Expand Down
38 changes: 30 additions & 8 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@ use crate::{
state::{BlockChainId, TreeState},
AppendableChain, BlockIndices, BlockchainTreeConfig, BundleStateData, TreeExternals,
};
use reth_db::database::Database;
use reth_db::{database::Database, DatabaseError};
use reth_interfaces::{
blockchain_tree::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, CanonicalOutcome, InsertPayloadOk,
},
consensus::{Consensus, ConsensusError},
executor::{BlockExecutionError, BlockValidationError},
RethResult,
provider::RootMismatch,
RethError, RethResult,
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Hardfork, PruneModes, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, U256,
BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider, ProviderError,
};
use reth_stages::{MetricEvent, MetricEventsSender};
use std::{collections::BTreeMap, sync::Arc};
Expand Down Expand Up @@ -1104,14 +1105,35 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {

/// Write the given chain to the database as canonical.
fn commit_canonical_to_database(&self, chain: Chain) -> RethResult<()> {
let provider_rw = self.externals.provider_factory.provider_rw()?;
// Compute state root before opening write transaction.
let hashed_state = chain.state().hash_state_slow();
let (state_root, trie_updates) = chain
.state()
.state_root_calculator(
self.externals.provider_factory.provider()?.tx_ref(),
&hashed_state,
)
.root_with_updates()
.map_err(Into::<DatabaseError>::into)?;
let tip = chain.tip();
if state_root != tip.state_root {
return Err(RethError::Provider(ProviderError::StateRootMismatch(Box::new(
RootMismatch {
root: GotExpected { got: state_root, expected: tip.state_root },
block_number: tip.number,
block_hash: tip.hash,
},
))))
}

let (blocks, state) = chain.into_inner();

let provider_rw = self.externals.provider_factory.provider_rw()?;
provider_rw
.append_blocks_with_bundle_state(
.append_blocks_with_state(
blocks.into_blocks().collect(),
state,
hashed_state,
trie_updates,
self.prune_modes.as_ref(),
)
.map_err(|e| BlockExecutionError::CanonicalCommit { inner: e.to_string() })?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,14 @@ impl BundleStateWithReceipts {
///
/// The hashed post state.
pub fn hash_state_slow(&self) -> HashedPostState {
//let mut storages = BTreeMap::default();
let mut hashed_state = HashedPostState::default();

for (address, account) in self.bundle.state() {
let hashed_address = keccak256(address);
if let Some(account) = &account.info {
hashed_state.insert_account(hashed_address, into_reth_acc(account.clone()))
} else {
hashed_state.insert_cleared_account(hashed_address);
hashed_state.insert_destroyed_account(hashed_address);
}

// insert storage.
Expand All @@ -155,8 +154,8 @@ impl BundleStateWithReceipts {
hashed_state.sorted()
}

/// Returns [StateRoot] calculator.
fn state_root_calculator<'a, 'b, TX: DbTx>(
/// Returns [StateRoot] calculator based on database and in-memory state.
pub fn state_root_calculator<'a, 'b, TX: DbTx>(
&self,
tx: &'a TX,
hashed_post_state: &'b HashedPostState,
Expand All @@ -167,6 +166,7 @@ impl BundleStateWithReceipts {
.with_hashed_cursor_factory(hashed_cursor_factory)
.with_changed_account_prefixes(account_prefix_set)
.with_changed_storage_prefixes(storage_prefix_set)
.with_destroyed_accounts(hashed_post_state.destroyed_accounts())
}

/// Calculate the state root for this [BundleState].
Expand Down
128 changes: 128 additions & 0 deletions crates/storage/provider/src/bundle_state/hashed_state_changes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_primitives::{Account, StorageEntry, B256, U256};
use reth_trie::hashed_cursor::HashedPostState;
use std::collections::BTreeMap;

/// Changes to the hashed state.
#[derive(Debug, Default)]
pub struct HashedStateChanges(pub HashedPostState);

impl HashedStateChanges {
/// Write the bundle state to the database.
pub fn write_to_db<TX: DbTxMut + DbTx>(self, tx: &TX) -> Result<(), DatabaseError> {
// Collect hashed account changes.
let mut hashed_accounts = BTreeMap::<B256, Option<Account>>::default();
for (hashed_address, account) in self.0.accounts() {
hashed_accounts.insert(hashed_address, account);
}

// Write hashed account updates.
let mut hashed_accounts_cursor = tx.cursor_write::<tables::HashedAccount>()?;
for (hashed_address, account) in hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}

// Collect hashed storage changes.
let mut hashed_storages = BTreeMap::<B256, (bool, BTreeMap<B256, U256>)>::default();
for (hashed_address, storage) in self.0.storages() {
let entry = hashed_storages.entry(*hashed_address).or_default();
entry.0 |= storage.wiped();
for (hashed_slot, value) in storage.storage_slots() {
entry.1.insert(hashed_slot, value);
}
}

// Write hashed storage changes.
let mut hashed_storage_cursor = tx.cursor_dup_write::<tables::HashedStorage>()?;
for (hashed_address, (wiped, storage)) in hashed_storages {
if wiped && hashed_storage_cursor.seek_exact(hashed_address)?.is_some() {
hashed_storage_cursor.delete_current_duplicates()?;
}

for (hashed_slot, value) in storage {
let entry = StorageEntry { key: hashed_slot, value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(hashed_address, entry.key)?
{
if db_entry.key == entry.key {
hashed_storage_cursor.delete_current()?;
}
}

if entry.value != U256::ZERO {
hashed_storage_cursor.upsert(hashed_address, entry)?;
}
}
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::create_test_provider_factory;
use reth_primitives::{keccak256, Address};
use reth_trie::hashed_cursor::HashedStorage;

#[test]
fn wiped_entries_are_removed() {
let provider_factory = create_test_provider_factory();

let addresses = (0..10).map(|_| Address::random()).collect::<Vec<_>>();
let destroyed_address = *addresses.first().unwrap();
let destroyed_address_hashed = keccak256(destroyed_address);
let slot = B256::with_last_byte(1);
let hashed_slot = keccak256(slot);
{
let provider_rw = provider_factory.provider_rw().unwrap();
let mut accounts_cursor =
provider_rw.tx_ref().cursor_write::<tables::HashedAccount>().unwrap();
let mut storage_cursor =
provider_rw.tx_ref().cursor_write::<tables::HashedStorage>().unwrap();

for address in addresses {
let hashed_address = keccak256(address);
accounts_cursor
.insert(hashed_address, Account { nonce: 1, ..Default::default() })
.unwrap();
storage_cursor
.insert(hashed_address, StorageEntry { key: hashed_slot, value: U256::from(1) })
.unwrap();
}
provider_rw.commit().unwrap();
}

let mut hashed_state = HashedPostState::default();
hashed_state.insert_destroyed_account(destroyed_address_hashed);
hashed_state.insert_hashed_storage(destroyed_address_hashed, HashedStorage::new(true));

let provider_rw = provider_factory.provider_rw().unwrap();
assert_eq!(HashedStateChanges(hashed_state).write_to_db(provider_rw.tx_ref()), Ok(()));
provider_rw.commit().unwrap();

let provider = provider_factory.provider().unwrap();
assert_eq!(
provider.tx_ref().get::<tables::HashedAccount>(destroyed_address_hashed),
Ok(None)
);
assert_eq!(
provider
.tx_ref()
.cursor_read::<tables::HashedStorage>()
.unwrap()
.seek_by_key_subkey(destroyed_address_hashed, hashed_slot),
Ok(None)
);
}
}
2 changes: 2 additions & 0 deletions crates/storage/provider/src/bundle_state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! Bundle state module.
//! This module contains all the logic related to bundle state.
mod bundle_state_with_receipts;
mod hashed_state_changes;
mod state_changes;
mod state_reverts;

pub use bundle_state_with_receipts::{
AccountRevertInit, BundleStateInit, BundleStateWithReceipts, OriginalValuesKnown, RevertsInit,
};
pub use hashed_state_changes::HashedStateChanges;
pub use state_changes::StateChanges;
pub use state_reverts::StateReverts;
18 changes: 12 additions & 6 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit},
bundle_state::{BundleStateInit, BundleStateWithReceipts, HashedStateChanges, RevertsInit},
providers::{database::metrics, SnapshotProvider},
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
Expand Down Expand Up @@ -43,7 +43,9 @@ use reth_primitives::{
TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, B256, U256,
};
use reth_trie::{prefix_set::PrefixSetMut, StateRoot};
use reth_trie::{
hashed_cursor::HashedPostState, prefix_set::PrefixSetMut, updates::TrieUpdates, StateRoot,
};
use revm::primitives::{BlockEnv, CfgEnv, SpecId};
use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
Expand Down Expand Up @@ -2287,10 +2289,12 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
Ok(block_indices)
}

fn append_blocks_with_bundle_state(
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: BundleStateWithReceipts,
hashed_state: HashedPostState,
trie_updates: TrieUpdates,
prune_modes: Option<&PruneModes>,
) -> ProviderResult<()> {
if blocks.is_empty() {
Expand All @@ -2303,8 +2307,6 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {

let last = blocks.last().unwrap();
let last_block_number = last.number;
let last_block_hash = last.hash();
let expected_state_root = last.state_root;

let mut durations_recorder = metrics::DurationsRecorder::default();

Expand All @@ -2320,7 +2322,11 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
state.write_to_db(self.tx_ref(), OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);

self.insert_hashes(first_number..=last_block_number, last_block_hash, expected_state_root)?;
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(&self.tx)?;
trie_updates.flush(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);

self.update_history_indices(first_number..=last_block_number)?;
Expand Down
5 changes: 4 additions & 1 deletion crates/storage/provider/src/traits/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use reth_primitives::{
ChainSpec, Header, PruneModes, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader,
B256,
};
use reth_trie::{hashed_cursor::HashedPostState, updates::TrieUpdates};
use std::ops::RangeInclusive;

/// Enum to control transaction hash inclusion.
Expand Down Expand Up @@ -291,10 +292,12 @@ pub trait BlockWriter: Send + Sync {
/// # Returns
///
/// Returns `Ok(())` on success, or an error if any operation fails.
fn append_blocks_with_bundle_state(
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: BundleStateWithReceipts,
hashed_state: HashedPostState,
trie_updates: TrieUpdates,
prune_modes: Option<&PruneModes>,
) -> ProviderResult<()>;
}
Loading

0 comments on commit 608f100

Please sign in to comment.