trie: remove inconsistent trie nodes during sync in path mode (#28595)
This fixes a database corruption issue that could occur during state healing.
When a sync is aborted after certain modifications have already been committed and a reorg then occurs, the database can end up containing incorrect trie nodes stored by path.
These nodes need to be detected and deleted in order to obtain a complete and fully correct state after state healing.
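
The core of the fix is a presence check that distinguishes a node that is already correct from one that is present by path but stale. Below is a minimal, self-contained Go sketch of that idea; the `pathDB` map and `nodeHash` helper are illustrative stand-ins rather than the go-ethereum API, but the exists/inconsistent split mirrors the `hasNode` helper added in this diff.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// pathDB is a stand-in for a path-based node store: trie path -> node blob.
type pathDB map[string][]byte

// nodeHash stands in for the keccak256 node hash used by the real trie.
func nodeHash(blob []byte) [32]byte { return sha256.Sum256(blob) }

// hasNode reports whether the node at path is already stored with the expected
// content. exists is true only when the stored blob hashes to want;
// inconsistent is true when something is stored at that path but it does not
// match (e.g. leftover from an aborted sync followed by a reorg).
func hasNode(db pathDB, path []byte, want [32]byte) (exists, inconsistent bool) {
	blob, ok := db[string(path)]
	if !ok {
		return false, false
	}
	if nodeHash(blob) == want {
		return true, false
	}
	return false, true
}

func main() {
	db := pathDB{"\x01\x02": []byte("stale-node")} // left behind by an aborted sync

	want := nodeHash([]byte("correct-node"))
	exists, inconsistent := hasNode(db, []byte{0x01, 0x02}, want)
	fmt.Println(exists, inconsistent) // false true

	if inconsistent {
		// Queue a deletion so the healer can re-download and rewrite the node.
		delete(db, "\x01\x02")
	}
}
```

In the real change the deletion is not applied immediately; it is recorded in the sync membatch and flushed together with the node writes in one atomic database batch during Commit.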

---------

Co-authored-by: Felix Lange <[email protected]>
2 people authored and Francesco4203 committed Nov 5, 2024
1 parent 68cbdb3 commit af060c3
Showing 3 changed files with 257 additions and 69 deletions.
6 changes: 5 additions & 1 deletion ethdb/dbtest/testsuite.go
@@ -272,9 +272,13 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
b.Put([]byte("5"), nil)
b.Delete([]byte("1"))
b.Put([]byte("6"), nil)
b.Delete([]byte("3"))

b.Delete([]byte("3")) // delete then put
b.Put([]byte("3"), nil)

b.Put([]byte("7"), nil) // put then delete
b.Delete([]byte("7"))

if err := b.Write(); err != nil {
t.Fatal(err)
}
195 changes: 130 additions & 65 deletions trie/sync.go
@@ -19,6 +19,7 @@ package trie
import (
"errors"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
@@ -98,10 +99,9 @@ func NewSyncPath(path []byte) SyncPath {

// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
type nodeRequest struct {
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete
deletes [][]byte // List of internal path segments for trie nodes to delete
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete

parent *nodeRequest // Parent state node referencing this entry
deps int // Number of dependencies before allowed to commit this node
@@ -128,37 +128,70 @@ type CodeSyncResult struct {
Data []byte // Data content of the retrieved bytecode
}

// nodeOp represents an operation on a trie node. It can represent either a
// deletion of the specified node or a write that persists a retrieved node.
type nodeOp struct {
owner common.Hash // identifier of the trie (empty for account trie)
path []byte // path from the root to the specified node.
blob []byte // the content of the node (nil for deletion)
hash common.Hash // hash of the node content (empty for node deletion)
}

// isDelete indicates if the operation is a database deletion.
func (op *nodeOp) isDelete() bool {
return len(op.blob) == 0
}

// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
nodes map[string][]byte // In-memory membatch of recently completed nodes
hashes map[string]common.Hash // Hashes of recently completed nodes
deletes map[string]struct{} // List of paths for trie node to delete
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
scheme string // State scheme identifier
nodes []nodeOp // In-memory batch of recently completed/deleted nodes
hashes map[string]common.Hash // Hashes of recently completed nodes
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
func newSyncMemBatch(scheme string) *syncMemBatch {
return &syncMemBatch{
nodes: make(map[string][]byte),
hashes: make(map[string]common.Hash),
deletes: make(map[string]struct{}),
codes: make(map[common.Hash][]byte),
scheme: scheme,
codes: make(map[common.Hash][]byte),
}
}

// hasNode reports whether the trie node with the specific path is already cached.
func (batch *syncMemBatch) hasNode(path []byte) bool {
_, ok := batch.nodes[string(path)]
return ok
}

// hasCode reports whether the contract code with the specific hash is already cached.
func (batch *syncMemBatch) hasCode(hash common.Hash) bool {
_, ok := batch.codes[hash]
return ok
}

// addCode caches a contract code database write operation.
func (batch *syncMemBatch) addCode(hash common.Hash, code []byte) {
batch.codes[hash] = code
}

// addNode caches a node database write operation.
func (batch *syncMemBatch) addNode(owner common.Hash, path []byte, blob []byte, hash common.Hash) {
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
blob: blob,
hash: hash,
})
}

// delNode caches a node database delete operation.
func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) {
if batch.scheme != rawdb.PathScheme {
log.Error("Unexpected node deletion", "owner", owner, "path", path, "scheme", batch.scheme)
return // deletion is not supported in hash mode.
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
})
}

// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
@@ -194,7 +227,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
ts := &Sync{
scheme: scheme,
database: database,
membatch: newSyncMemBatch(),
membatch: newSyncMemBatch(scheme),
nodeReqs: make(map[string]*nodeRequest),
codeReqs: make(map[common.Hash]*codeRequest),
queue: prque.New(nil),
@@ -213,16 +246,18 @@ func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, par
if root == emptyRoot {
return
}
if s.membatch.hasNode(path) {
return
}
if s.bloom == nil || s.bloom.Contains(root[:]) {
// Bloom filter says this might be a duplicate, double check.
// If database says yes, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
owner, inner := ResolvePath(path)
if rawdb.HasTrieNode(s.database, owner, inner, root, s.scheme) {
exist, inconsistent := s.hasNode(owner, inner, root)
if exist {
// The entire subtrie is already present in the database.
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
s.membatch.delNode(owner, inner)
}
// False positive, bump fault meter
bloomFaultMeter.Mark(1)
@@ -382,36 +417,39 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning any occurred error.
// storage, returning any occurred error. The whole data set will be flushed
// in an atomic database batch.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Flush the pending node writes into database batch.
var (
account int
storage int
)
for path, value := range s.membatch.nodes {
owner, inner := ResolvePath([]byte(path))
if owner == (common.Hash{}) {
account += 1
// Flush the pending node writes into database batch.
for _, op := range s.membatch.nodes {
if op.isDelete() {
// node deletion is only supported in path mode.
if op.owner == (common.Hash{}) {
rawdb.DeleteAccountTrieNode(dbw, op.path)
} else {
rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path)
}
deletionGauge.Inc(1)
} else {
storage += 1
if op.owner == (common.Hash{}) {
account += 1
} else {
storage += 1
}
rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme)
}
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
hash := s.membatch.hashes[path]
hash := op.hash
if s.bloom != nil {
s.bloom.Add(hash[:])
}
}
accountNodeSyncedGauge.Inc(int64(account))
storageNodeSyncedGauge.Inc(int64(storage))

// Flush the pending node deletes into the database batch.
// Please note that each written and deleted node has a
// unique path, ensuring no duplication occurs.
for path := range s.membatch.deletes {
owner, inner := ResolvePath([]byte(path))
rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme)
}
// Flush the pending code writes into database batch.
for hash, value := range s.membatch.codes {
rawdb.WriteCode(dbw, hash, value)
@@ -421,7 +459,7 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
}
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))

s.membatch = newSyncMemBatch() // reset the batch
s.membatch = newSyncMemBatch(s.scheme) // reset the batch
return nil
}

@@ -489,12 +527,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// child as invalid. This is essential in the case of path mode
// scheme; otherwise, state healing might overwrite existing child
// nodes silently while leaving a dangling parent node within the
// range of this internal path on disk. This would break the
// guarantee for state healing.
// range of this internal path on disk. The persistent state would then
// end up in a weird situation where nodes on the same path are mutually
// inconsistent even though they are all present on disk. This would
// break the guarantee for state healing.
//
// While it's possible for this shortNode to overwrite a previously
// existing full node, the other branches of the fullNode can be
// retained as they remain untouched and complete.
// retained as they are not reachable through the new shortNode, and
// the whole subtrie below them remains untouched and complete.
//
// This step is only necessary for path mode, as there is no deletion
// in hash mode at all.
@@ -511,8 +552,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...))
}
if exists {
req.deletes = append(req.deletes, key[:i])
deletionGauge.Inc(1)
s.membatch.delNode(owner, append(inner, key[:i]...))
log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...))
}
}
@@ -532,6 +572,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
}
// Iterate over the children, and request all unknown ones
requests := make([]*nodeRequest, 0, len(children))
var batchMu sync.Mutex
for _, child := range children {
// Notify any external watcher of a new key/value node
if req.callback != nil {
@@ -548,29 +589,33 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
}
}
}
// If the child references another node, resolve or schedule
// If the child references another node, resolve or schedule.
// We check all children concurrently.
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
if s.membatch.hasNode(child.path) {
continue
}
chash := common.BytesToHash(node)
path := child.path
hash := common.BytesToHash(node)
if s.bloom == nil || s.bloom.Contains(node) {
// Bloom filter says this might be a duplicate, double check.
// If database says yes, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
owner, inner := ResolvePath(child.path)
if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) {
continue
owner, inner := ResolvePath(path)
exist, inconsistent := s.hasNode(owner, inner, hash)
if exist {
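// Node already exists with the matching hash; nothing to delete.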

} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
batchMu.Lock()
s.membatch.delNode(owner, inner)
batchMu.Unlock()
}

// False positive, bump fault meter
bloomFaultMeter.Mark(1)
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &nodeRequest{
path: child.path,
hash: chash,
path: path,
hash: hash,
parent: req,
callback: req.callback,
})
@@ -584,14 +629,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// committed themselves.
func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// Write the node content to the membatch
s.membatch.nodes[string(req.path)] = req.data
s.membatch.hashes[string(req.path)] = req.hash
owner, path := ResolvePath(req.path)
s.membatch.addNode(owner, path, req.data, req.hash)

// Delete the internal nodes which are marked as invalid
for _, segment := range req.deletes {
path := append(req.path, segment...)
s.membatch.deletes[string(path)] = struct{}{}
}
// Removed the completed node request
delete(s.nodeReqs, string(req.path))
s.fetches[len(req.path)]--

@@ -612,7 +653,9 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
// committed themselves.
func (s *Sync) commitCodeRequest(req *codeRequest) error {
// Write the node content to the membatch
s.membatch.codes[req.hash] = req.data
s.membatch.addCode(req.hash, req.data)

// Removed the completed code request
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--

@@ -628,6 +671,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
return nil
}

// hasNode reports whether the specified trie node is present in the database.
// 'exists' is true when the node exists in the database and matches the given
// hash. The 'inconsistent' return value is true when the node exists but does
// not match the expected hash.
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
// If node is running with hash scheme, check the presence with node hash.
if s.scheme == rawdb.HashScheme {
return rawdb.HasLegacyTrieNode(s.database, hash), false
}
// If node is running with path scheme, check the presence with node path.
var blob []byte
var dbHash common.Hash
if owner == (common.Hash{}) {
blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path)
} else {
blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path)
}
exists = hash == dbHash
inconsistent = !exists && len(blob) != 0
return exists, inconsistent
}

// ResolvePath resolves the provided composite node path by separating the
// path in account trie if it's existent.
func ResolvePath(path []byte) (common.Hash, []byte) {