Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state: parallelise parts of state commit #29681

Merged
merged 4 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,7 +1963,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)

return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil
Expand Down
3 changes: 3 additions & 0 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() {
// commit obtains a set of dirty storage trie nodes and updates the account data.
// The returned set can be nil if nothing to commit. This function assumes all
// storage mutations have already been flushed into trie by updateRoot.
//
// Note, commit may run concurrently across all the state objects. Do not assume
// thread-safe access to the statedb.
func (s *stateObject) commit() (*trienode.NodeSet, error) {
// Short circuit if trie is not even loaded, don't bother with committing anything
if s.trie == nil {
Expand Down
126 changes: 85 additions & 41 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"slices"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate"
"github.com/holiman/uint256"
"golang.org/x/sync/errgroup"
)

type revision struct {
Expand Down Expand Up @@ -1146,66 +1148,108 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
storageTrieNodesUpdated int
storageTrieNodesDeleted int
nodes = trienode.NewMergedNodeSet()
codeWriter = s.db.DiskDB().NewBatch()
)
// Handle all state deletions first
if err := s.handleDestruction(nodes); err != nil {
return common.Hash{}, err
}
// Handle all state updates afterwards
// Handle all state updates afterwards, concurrently to one another to shave
// off some milliseconds from the commit operation. Also accumulate the code
// writes to run in parallel with the computations.
start := time.Now()
var (
code = s.db.DiskDB().NewBatch()
lock sync.Mutex
root common.Hash
workers errgroup.Group
)
// Schedule the account trie first since that will be the biggest, so give
// it the most time to crunch.
//
// TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain
// heads, which seems excessive given that it doesn't do hashing, it just
// shuffles some data. For comparison, the *hashing* at chain head is 2-3ms.
// We need to investigate what's happening as it seems something's wonky.
// Obviously it's not an end of the world issue, just something the original
// code didn't anticipate for.
workers.Go(func() error {
// Write the account trie changes, measuring the amount of wasted time
newroot, set, err := s.trie.Commit(true)
if err != nil {
return err
}
root = newroot

// Merge the dirty nodes of account trie into global set
lock.Lock()
defer lock.Unlock()

if set != nil {
if err = nodes.Merge(set); err != nil {
return err
}
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
s.AccountCommits = time.Since(start)
karalabe marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
// Schedule each of the storage tries that need to be updated, so they can
// run concurrently to one another.
//
// TODO(karalabe): Experimentally, the account commit takes approximately the
// same time as all the storage commits combined, so we could maybe only have
// 2 threads in total. But that kind of depends on the account commit being
// more expensive than it should be, so let's fix that and revisit this todo.
for addr, op := range s.mutations {
if op.isDelete() {
continue
}
obj := s.stateObjects[addr]

// Write any contract code associated with the state object
obj := s.stateObjects[addr]
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
}
// Write any storage changes in the state object to its storage trie
set, err := obj.commit()
if err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of storage trie into global set. It is possible
// that the account was destructed and then resurrected in the same block.
// In this case, the node set is shared by both accounts.
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
// Run the storage updates concurrently to one another
workers.Go(func() error {
// Write any storage changes in the state object to its storage trie
set, err := obj.commit()
if err != nil {
return err
}
updates, deleted := set.Size()
storageTrieNodesUpdated += updates
storageTrieNodesDeleted += deleted
}
// Merge the dirty nodes of storage trie into global set. It is possible
// that the account was destructed and then resurrected in the same block.
// In this case, the node set is shared by both accounts.
lock.Lock()
defer lock.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick note, this locking scheme here will definitely affect performance. You're spawning concurrent work but it'll all have to synchronize after. It would be better to pre-allocate a slice of suitable size to track the output sets (or even have it as a field in StateObject), then perform the merge operation and counter updates after all commits are done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I were to wait until all results are in and then merge, then the merge wound't proceed concurrnetly with the disk reads. IMO that would be worse.

I could make a channel and stream the results out and have an outer goroutine listen and merge one by one, but that would just end up being the same as locking and allowing access one by one via the mutex.

Don't really see why it would be different.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel way would also work, and the channel could even be buffered. Writes on buffered channels are fast even under contention AFAIK, so could be an idea.


if set != nil {
if err = nodes.Merge(set); err != nil {
return err
}
updates, deleted := set.Size()
storageTrieNodesUpdated += updates
storageTrieNodesDeleted += deleted
}
s.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime
return nil
})
}
s.StorageCommits += time.Since(start)

if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
// Schedule the code commits to run concurrently too. This shouldn't really
// take much since we don't often commit code, but since it's disk access,
// it's always yolo.
workers.Go(func() error {
if code.ValueSize() > 0 {
if err := code.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
}
// Write the account trie changes, measuring the amount of wasted time
start = time.Now()

root, set, err := s.trie.Commit(true)
if err != nil {
return nil
})
// Wait for everything to finish and update the metrics
if err := workers.Wait(); err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of account trie into global set
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
}
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
// Report the commit metrics
s.AccountCommits += time.Since(start)

accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted))
Expand Down
Loading