Skip to content

Commit

Permalink
core/state: blocking prefetcher on term signal, parallel updates (eth…
Browse files Browse the repository at this point in the history
…ereum#29519)

* core/state: trie prefetcher change: calling trie() doesn't stop the associated subfetcher

Co-authored-by: Martin HS <[email protected]>
Co-authored-by: Péter Szilágyi <[email protected]>

* core/state: improve prefetcher

* core/state: restore async prefetcher stask scheduling

* core/state: finish prefetching async and process storage updates async

* core/state: don't use the prefetcher for missing snapshot items

* core/state: remove update concurrency for Verkle tries

* core/state: add some termination checks to prefetcher async shutdowns

* core/state: differentiate db tries and prefetched tries

* core/state: teh teh teh

---------

Co-authored-by: Jared Wasinger <[email protected]>
Co-authored-by: Martin HS <[email protected]>
Co-authored-by: Gary Rong <[email protected]>
  • Loading branch information
4 people authored and stwiname committed Sep 9, 2024
1 parent 3770905 commit 89fc224
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 297 deletions.
8 changes: 6 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,8 +1805,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
statedb.SetLogger(bc.logger)

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
}
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
Expand Down
97 changes: 71 additions & 26 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
Expand Down Expand Up @@ -118,27 +127,39 @@ func (s *stateObject) touch() {
}
}

// getTrie returns the associated storage trie. The trie will be opened
// if it's not loaded previously. An error will be returned if trie can't
// be loaded.
// getTrie returns the associated storage trie. The trie will be opened if it'
// not loaded previously. An error will be returned if trie can't be loaded.
//
// If a new trie is opened, it will be cached within the state object to allow
// subsequent reads to expand the same trie instead of reloading from disk.
func (s *stateObject) getTrie() (Trie, error) {
if s.trie == nil {
// Try fetching from prefetcher first
if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no prefetcher
s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
}
if s.trie == nil {
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
if err != nil {
return nil, err
}
s.trie = tr
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
if err != nil {
return nil, err
}
s.trie = tr
}
return s.trie, nil
}

// getPrefetchedTrie returns the associated trie, as populated by the prefetcher
// if it's available.
//
// Note, opposed to getTrie, this method will *NOT* blindly cache the resulting
// trie in the state object. The caller might want to do that, but it's cleaner
// to break the hidden interdependency between retrieving tries from the db or
// from the prefetcher.
func (s *stateObject) getPrefetchedTrie() (Trie, error) {
// If there's nothing to meaningfully return, let the user figure it out by
// pulling the trie from disk.
if s.data.Root == types.EmptyRootHash || s.db.prefetcher == nil {
return nil, nil
}
// Attempt to retrieve the trie from the pretecher
return s.db.prefetcher.trie(s.addrHash, s.data.Root)
}

// GetState retrieves a value from the account storage trie.
func (s *stateObject) GetState(key common.Hash) common.Hash {
value, _ := s.getState(key)
Expand Down Expand Up @@ -248,7 +269,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common

// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise(prefetch bool) {
func (s *stateObject) finalise() {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
// If the slot is different from its original value, move it into the
Expand All @@ -263,8 +284,10 @@ func (s *stateObject) finalise(prefetch bool) {
delete(s.pendingStorage, key)
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -283,25 +306,43 @@ func (s *stateObject) finalise(prefetch bool) {
// storage change at all.
func (s *stateObject) updateTrie() (Trie, error) {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false)
s.finalise()

// Short circuit if nothing changed, don't bother with hashing anything
if len(s.pendingStorage) == 0 {
return s.trie, nil
}
// Retrieve a pretecher populated trie, or fall back to the database
tr, err := s.getPrefetchedTrie()
switch {
case err != nil:
// Fetcher retrieval failed, something's very wrong, abort
s.db.setError(err)
return nil, err

case tr == nil:
// Fetcher not running or empty trie, fallback to the database trie
tr, err = s.getTrie()
if err != nil {
s.db.setError(err)
return nil, err
}

default:
// Prefetcher returned a live trie, swap it out for the current one
s.trie = tr
}
// The snapshot storage map for the object
var (
storage map[common.Hash][]byte
origin map[common.Hash][]byte
)
tr, err := s.getTrie()
if err != nil {
s.db.setError(err)
return nil, err
}
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
Expand Down Expand Up @@ -330,26 +371,30 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
khash := crypto.HashData(hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
Expand All @@ -369,7 +414,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
Expand Down
Loading

0 comments on commit 89fc224

Please sign in to comment.