diff --git a/ethdb/dbtest/testsuite.go b/ethdb/dbtest/testsuite.go index a2b7003c27..30cef82bec 100644 --- a/ethdb/dbtest/testsuite.go +++ b/ethdb/dbtest/testsuite.go @@ -272,9 +272,13 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) { b.Put([]byte("5"), nil) b.Delete([]byte("1")) b.Put([]byte("6"), nil) - b.Delete([]byte("3")) + + b.Delete([]byte("3")) // delete then put b.Put([]byte("3"), nil) + b.Put([]byte("7"), nil) // put then delete + b.Delete([]byte("7")) + if err := b.Write(); err != nil { t.Fatal(err) } diff --git a/trie/sync.go b/trie/sync.go index 195395f737..084a154840 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -19,6 +19,7 @@ package trie import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" @@ -98,10 +99,9 @@ func NewSyncPath(path []byte) SyncPath { // nodeRequest represents a scheduled or already in-flight trie node retrieval request. type nodeRequest struct { - hash common.Hash // Hash of the trie node to retrieve - path []byte // Merkle path leading to this node for prioritization - data []byte // Data content of the node, cached until all subtrees complete - deletes [][]byte // List of internal path segments for trie nodes to delete + hash common.Hash // Hash of the trie node to retrieve + path []byte // Merkle path leading to this node for prioritization + data []byte // Data content of the node, cached until all subtrees complete parent *nodeRequest // Parent state node referencing this entry deps int // Number of dependencies before allowed to commit this node @@ -128,37 +128,70 @@ type CodeSyncResult struct { Data []byte // Data content of the retrieved bytecode } +// nodeOp represents an operation upon the trie node. It can either represent a +// deletion to the specific node or a node write for persisting retrieved node. +type nodeOp struct { + owner common.Hash // identifier of the trie (empty for account trie) + path []byte // path from the root to the specified node. + blob []byte // the content of the node (nil for deletion) + hash common.Hash // hash of the node content (empty for node deletion) +} + +// isDelete indicates if the operation is a database deletion. +func (op *nodeOp) isDelete() bool { + return len(op.blob) == 0 +} + // syncMemBatch is an in-memory buffer of successfully downloaded but not yet // persisted data items. type syncMemBatch struct { - nodes map[string][]byte // In-memory membatch of recently completed nodes - hashes map[string]common.Hash // Hashes of recently completed nodes - deletes map[string]struct{} // List of paths for trie node to delete - codes map[common.Hash][]byte // In-memory membatch of recently completed codes + scheme string // State scheme identifier + nodes []nodeOp // In-memory batch of recently completed/deleted nodes + hashes map[string]common.Hash // Hashes of recently completed nodes + codes map[common.Hash][]byte // In-memory membatch of recently completed codes } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. -func newSyncMemBatch() *syncMemBatch { +func newSyncMemBatch(scheme string) *syncMemBatch { return &syncMemBatch{ - nodes: make(map[string][]byte), - hashes: make(map[string]common.Hash), - deletes: make(map[string]struct{}), - codes: make(map[common.Hash][]byte), + scheme: scheme, + codes: make(map[common.Hash][]byte), } } -// hasNode reports the trie node with specific path is already cached. -func (batch *syncMemBatch) hasNode(path []byte) bool { - _, ok := batch.nodes[string(path)] - return ok -} - // hasCode reports the contract code with specific hash is already cached. func (batch *syncMemBatch) hasCode(hash common.Hash) bool { _, ok := batch.codes[hash] return ok } +// addCode caches a contract code database write operation. +func (batch *syncMemBatch) addCode(hash common.Hash, code []byte) { + batch.codes[hash] = code +} + +// addNode caches a node database write operation. +func (batch *syncMemBatch) addNode(owner common.Hash, path []byte, blob []byte, hash common.Hash) { + batch.nodes = append(batch.nodes, nodeOp{ + owner: owner, + path: path, + blob: blob, + hash: hash, + }) +} + +// delNode caches a node database delete operation. +func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) { + if batch.scheme != rawdb.PathScheme { + log.Error("Unexpected node deletion", "owner", owner, "path", path, "scheme", batch.scheme) + return // deletion is not supported in hash mode. + } + batch.nodes = append(batch.nodes, nodeOp{ + owner: owner, + path: path, + }) +} + // Sync is the main state trie synchronisation scheduler, which provides yet // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. @@ -194,7 +227,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb ts := &Sync{ scheme: scheme, database: database, - membatch: newSyncMemBatch(), + membatch: newSyncMemBatch(scheme), nodeReqs: make(map[string]*nodeRequest), codeReqs: make(map[common.Hash]*codeRequest), queue: prque.New(nil), @@ -213,16 +246,18 @@ func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, par if root == emptyRoot { return } - if s.membatch.hasNode(path) { - return - } if s.bloom == nil || s.bloom.Contains(root[:]) { // Bloom filter says this might be a duplicate, double check. // If database says yes, then at least the trie node is present // and we hold the assumption that it's NOT legacy contract code. owner, inner := ResolvePath(path) - if rawdb.HasTrieNode(s.database, owner, inner, root, s.scheme) { + exist, inconsistent := s.hasNode(owner, inner, root) + if exist { + // The entire subtrie is already present in the database. return + } else if inconsistent { + // There is a pre-existing node with the wrong hash in DB, remove it. + s.membatch.delNode(owner, inner) } // False positive, bump fault meter bloomFaultMeter.Mark(1) @@ -382,22 +417,32 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { } // Commit flushes the data stored in the internal membatch out to persistent -// storage, returning any occurred error. +// storage, returning any occurred error. The whole data set will be flushed +// in an atomic database batch. func (s *Sync) Commit(dbw ethdb.Batch) error { - // Flush the pending node writes into database batch. var ( account int storage int ) - for path, value := range s.membatch.nodes { - owner, inner := ResolvePath([]byte(path)) - if owner == (common.Hash{}) { - account += 1 + // Flush the pending node writes into database batch. + for _, op := range s.membatch.nodes { + if op.isDelete() { + // node deletion is only supported in path mode. + if op.owner == (common.Hash{}) { + rawdb.DeleteAccountTrieNode(dbw, op.path) + } else { + rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path) + } + deletionGauge.Inc(1) } else { - storage += 1 + if op.owner == (common.Hash{}) { + account += 1 + } else { + storage += 1 + } + rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme) } - rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme) - hash := s.membatch.hashes[path] + hash := op.hash if s.bloom != nil { s.bloom.Add(hash[:]) } @@ -405,13 +450,6 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { accountNodeSyncedGauge.Inc(int64(account)) storageNodeSyncedGauge.Inc(int64(storage)) - // Flush the pending node deletes into the database batch. - // Please note that each written and deleted node has a - // unique path, ensuring no duplication occurs. - for path := range s.membatch.deletes { - owner, inner := ResolvePath([]byte(path)) - rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme) - } // Flush the pending code writes into database batch. for hash, value := range s.membatch.codes { rawdb.WriteCode(dbw, hash, value) @@ -421,7 +459,7 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { } codeSyncedGauge.Inc(int64(len(s.membatch.codes))) - s.membatch = newSyncMemBatch() // reset the batch + s.membatch = newSyncMemBatch(s.scheme) // reset the batch return nil } @@ -489,12 +527,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // child as invalid. This is essential in the case of path mode // scheme; otherwise, state healing might overwrite existing child // nodes silently while leaving a dangling parent node within the - // range of this internal path on disk. This would break the - // guarantee for state healing. + // range of this internal path on disk and the persistent state + // ends up with a very weird situation that nodes on the same path + // are not inconsistent while they all present in disk. This property + // would break the guarantee for state healing. // // While it's possible for this shortNode to overwrite a previously // existing full node, the other branches of the fullNode can be - // retained as they remain untouched and complete. + // retained as they are not accessible with the new shortNode, and + // also the whole sub-trie is still untouched and complete. // // This step is only necessary for path mode, as there is no deletion // in hash mode at all. @@ -511,8 +552,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...)) } if exists { - req.deletes = append(req.deletes, key[:i]) - deletionGauge.Inc(1) + s.membatch.delNode(owner, append(inner, key[:i]...)) log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...)) } } @@ -532,6 +572,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { } // Iterate over the children, and request all unknown ones requests := make([]*nodeRequest, 0, len(children)) + var batchMu sync.Mutex for _, child := range children { // Notify any external watcher of a new key/value node if req.callback != nil { @@ -548,20 +589,24 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { } } } - // If the child references another node, resolve or schedule + // If the child references another node, resolve or schedule. + // We check all children concurrently. if node, ok := (child.node).(hashNode); ok { - // Try to resolve the node from the local database - if s.membatch.hasNode(child.path) { - continue - } - chash := common.BytesToHash(node) + path := child.path + hash := common.BytesToHash(node) if s.bloom == nil || s.bloom.Contains(node) { // Bloom filter says this might be a duplicate, double check. // If database says yes, then at least the trie node is present // and we hold the assumption that it's NOT legacy contract code. - owner, inner := ResolvePath(child.path) - if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) { - continue + owner, inner := ResolvePath(path) + exist, inconsistent := s.hasNode(owner, inner, hash) + if exist { + + } else if inconsistent { + // There is a pre-existing node with the wrong hash in DB, remove it. + batchMu.Lock() + s.membatch.delNode(owner, inner) + batchMu.Unlock() } // False positive, bump fault meter @@ -569,8 +614,8 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { } // Locally unknown node, schedule for retrieval requests = append(requests, &nodeRequest{ - path: child.path, - hash: chash, + path: path, + hash: hash, parent: req, callback: req.callback, }) @@ -584,14 +629,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // committed themselves. func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch - s.membatch.nodes[string(req.path)] = req.data - s.membatch.hashes[string(req.path)] = req.hash + owner, path := ResolvePath(req.path) + s.membatch.addNode(owner, path, req.data, req.hash) - // Delete the internal nodes which are marked as invalid - for _, segment := range req.deletes { - path := append(req.path, segment...) - s.membatch.deletes[string(path)] = struct{}{} - } + // Removed the completed node request delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- @@ -612,7 +653,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // committed themselves. func (s *Sync) commitCodeRequest(req *codeRequest) error { // Write the node content to the membatch - s.membatch.codes[req.hash] = req.data + s.membatch.addCode(req.hash, req.data) + + // Removed the completed code request delete(s.codeReqs, req.hash) s.fetches[len(req.path)]-- @@ -628,6 +671,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error { return nil } +// hasNode reports whether the specified trie node is present in the database. +// 'exists' is true when the node exists in the database and matches the given root +// hash. The 'inconsistent' return value is true when the node exists but does not +// match the expected hash. +func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) { + // If node is running with hash scheme, check the presence with node hash. + if s.scheme == rawdb.HashScheme { + return rawdb.HasLegacyTrieNode(s.database, hash), false + } + // If node is running with path scheme, check the presence with node path. + var blob []byte + var dbHash common.Hash + if owner == (common.Hash{}) { + blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path) + } else { + blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path) + } + exists = hash == dbHash + inconsistent = !exists && len(blob) != 0 + return exists, inconsistent +} + // ResolvePath resolves the provided composite node path by separating the // path in account trie if it's existent. func ResolvePath(path []byte) (common.Hash, []byte) { diff --git a/trie/sync_test.go b/trie/sync_test.go index a07dbccd87..75eb5809a2 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -689,8 +689,11 @@ func testSyncOrdering(t *testing.T, scheme string) { } } } - func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database) { + syncWithHookWriter(t, root, db, srcDb, nil) +} + +func syncWithHookWriter(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database, hookWriter ethdb.KeyValueWriter) { // Create a destination trie and sync with the scheduler sched := NewSync(root, db, nil, NewSyncBloom(1, db), srcDb.Scheme()) @@ -728,8 +731,11 @@ func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database if err := sched.Commit(batch); err != nil { t.Fatalf("failed to commit data: %v", err) } - batch.Write() - + if hookWriter != nil { + batch.Replay(hookWriter) + } else { + batch.Write() + } paths, nodes, _ = sched.Missing(0) elements = elements[:0] for i := 0; i < len(paths); i++ { @@ -899,3 +905,116 @@ func testPivotMove(t *testing.T, scheme string, tiny bool) { syncWith(t, rootC, destDisk, srcTrieDB) checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateC, true) } + +func TestSyncAbort(t *testing.T) { + testSyncAbort(t, rawdb.PathScheme) + testSyncAbort(t, rawdb.HashScheme) +} + +type hookWriter struct { + db ethdb.KeyValueStore + filter func(key []byte, value []byte) bool +} + +// Put inserts the given value into the key-value data store. +func (w *hookWriter) Put(key []byte, value []byte) error { + if w.filter != nil && w.filter(key, value) { + return nil + } + return w.db.Put(key, value) +} + +// Delete removes the key from the key-value data store. +func (w *hookWriter) Delete(key []byte) error { + return w.db.Delete(key) +} + +func testSyncAbort(t *testing.T, scheme string) { + var ( + srcDisk = rawdb.NewMemoryDatabase() + srcTrieDB = newTestDatabase(srcDisk, scheme) + srcTrie, _ = New(TrieID(types.EmptyRootHash), srcTrieDB) + + deleteFn = func(key []byte, tr *Trie, states map[string][]byte) { + tr.Delete(key) + delete(states, string(key)) + } + writeFn = func(key []byte, val []byte, tr *Trie, states map[string][]byte) { + if val == nil { + val = randBytes(32) + } + tr.Update(key, val) + states[string(key)] = common.CopyBytes(val) + } + copyStates = func(states map[string][]byte) map[string][]byte { + cpy := make(map[string][]byte) + for k, v := range states { + cpy[k] = v + } + return cpy + } + ) + var ( + stateA = make(map[string][]byte) + key = randBytes(32) + val = randBytes(32) + ) + for i := 0; i < 256; i++ { + writeFn(randBytes(32), nil, srcTrie, stateA) + } + writeFn(key, val, srcTrie, stateA) + + rootA, nodesA, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootA, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodesA), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootA, false); err != nil { + panic(err) + } + // Create a destination trie and sync with the scheduler + destDisk := rawdb.NewMemoryDatabase() + syncWith(t, rootA, destDisk, srcTrieDB) + checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateA, true) + + // Delete the element from the trie + stateB := copyStates(stateA) + srcTrie, _ = New(TrieID(rootA), srcTrieDB) + deleteFn(key, srcTrie, stateB) + + rootB, nodesB, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootB, rootA, 0, trienode.NewWithNodeSet(nodesB), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootB, false); err != nil { + panic(err) + } + + // Sync the new state, but never persist the new root node. Before the + // fix #28595, the original old root node will still be left in database + // which breaks the next healing cycle. + syncWithHookWriter(t, rootB, destDisk, srcTrieDB, &hookWriter{db: destDisk, filter: func(key []byte, value []byte) bool { + if scheme == rawdb.HashScheme { + return false + } + if len(value) == 0 { + return false + } + ok, path := rawdb.ResolveAccountTrieNodeKey(key) + return ok && len(path) == 0 + }}) + + // Add elements to expand trie + stateC := copyStates(stateB) + srcTrie, _ = New(TrieID(rootB), srcTrieDB) + + writeFn(key, val, srcTrie, stateC) + rootC, nodesC, _ := srcTrie.Commit(false) + if err := srcTrieDB.Update(rootC, rootB, 0, trienode.NewWithNodeSet(nodesC), nil); err != nil { + panic(err) + } + if err := srcTrieDB.Commit(rootC, false); err != nil { + panic(err) + } + syncWith(t, rootC, destDisk, srcTrieDB) + checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateC, true) +}