From ca1d22d11bbb7d21524728e4b06661a08c96a35d Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Thu, 14 Mar 2024 16:51:39 -0400 Subject: [PATCH] chore: backport the pruning fix PR to v1.1.x (#910) --- CHANGELOG.md | 11 ++ batch.go | 45 ++++-- import.go | 2 +- migrate_test.go | 156 ++++++++++++++++++-- mutable_tree.go | 15 +- node.go | 9 +- nodedb.go | 369 ++++++++++++++++++++++++++++-------------------- nodedb_test.go | 16 +-- tree_test.go | 66 ++++++++- 9 files changed, 489 insertions(+), 200 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c350e2c1..10422131c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## v1.1.1 March 13, 2024 + +### Bug Fixes + +- [#910](https://github.com/cosmos/iavl/pull/910) Fix the reference root format from (prefix, version) to (prefix, version, nonce) + +### Improvements + +- [#910](https://github.com/cosmos/iavl/pull/910) Async pruning of legacy orphan nodes. + + ## v1.1.0 February 29, 2024 ### API Breaking Changes diff --git a/batch.go b/batch.go index c67306393..3de9b1684 100644 --- a/batch.go +++ b/batch.go @@ -10,10 +10,10 @@ import ( // around batch that flushes batch's data to disk // as soon as the configurable limit is reached. type BatchWithFlusher struct { + mtx sync.Mutex db dbm.DB // This is only used to create new batch batch dbm.Batch // Batched writing buffer. - mtx sync.Mutex flushThreshold int // The threshold to flush the batch to disk. } @@ -57,13 +57,11 @@ func (b *BatchWithFlusher) Set(key, value []byte) error { return err } if batchSizeAfter > b.flushThreshold { - if err := b.batch.Write(); err != nil { + b.mtx.Unlock() + if err := b.Write(); err != nil { return err } - if err := b.batch.Close(); err != nil { - return err - } - b.batch = b.db.NewBatchWithSize(b.flushThreshold) + b.mtx.Lock() } return b.batch.Set(key, value) } @@ -81,26 +79,47 @@ func (b *BatchWithFlusher) Delete(key []byte) error { return err } if batchSizeAfter > b.flushThreshold { - if err := b.batch.Write(); err != nil { - return err - } - if err := b.batch.Close(); err != nil { + b.mtx.Unlock() + if err := b.Write(); err != nil { return err } - b.batch = b.db.NewBatchWithSize(b.flushThreshold) + b.mtx.Lock() } return b.batch.Delete(key) } func (b *BatchWithFlusher) Write() error { - return b.batch.Write() + b.mtx.Lock() + defer b.mtx.Unlock() + + if err := b.batch.Write(); err != nil { + return err + } + if err := b.batch.Close(); err != nil { + return err + } + b.batch = b.db.NewBatchWithSize(b.flushThreshold) + return nil } func (b *BatchWithFlusher) WriteSync() error { - return b.batch.WriteSync() + b.mtx.Lock() + defer b.mtx.Unlock() + + if err := b.batch.WriteSync(); err != nil { + return err + } + if err := b.batch.Close(); err != nil { + return err + } + b.batch = b.db.NewBatchWithSize(b.flushThreshold) + return nil } func (b *BatchWithFlusher) Close() error { + b.mtx.Lock() + defer b.mtx.Unlock() + return b.batch.Close() } diff --git a/import.go b/import.go index 44802170f..282e260ca 100644 --- a/import.go +++ b/import.go @@ -204,7 +204,7 @@ func (i *Importer) Commit() error { return err } if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version - if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil { + if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKey(i.stack[0].nodeKey.GetKey())); err != nil { return err } } diff --git a/migrate_test.go b/migrate_test.go index 6b46301fe..adc238dd1 100644 --- a/migrate_test.go +++ b/migrate_test.go @@ -8,18 +8,20 @@ import ( "os/exec" "path" "testing" + "time" "cosmossdk.io/log" "github.com/stretchr/testify/require" dbm "github.com/cosmos/iavl/db" + iavlrand "github.com/cosmos/iavl/internal/rand" ) const ( dbType = "goleveldb" ) -func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string, error) { +func createLegacyTree(t *testing.T, dbDir string, version int) (string, error) { relateDir := path.Join(t.TempDir(), dbDir) if _, err := os.Stat(relateDir); err == nil { err := os.RemoveAll(relateDir) @@ -48,10 +50,10 @@ func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string, func TestLazySet(t *testing.T) { legacyVersion := 1000 dbDir := fmt.Sprintf("legacy-%s-%d", dbType, legacyVersion) - relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion) + relateDir, err := createLegacyTree(t, dbDir, legacyVersion) require.NoError(t, err) - db, err := dbm.NewDB("test", "goleveldb", relateDir) + db, err := dbm.NewDB("test", dbType, relateDir) require.NoError(t, err) defer func() { @@ -91,10 +93,10 @@ func TestLazySet(t *testing.T) { func TestLegacyReferenceNode(t *testing.T) { legacyVersion := 20 dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion) - relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion) + relateDir, err := createLegacyTree(t, dbDir, legacyVersion) require.NoError(t, err) - db, err := dbm.NewDB("test", "goleveldb", relateDir) + db, err := dbm.NewDB("test", dbType, relateDir) require.NoError(t, err) defer func() { @@ -111,6 +113,7 @@ func TestLegacyReferenceNode(t *testing.T) { // Load the latest legacy version _, err = tree.LoadVersion(int64(legacyVersion)) require.NoError(t, err) + legacyLatestVersion := tree.root.nodeKey.version // Commit new versions without updates _, _, err = tree.SaveVersion() @@ -123,17 +126,17 @@ func TestLegacyReferenceNode(t *testing.T) { _, err = newTree.LoadVersion(version - 1) require.NoError(t, err) // Check if the reference node is refactored - require.Equal(t, newTree.root.nodeKey.nonce, uint32(1)) - require.Equal(t, newTree.root.nodeKey.version, int64(legacyVersion)) + require.Equal(t, newTree.root.nodeKey.nonce, uint32(0)) + require.Equal(t, newTree.root.nodeKey.version, legacyLatestVersion) } func TestDeleteVersions(t *testing.T) { legacyVersion := 100 dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion) - relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion) + relateDir, err := createLegacyTree(t, dbDir, legacyVersion) require.NoError(t, err) - db, err := dbm.NewDB("test", "goleveldb", relateDir) + db, err := dbm.NewDB("test", dbType, relateDir) require.NoError(t, err) defer func() { @@ -205,3 +208,138 @@ func TestDeleteVersions(t *testing.T) { pVersions = tree.AvailableVersions() require.Equal(t, postVersions/2, len(pVersions)) } + +func TestPruning(t *testing.T) { + legacyVersion := 100 + dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion) + relateDir, err := createLegacyTree(t, dbDir, legacyVersion) + require.NoError(t, err) + + db, err := dbm.NewDB("test", dbType, relateDir) + require.NoError(t, err) + + defer func() { + if err := db.Close(); err != nil { + t.Errorf("DB close error: %v\n", err) + } + if err := os.RemoveAll(relateDir); err != nil { + t.Errorf("%+v\n", err) + } + }() + + // Load the latest version + tree := NewMutableTree(db, 1000, false, log.NewNopLogger()) + _, err = tree.Load() + require.NoError(t, err) + + // Save 10 versions without updates + for i := 0; i < 10; i++ { + _, _, err = tree.SaveVersion() + require.NoError(t, err) + } + + // Save 990 versions + leavesCount := 10 + toVersion := int64(990) + pruningInterval := int64(20) + for i := int64(0); i < toVersion; i++ { + for j := 0; j < leavesCount; j++ { + _, err := tree.Set([]byte(fmt.Sprintf("key%d", j)), []byte(fmt.Sprintf("value%d", j))) + require.NoError(t, err) + } + _, v, err := tree.SaveVersion() + require.NoError(t, err) + if v%pruningInterval == 0 { + err = tree.DeleteVersionsTo(v - pruningInterval/2) + require.NoError(t, err) + } + } + + // Wait for pruning to finish + for i := 0; i < 100; i++ { + _, _, err := tree.SaveVersion() + require.NoError(t, err) + isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion)) + require.NoError(t, err) + if !isLeacy { + break + } + // Simulate the consensus state update + time.Sleep(500 * time.Millisecond) + } + // Reload the tree + tree = NewMutableTree(db, 0, false, log.NewNopLogger()) + versions := tree.AvailableVersions() + require.Equal(t, versions[0], int(toVersion)+legacyVersion+1) + for _, v := range versions { + _, err := tree.LoadVersion(int64(v)) + require.NoError(t, err) + } + // Check if the legacy nodes are pruned + _, err = tree.Load() + require.NoError(t, err) + itr, err := NewNodeIterator(tree.root.GetKey(), tree.ndb) + require.NoError(t, err) + legacyNodes := make(map[string]*Node) + for ; itr.Valid(); itr.Next(false) { + node := itr.GetNode() + if node.nodeKey.nonce == 0 { + legacyNodes[string(node.hash)] = node + } + } + + lNodes, err := tree.ndb.legacyNodes() + require.NoError(t, err) + require.Len(t, lNodes, len(legacyNodes)) + for _, node := range lNodes { + _, ok := legacyNodes[string(node.hash)] + require.True(t, ok) + } +} + +func TestRandomSet(t *testing.T) { + legacyVersion := 50 + dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion) + relateDir, err := createLegacyTree(t, dbDir, legacyVersion) + require.NoError(t, err) + + db, err := dbm.NewDB("test", dbType, relateDir) + require.NoError(t, err) + + defer func() { + if err := db.Close(); err != nil { + t.Errorf("DB close error: %v\n", err) + } + if err := os.RemoveAll(relateDir); err != nil { + t.Errorf("%+v\n", err) + } + }() + + tree := NewMutableTree(db, 10000, false, log.NewNopLogger()) + + // Load the latest legacy version + _, err = tree.LoadVersion(int64(legacyVersion)) + require.NoError(t, err) + + // Commit new versions + postVersions := 1000 + emptyVersions := 10 + for i := 0; i < emptyVersions; i++ { + _, _, err := tree.SaveVersion() + require.NoError(t, err) + } + for i := 0; i < postVersions-emptyVersions; i++ { + leafCount := rand.Intn(50) + for j := 0; j < leafCount; j++ { + key := iavlrand.RandBytes(10) + value := iavlrand.RandBytes(10) + _, err = tree.Set(key, value) + require.NoError(t, err) + } + _, _, err = tree.SaveVersion() + require.NoError(t, err) + } + + err = tree.DeleteVersionsTo(int64(legacyVersion + postVersions - 1)) + require.NoError(t, err) +} diff --git a/mutable_tree.go b/mutable_tree.go index c5965681e..752deb26f 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -624,7 +624,7 @@ func (tree *MutableTree) enableFastStorageAndCommit() error { return err } - if err = tree.ndb.setFastStorageVersionToBatch(latestVersion); err != nil { + if err = tree.ndb.SetFastStorageVersionToBatch(latestVersion); err != nil { return err } @@ -756,15 +756,15 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) { } } else { if tree.root.nodeKey != nil { - // it means there is no updated - if err := tree.ndb.SaveRoot(version, tree.root.nodeKey.version); err != nil { + // it means there are no updated nodes + if err := tree.ndb.SaveRoot(version, tree.root.nodeKey); err != nil { return nil, 0, err } // it means the reference node is a legacy node - if tree.root.nodeKey.nonce == 0 { + if tree.root.isLegacy { // it will update the legacy node to the new format // which ensures the reference node is not a legacy node - tree.root.nodeKey.nonce = 1 + tree.root.isLegacy = false if err := tree.ndb.SaveNode(tree.root); err != nil { return nil, 0, fmt.Errorf("failed to save the reference legacy node: %w", err) } @@ -801,7 +801,7 @@ func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool if err := tree.saveFastNodeRemovals(); err != nil { return err } - return tree.ndb.setFastStorageVersionToBatch(latestVersion) + return tree.ndb.SetFastStorageVersionToBatch(latestVersion) } func (tree *MutableTree) getUnsavedFastNodeAdditions() map[string]*fastnode.Node { @@ -884,8 +884,7 @@ func (tree *MutableTree) SetInitialVersion(version uint64) { } // DeleteVersionsTo removes versions upto the given version from the MutableTree. -// An error is returned if any single version has active readers. -// All writes happen in a single batch with a single commit. +// It will not block the SaveVersion() call, instead it will be queued and executed deferred. func (tree *MutableTree) DeleteVersionsTo(toVersion int64) error { if err := tree.ndb.DeleteVersionsTo(toVersion); err != nil { return err diff --git a/node.go b/node.go index 24f791b8e..06ffc9e58 100644 --- a/node.go +++ b/node.go @@ -71,6 +71,7 @@ type Node struct { leftNode *Node rightNode *Node subtreeHeight int8 + isLegacy bool } var _ cache.Node = (*Node)(nil) @@ -87,6 +88,9 @@ func NewNode(key []byte, value []byte) *Node { // GetKey returns the key of the node. func (node *Node) GetKey() []byte { + if node.isLegacy { + return node.hash + } return node.nodeKey.GetKey() } @@ -239,6 +243,7 @@ func MakeLegacyNode(hash, buf []byte) (*Node, error) { nodeKey: &NodeKey{version: ver}, key: key, hash: hash, + isLegacy: true, } // Read node body. @@ -280,11 +285,11 @@ func (node *Node) String() string { if node.rightNode != nil && node.rightNode.nodeKey != nil { child += fmt.Sprintf("{right %v}", node.rightNode.nodeKey) } - return fmt.Sprintf("Node{%s:%s@ %v:%v-%v %d-%d}#%s\n", + return fmt.Sprintf("Node{%s:%s@ %v:%x-%x %d-%d %x}#%s\n", color.ColoredBytes(node.key, color.Green, color.Blue), color.ColoredBytes(node.value, color.Cyan, color.Blue), node.nodeKey, node.leftNodeKey, node.rightNodeKey, - node.size, node.subtreeHeight, child) + node.size, node.subtreeHeight, node.hash, child) } // clone creates a shallow copy of a node with its hash set to nil. diff --git a/nodedb.go b/nodedb.go index 3d3cd0370..9aa2c805a 100644 --- a/nodedb.go +++ b/nodedb.go @@ -37,6 +37,10 @@ const ( defaultStorageVersionValue = "1.0.0" fastStorageVersionValue = "1.1.0" fastNodeCacheSize = 100000 + + // This is used to avoid the case which pruning blocks the main process. + deleteBatchCount = 1000 + deletePauseDuration = 100 * time.Millisecond ) var ( @@ -62,8 +66,12 @@ var ( // All legacy node keys are prefixed with the byte 'n'. legacyNodeKeyFormat = keyformat.NewFastPrefixFormatter('n', hashSize) // n + // All legacy orphan keys are prefixed with the byte 'o'. + legacyOrphanKeyFormat = keyformat.NewKeyFormat('o', int64Size, int64Size, hashSize) // o + // All legacy root keys are prefixed with the byte 'r'. legacyRootKeyFormat = keyformat.NewKeyFormat('r', int64Size) // r + ) var errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format %s", fastStorageVersionDelimiter) @@ -243,10 +251,13 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error { return ndb.saveFastNodeUnlocked(node, false) } -// setFastStorageVersionToBatch sets storage version to fast where the version is +// SetFastStorageVersionToBatch sets storage version to fast where the version is // 1.1.0-. Returns error if storage version is incorrect or on // db error, nil otherwise. Requires changes to be committed after to be persisted. -func (ndb *nodeDB) setFastStorageVersionToBatch(latestVersion int64) error { +func (ndb *nodeDB) SetFastStorageVersionToBatch(latestVersion int64) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + var newVersion string if ndb.storageVersion >= fastStorageVersionValue { // Storage version should be at index 0 and latest fast cache version at index 1 @@ -369,15 +380,61 @@ func (ndb *nodeDB) deleteVersion(version int64) error { if err != nil { return err } - if rootKey == nil || GetNodeKey(rootKey).version < version { - if err := ndb.batch.Delete(ndb.nodeKey(GetRootKey(version))); err != nil { + + if err := ndb.traverseOrphans(version, version+1, func(orphan *Node) error { + if orphan.nodeKey.nonce == 0 && !orphan.isLegacy { + // if the orphan is a reformatted root, it can be a legacy root + // so it should be removed from the pruning process. + if err := ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)); err != nil { + return err + } + } + if orphan.nodeKey.nonce == 1 && orphan.nodeKey.version < version { + // if the orphan is referred to the previous root, it should be reformatted + // to (version, 0), because the root (version, 1) should be removed but not + // applied now due to the batch writing. + orphan.nodeKey.nonce = 0 + } + nk := orphan.GetKey() + if orphan.isLegacy { + return ndb.batch.Delete(ndb.legacyNodeKey(nk)) + } + return ndb.batch.Delete(ndb.nodeKey(nk)) + }); err != nil { + return err + } + + literalRootKey := GetRootKey(version) + if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) { + // if the root key is not matched with the literal root key, it means the given root + // is a reference root to the previous version. + if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { return err } } - return ndb.traverseOrphans(version, version+1, func(orphan *Node) error { - return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) - }) + // check if the version is referred by the next version + nextRootKey, err := ndb.GetRoot(version + 1) + if err != nil { + return err + } + if bytes.Equal(literalRootKey, nextRootKey) { + root, err := ndb.GetNode(nextRootKey) + if err != nil { + return err + } + // ensure that the given version is not included in the root search + if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { + return err + } + // instead, the root should be reformatted to (version, 0) + root.nodeKey.nonce = 0 + if err := ndb.SaveNode(root); err != nil { + return err + } + } + + return nil } // deleteLegacyNodes deletes all legacy nodes with the given version from disk. @@ -404,121 +461,48 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error { return ndb.batch.Delete(ndb.legacyNodeKey(nk)) } -var ( - isDeletingLegacyVersionsMutex = &sync.Mutex{} - isDeletingLegacyVersions = false -) - // deleteLegacyVersions deletes all legacy versions from disk. -func (ndb *nodeDB) deleteLegacyVersions() error { - isDeletingLegacyVersionsMutex.Lock() - if isDeletingLegacyVersions { - isDeletingLegacyVersionsMutex.Unlock() - return nil - } - isDeletingLegacyVersions = true - isDeletingLegacyVersionsMutex.Unlock() - - go func() { - defer func() { - isDeletingLegacyVersionsMutex.Lock() - isDeletingLegacyVersions = false - isDeletingLegacyVersionsMutex.Unlock() - }() - - // Check if we have a legacy version - itr, err := ndb.getPrefixIterator(legacyRootKeyFormat.Key()) - if err != nil { - ndb.logger.Error(err.Error()) - return - } - defer itr.Close() - - // Delete orphans for all legacy versions - var prevVersion, curVersion int64 - var rootKeys [][]byte - counter := 0 - for ; itr.Valid(); itr.Next() { - legacyRootKeyFormat.Scan(itr.Key(), &curVersion) - rootKeys = append(rootKeys, itr.Key()) - if prevVersion > 0 { - if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { - counter++ - if counter == 1000 { - counter = 0 - time.Sleep(1000 * time.Millisecond) - fmt.Println("IAVL sleep happening") - } - return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) - }); err != nil { - ndb.logger.Error(err.Error()) - return - } - } - prevVersion = curVersion - } - // Delete the last version for the legacyLastVersion - if curVersion > 0 { - legacyLatestVersion, err := ndb.getLegacyLatestVersion() - if err != nil { - ndb.logger.Error(err.Error()) - return - } - if curVersion != legacyLatestVersion { - ndb.logger.Error("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) - return - } - if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { - return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) - }); err != nil { - ndb.logger.Error("failed to clean legacy orphans between versions", "err", err) - return - } - } - - // Delete all roots of the legacy versions - for _, rootKey := range rootKeys { - if err := ndb.batch.Delete(rootKey); err != nil { - ndb.logger.Error("failed to clean legacy orphans root keys", "err", err) - return - } - } - - // Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted - ndb.legacyLatestVersion = -1 +func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error { + count := 0 - // Delete all orphan nodes of the legacy versions - // TODO: Is this just deadcode????? - if err := ndb.deleteOrphans(); err != nil { - ndb.logger.Error("failed to clean legacy orphans", "err", err) - return + checkDeletePause := func() { + count++ + if count%deleteBatchCount == 0 { + time.Sleep(deletePauseDuration) + count = 0 } - }() - - return nil -} + } -// deleteOrphans cleans all legacy orphans from the nodeDB. -func (ndb *nodeDB) deleteOrphans() error { - itr, err := ndb.getPrefixIterator(legacyRootKeyFormat.Key()) - if err != nil { + // Delete the last version for the legacyLastVersion + if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error { + checkDeletePause() + return ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)) + }); err != nil { return err } - defer itr.Close() - count := 0 - for ; itr.Valid(); itr.Next() { - if err := ndb.batch.Delete(itr.Key()); err != nil { + // Delete orphans for all legacy versions + if err := ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error { + checkDeletePause() + if err := ndb.batch.Delete(key); err != nil { return err } - - // Sleep for a while to avoid blocking the main thread i/o. - count++ - if count > 1000 { - count = 0 - time.Sleep(100 * time.Millisecond) + var fromVersion, toVersion int64 + legacyOrphanKeyFormat.Scan(key, &toVersion, &fromVersion) + if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion) || fromVersion > legacyLatestVersion { + checkDeletePause() + return ndb.batch.Delete(ndb.legacyNodeKey(value)) } - + return nil + }); err != nil { + return err + } + // Delete all legacy roots + if err := ndb.traversePrefix(legacyRootKeyFormat.Key(), func(key, value []byte) error { + checkDeletePause() + return ndb.batch.Delete(key) + }); err != nil { + return err } return nil @@ -557,8 +541,8 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error { if err := ndb.deleteLegacyNodes(version, v); err != nil { return err } - // delete the legacy root // it will skip the orphans because orphans will be removed at once in `deleteLegacyVersions` + // delete the legacy root return ndb.batch.Delete(k) }); err != nil { return err @@ -590,6 +574,7 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { if err != nil { return err } + // If the legacy version is greater than the toVersion, we don't need to delete anything. // It will delete the legacy versions at once. if legacyLatestVersion > toVersion { @@ -607,20 +592,27 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { } if latest <= toVersion { - return fmt.Errorf("the version should be smaller than the latest version %d", latest) + return fmt.Errorf("latest version %d is less than or equal to toVersion %d", latest, toVersion) } + ndb.mtx.Lock() for v, r := range ndb.versionReaders { if v >= first && v <= toVersion && r != 0 { - return fmt.Errorf("unable to delete version %v with %v active readers", v, r) + ndb.mtx.Unlock() + return fmt.Errorf("unable to delete version %d with %d active readers", v, r) } } + ndb.mtx.Unlock() // Delete the legacy versions if legacyLatestVersion >= first { - if err := ndb.deleteLegacyVersions(); err != nil { - return err - } + // reset the legacy latest version forcibly to avoid multiple calls + ndb.resetLegacyLatestVersion(-1) + go func() { + if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil { + ndb.logger.Error("Error deleting legacy versions", "err", err) + } + }() first = legacyLatestVersion + 1 } @@ -648,10 +640,6 @@ func (ndb *nodeDB) nodeKey(nk []byte) []byte { return nodeKeyFormat.Key(nk) } -func (ndb *nodeDB) nodeKeyPrefix(version int64) []byte { - return nodeKeyPrefixFormat.KeyInt64(version) -} - func (ndb *nodeDB) fastNodeKey(key []byte) []byte { return fastKeyFormat.KeyBytes(key) } @@ -665,8 +653,12 @@ func (ndb *nodeDB) legacyRootKey(version int64) []byte { } func (ndb *nodeDB) getFirstVersion() (int64, error) { - if ndb.firstVersion != 0 { - return ndb.firstVersion, nil + ndb.mtx.Lock() + firstVersion := ndb.firstVersion + ndb.mtx.Unlock() + + if firstVersion > 0 { + return firstVersion, nil } // Check if we have a legacy version @@ -685,7 +677,6 @@ func (ndb *nodeDB) getFirstVersion() (int64, error) { if err != nil { return 0, err } - firstVersion := int64(0) for firstVersion < latestVersion { version := (latestVersion + firstVersion) >> 1 has, err := ndb.hasVersion(version) @@ -698,18 +689,25 @@ func (ndb *nodeDB) getFirstVersion() (int64, error) { firstVersion = version + 1 } } - ndb.firstVersion = latestVersion - return ndb.firstVersion, nil + ndb.resetFirstVersion(latestVersion) + + return latestVersion, nil } func (ndb *nodeDB) resetFirstVersion(version int64) { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() ndb.firstVersion = version } func (ndb *nodeDB) getLegacyLatestVersion() (int64, error) { - if ndb.legacyLatestVersion != 0 { - return ndb.legacyLatestVersion, nil + ndb.mtx.Lock() + latestVersion := ndb.legacyLatestVersion + ndb.mtx.Unlock() + + if latestVersion != 0 { + return latestVersion, nil } itr, err := ndb.db.ReverseIterator( @@ -725,7 +723,7 @@ func (ndb *nodeDB) getLegacyLatestVersion() (int64, error) { k := itr.Key() var version int64 legacyRootKeyFormat.Scan(k, &version) - ndb.legacyLatestVersion = version + ndb.resetLegacyLatestVersion(version) return version, nil } @@ -734,14 +732,24 @@ func (ndb *nodeDB) getLegacyLatestVersion() (int64, error) { } // If there are no legacy versions, set -1 - ndb.legacyLatestVersion = -1 + ndb.resetLegacyLatestVersion(-1) - return ndb.legacyLatestVersion, nil + return -1, nil +} + +func (ndb *nodeDB) resetLegacyLatestVersion(version int64) { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.legacyLatestVersion = version } func (ndb *nodeDB) getLatestVersion() (int64, error) { - if ndb.latestVersion != 0 { - return ndb.latestVersion, nil + ndb.mtx.Lock() + latestVersion := ndb.latestVersion + ndb.mtx.Unlock() + + if latestVersion > 0 { + return latestVersion, nil } itr, err := ndb.db.ReverseIterator( @@ -757,8 +765,9 @@ func (ndb *nodeDB) getLatestVersion() (int64, error) { k := itr.Key() var nk []byte nodeKeyFormat.Scan(k, &nk) - ndb.latestVersion = GetNodeKey(nk).version - return ndb.latestVersion, nil + latestVersion = GetNodeKey(nk).version + ndb.resetLatestVersion(latestVersion) + return latestVersion, nil } if err := itr.Error(); err != nil { @@ -766,15 +775,16 @@ func (ndb *nodeDB) getLatestVersion() (int64, error) { } // If there are no versions, try to get the latest version from the legacy format. - version, err := ndb.getLegacyLatestVersion() + latestVersion, err = ndb.getLegacyLatestVersion() if err != nil { return 0, err } - if version > 0 { - ndb.latestVersion = version + if latestVersion > 0 { + ndb.resetLatestVersion(latestVersion) + return latestVersion, nil } - return ndb.latestVersion, nil + return 0, nil } func (ndb *nodeDB) resetLatestVersion(version int64) { @@ -817,8 +827,33 @@ func (ndb *nodeDB) GetRoot(version int64) ([]byte, error) { if len(val) == 0 { // empty root return nil, nil } - if isReferenceToRoot(val) { // point to the prev root - return append(val[1:], 0, 0, 0, 1), nil + isRef, n := isReferenceRoot(val) + if isRef { // point to the prev version + switch n { + case nodeKeyFormat.Length(): // (prefix, version, 1) + nk := GetNodeKey(val[1:]) + val, err = ndb.db.Get(nodeKeyFormat.Key(val[1:])) + if err != nil { + return nil, err + } + if val == nil { // the prev version does not exist + // check if the prev version root is reformatted due to the pruning + rnk := &NodeKey{version: nk.version, nonce: 0} + val, err = ndb.db.Get(nodeKeyFormat.Key(rnk.GetKey())) + if err != nil { + return nil, err + } + if val == nil { + return nil, ErrVersionDoesNotExist + } + return rnk.GetKey(), nil + } + return nk.GetKey(), nil + case nodeKeyPrefixFormat.Length(): // (prefix, version) before the lazy pruning + return append(val[1:], 0, 0, 0, 1), nil + default: + return nil, fmt.Errorf("invalid reference root: %x", val) + } } return rootKey, nil @@ -826,12 +861,16 @@ func (ndb *nodeDB) GetRoot(version int64) ([]byte, error) { // SaveEmptyRoot saves the empty root. func (ndb *nodeDB) SaveEmptyRoot(version int64) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() return ndb.batch.Set(nodeKeyFormat.Key(GetRootKey(version)), []byte{}) } // SaveRoot saves the root when no updates. -func (ndb *nodeDB) SaveRoot(version, prevVersion int64) error { - return ndb.batch.Set(nodeKeyFormat.Key(GetRootKey(version)), ndb.nodeKeyPrefix(prevVersion)) +func (ndb *nodeDB) SaveRoot(version int64, nk *NodeKey) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.batch.Set(nodeKeyFormat.Key(GetRootKey(version)), nodeKeyFormat.Key(nk.GetKey())) } // Traverse fast nodes and return error if any, nil otherwise @@ -932,9 +971,6 @@ func (ndb *nodeDB) Commit() error { return fmt.Errorf("failed to write batch, %w", err) } - ndb.batch.Close() - ndb.batch = NewBatchWithFlusher(ndb.db, ndb.opts.FlushThreshold) - return nil } @@ -950,6 +986,16 @@ func (ndb *nodeDB) decrVersionReaders(version int64) { if ndb.versionReaders[version] > 0 { ndb.versionReaders[version]-- } + if ndb.versionReaders[version] == 0 { + delete(ndb.versionReaders, version) + } +} + +func isReferenceRoot(bz []byte) (bool, int) { + if bz[0] == nodeKeyFormat.Prefix()[0] { + return true, len(bz) + } + return false, 0 } // traverseOrphans traverses orphans which removed by the updates of the curVersion in the prevVersion. @@ -1050,6 +1096,24 @@ func (ndb *nodeDB) nodes() ([]*Node, error) { return nodes, nil } +func (ndb *nodeDB) legacyNodes() ([]*Node, error) { + nodes := []*Node{} + + err := ndb.traversePrefix(legacyNodeKeyFormat.Prefix(), func(key, value []byte) error { + node, err := MakeLegacyNode(key[1:], value) + if err != nil { + return err + } + nodes = append(nodes, node) + return nil + }) + if err != nil { + return nil, err + } + + return nodes, nil +} + func (ndb *nodeDB) orphans() ([][]byte, error) { orphans := [][]byte{} @@ -1083,20 +1147,11 @@ func (ndb *nodeDB) size() int { return size } -func isReferenceToRoot(bz []byte) bool { - if bz[0] == nodeKeyPrefixFormat.Prefix()[0] { - if len(bz) == nodeKeyPrefixFormat.Length() { - return true - } - } - return false -} - func (ndb *nodeDB) traverseNodes(fn func(node *Node) error) error { nodes := []*Node{} if err := ndb.traversePrefix(nodeKeyFormat.Prefix(), func(key, value []byte) error { - if isReferenceToRoot(value) { + if isRef, _ := isReferenceRoot(value); isRef { return nil } node, err := MakeNode(key[1:], value) diff --git a/nodedb_test.go b/nodedb_test.go index 272a8180c..52d6a51de 100644 --- a/nodedb_test.go +++ b/nodedb_test.go @@ -91,7 +91,7 @@ func TestSetStorageVersion_Success(t *testing.T) { latestVersion, err := ndb.getLatestVersion() require.NoError(t, err) - err = ndb.setFastStorageVersionToBatch(latestVersion) + err = ndb.SetFastStorageVersionToBatch(latestVersion) require.NoError(t, err) require.Equal(t, expectedVersion+fastStorageVersionDelimiter+strconv.Itoa(int(latestVersion)), ndb.getStorageVersion()) @@ -116,7 +116,7 @@ func TestSetStorageVersion_DBFailure_OldKept(t *testing.T) { ndb := newNodeDB(dbMock, 0, DefaultOptions(), log.NewNopLogger()) require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion()) - err := ndb.setFastStorageVersionToBatch(int64(expectedFastCacheVersion)) + err := ndb.SetFastStorageVersionToBatch(int64(expectedFastCacheVersion)) require.Error(t, err) require.Equal(t, expectedErrorMsg, err.Error()) require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion()) @@ -137,7 +137,7 @@ func TestSetStorageVersion_InvalidVersionFailure_OldKept(t *testing.T) { ndb := newNodeDB(dbMock, 0, DefaultOptions(), log.NewNopLogger()) require.Equal(t, invalidStorageVersion, ndb.getStorageVersion()) - err := ndb.setFastStorageVersionToBatch(0) + err := ndb.SetFastStorageVersionToBatch(0) require.Error(t, err) require.Equal(t, expectedErrorMsg, err) require.Equal(t, invalidStorageVersion, ndb.getStorageVersion()) @@ -149,7 +149,7 @@ func TestSetStorageVersion_FastVersionFirst_VersionAppended(t *testing.T) { ndb.storageVersion = fastStorageVersionValue ndb.latestVersion = 100 - err := ndb.setFastStorageVersionToBatch(ndb.latestVersion) + err := ndb.SetFastStorageVersionToBatch(ndb.latestVersion) require.NoError(t, err) require.Equal(t, fastStorageVersionValue+fastStorageVersionDelimiter+strconv.Itoa(int(ndb.latestVersion)), ndb.storageVersion) } @@ -163,7 +163,7 @@ func TestSetStorageVersion_FastVersionSecond_VersionAppended(t *testing.T) { storageVersionBytes[len(fastStorageVersionValue)-1]++ // increment last byte ndb.storageVersion = string(storageVersionBytes) - err := ndb.setFastStorageVersionToBatch(ndb.latestVersion) + err := ndb.SetFastStorageVersionToBatch(ndb.latestVersion) require.NoError(t, err) require.Equal(t, string(storageVersionBytes)+fastStorageVersionDelimiter+strconv.Itoa(int(ndb.latestVersion)), ndb.storageVersion) } @@ -177,12 +177,12 @@ func TestSetStorageVersion_SameVersionTwice(t *testing.T) { storageVersionBytes[len(fastStorageVersionValue)-1]++ // increment last byte ndb.storageVersion = string(storageVersionBytes) - err := ndb.setFastStorageVersionToBatch(ndb.latestVersion) + err := ndb.SetFastStorageVersionToBatch(ndb.latestVersion) require.NoError(t, err) newStorageVersion := string(storageVersionBytes) + fastStorageVersionDelimiter + strconv.Itoa(int(ndb.latestVersion)) require.Equal(t, newStorageVersion, ndb.storageVersion) - err = ndb.setFastStorageVersionToBatch(ndb.latestVersion) + err = ndb.SetFastStorageVersionToBatch(ndb.latestVersion) require.NoError(t, err) require.Equal(t, newStorageVersion, ndb.storageVersion) } @@ -402,7 +402,7 @@ func TestDeleteVersionsFromNoDeadlock(t *testing.T) { ndb := newNodeDB(db, 0, DefaultOptions(), log.NewNopLogger()) require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion()) - err := ndb.setFastStorageVersionToBatch(ndb.latestVersion) + err := ndb.SetFastStorageVersionToBatch(ndb.latestVersion) require.NoError(t, err) latestVersion, err := ndb.getLatestVersion() diff --git a/tree_test.go b/tree_test.go index cf71ff2cd..9c564b29a 100644 --- a/tree_test.go +++ b/tree_test.go @@ -802,8 +802,6 @@ func TestVersionedTreeSaveAndLoad(t *testing.T) { ntree.Set([]byte("T"), []byte("MhkWjkVy")) ntree.SaveVersion() - ntree.DeleteVersionsTo(1) - ntree.DeleteVersionsTo(4) ntree.DeleteVersionsTo(6) require.False(ntree.IsEmpty()) @@ -1823,3 +1821,67 @@ func TestNodeCacheStatisic(t *testing.T) { }) } } + +func TestEmptyVersionDelete(t *testing.T) { + db, err := dbm.NewDB("test", "memdb", "") + require.NoError(t, err) + defer db.Close() + + tree := NewMutableTree(db, 0, false, log.NewNopLogger()) + + _, err = tree.Set([]byte("key1"), []byte("value1")) + require.NoError(t, err) + + toVersion := 10 + for i := 0; i < toVersion; i++ { + _, _, err = tree.SaveVersion() + require.NoError(t, err) + } + + require.NoError(t, tree.DeleteVersionsTo(5)) + + // Load the tree from disk. + tree = NewMutableTree(db, 0, false, log.NewNopLogger()) + v, err := tree.Load() + require.NoError(t, err) + require.Equal(t, int64(toVersion), v) + // Version 1 is only meaningful, so it should not be deleted. + require.Equal(t, tree.root.GetKey(), (&NodeKey{version: 1, nonce: 0}).GetKey()) + // it is expected that the version 1 is deleted. + versions := tree.AvailableVersions() + require.Equal(t, 6, versions[0]) + require.Len(t, versions, 5) +} + +func TestReferenceRoot(t *testing.T) { + db, err := dbm.NewDB("test", "memdb", "") + require.NoError(t, err) + defer db.Close() + + tree := NewMutableTree(db, 0, false, log.NewNopLogger()) + + _, err = tree.Set([]byte("key1"), []byte("value1")) + require.NoError(t, err) + + _, err = tree.Set([]byte("key2"), []byte("value2")) + require.NoError(t, err) + + _, _, err = tree.SaveVersion() + require.NoError(t, err) + + _, _, err = tree.Remove([]byte("key1")) + require.NoError(t, err) + + // the root will be the leaf node of key2 + _, _, err = tree.SaveVersion() + require.NoError(t, err) + + // Load the tree from disk. + tree = NewMutableTree(db, 0, false, log.NewNopLogger()) + _, err = tree.Load() + require.NoError(t, err) + require.Equal(t, int64(2), tree.Version()) + // check the root of version 2 is the leaf node of key2 + require.Equal(t, tree.root.GetKey(), (&NodeKey{version: 1, nonce: 3}).GetKey()) + require.Equal(t, tree.root.key, []byte("key2")) +}