Skip to content

Commit

Permalink
chore: backport the pruning fix PR to v1.1.x (#910)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Mar 14, 2024
1 parent 04a0b44 commit ca1d22d
Show file tree
Hide file tree
Showing 9 changed files with 489 additions and 200 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## v1.1.1 March 13, 2024

### Bug Fixes

- [#910](https://github.com/cosmos/iavl/pull/910) Fix the reference root format from (prefix, version) to (prefix, version, nonce)

### Improvements

- [#910](https://github.com/cosmos/iavl/pull/910) Async pruning of legacy orphan nodes.


## v1.1.0 February 29, 2024

### API Breaking Changes
Expand Down
45 changes: 32 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
// around batch that flushes batch's data to disk
// as soon as the configurable limit is reached.
type BatchWithFlusher struct {
mtx sync.Mutex
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

mtx sync.Mutex
flushThreshold int // The threshold to flush the batch to disk.
}

Expand Down Expand Up @@ -57,13 +57,11 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Set(key, value)
}
Expand All @@ -81,26 +79,47 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Delete(key)
}

func (b *BatchWithFlusher) Write() error {
return b.batch.Write()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) WriteSync() error {
return b.batch.WriteSync()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.WriteSync(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) Close() error {
b.mtx.Lock()
defer b.mtx.Unlock()

return b.batch.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion import.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (i *Importer) Commit() error {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKey(i.stack[0].nodeKey.GetKey())); err != nil {
return err
}
}
Expand Down
156 changes: 147 additions & 9 deletions migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"os/exec"
"path"
"testing"
"time"

"cosmossdk.io/log"
"github.com/stretchr/testify/require"

dbm "github.com/cosmos/iavl/db"
iavlrand "github.com/cosmos/iavl/internal/rand"
)

const (
dbType = "goleveldb"
)

func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string, error) {
func createLegacyTree(t *testing.T, dbDir string, version int) (string, error) {
relateDir := path.Join(t.TempDir(), dbDir)
if _, err := os.Stat(relateDir); err == nil {
err := os.RemoveAll(relateDir)
Expand Down Expand Up @@ -48,10 +50,10 @@ func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string,
func TestLazySet(t *testing.T) {
legacyVersion := 1000
dbDir := fmt.Sprintf("legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -91,10 +93,10 @@ func TestLazySet(t *testing.T) {
func TestLegacyReferenceNode(t *testing.T) {
legacyVersion := 20
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand All @@ -111,6 +113,7 @@ func TestLegacyReferenceNode(t *testing.T) {
// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)
legacyLatestVersion := tree.root.nodeKey.version

// Commit new versions without updates
_, _, err = tree.SaveVersion()
Expand All @@ -123,17 +126,17 @@ func TestLegacyReferenceNode(t *testing.T) {
_, err = newTree.LoadVersion(version - 1)
require.NoError(t, err)
// Check if the reference node is refactored
require.Equal(t, newTree.root.nodeKey.nonce, uint32(1))
require.Equal(t, newTree.root.nodeKey.version, int64(legacyVersion))
require.Equal(t, newTree.root.nodeKey.nonce, uint32(0))
require.Equal(t, newTree.root.nodeKey.version, legacyLatestVersion)
}

func TestDeleteVersions(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -205,3 +208,138 @@ func TestDeleteVersions(t *testing.T) {
pVersions = tree.AvailableVersions()
require.Equal(t, postVersions/2, len(pVersions))
}

func TestPruning(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

// Load the latest version
tree := NewMutableTree(db, 1000, false, log.NewNopLogger())
_, err = tree.Load()
require.NoError(t, err)

// Save 10 versions without updates
for i := 0; i < 10; i++ {
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

// Save 990 versions
leavesCount := 10
toVersion := int64(990)
pruningInterval := int64(20)
for i := int64(0); i < toVersion; i++ {
for j := 0; j < leavesCount; j++ {
_, err := tree.Set([]byte(fmt.Sprintf("key%d", j)), []byte(fmt.Sprintf("value%d", j)))
require.NoError(t, err)
}
_, v, err := tree.SaveVersion()
require.NoError(t, err)
if v%pruningInterval == 0 {
err = tree.DeleteVersionsTo(v - pruningInterval/2)
require.NoError(t, err)
}
}

// Wait for pruning to finish
for i := 0; i < 100; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion))
require.NoError(t, err)
if !isLeacy {
break
}
// Simulate the consensus state update
time.Sleep(500 * time.Millisecond)
}
// Reload the tree
tree = NewMutableTree(db, 0, false, log.NewNopLogger())
versions := tree.AvailableVersions()
require.Equal(t, versions[0], int(toVersion)+legacyVersion+1)
for _, v := range versions {
_, err := tree.LoadVersion(int64(v))
require.NoError(t, err)
}
// Check if the legacy nodes are pruned
_, err = tree.Load()
require.NoError(t, err)
itr, err := NewNodeIterator(tree.root.GetKey(), tree.ndb)
require.NoError(t, err)
legacyNodes := make(map[string]*Node)
for ; itr.Valid(); itr.Next(false) {
node := itr.GetNode()
if node.nodeKey.nonce == 0 {
legacyNodes[string(node.hash)] = node
}
}

lNodes, err := tree.ndb.legacyNodes()
require.NoError(t, err)
require.Len(t, lNodes, len(legacyNodes))
for _, node := range lNodes {
_, ok := legacyNodes[string(node.hash)]
require.True(t, ok)
}
}

func TestRandomSet(t *testing.T) {
legacyVersion := 50
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

tree := NewMutableTree(db, 10000, false, log.NewNopLogger())

// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)

// Commit new versions
postVersions := 1000
emptyVersions := 10
for i := 0; i < emptyVersions; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
}
for i := 0; i < postVersions-emptyVersions; i++ {
leafCount := rand.Intn(50)
for j := 0; j < leafCount; j++ {
key := iavlrand.RandBytes(10)
value := iavlrand.RandBytes(10)
_, err = tree.Set(key, value)
require.NoError(t, err)
}
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

err = tree.DeleteVersionsTo(int64(legacyVersion + postVersions - 1))
require.NoError(t, err)
}
15 changes: 7 additions & 8 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
return err
}

if err = tree.ndb.setFastStorageVersionToBatch(latestVersion); err != nil {
if err = tree.ndb.SetFastStorageVersionToBatch(latestVersion); err != nil {
return err
}

Expand Down Expand Up @@ -756,15 +756,15 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
}
} else {
if tree.root.nodeKey != nil {
// it means there is no updated
if err := tree.ndb.SaveRoot(version, tree.root.nodeKey.version); err != nil {
// it means there are no updated nodes
if err := tree.ndb.SaveRoot(version, tree.root.nodeKey); err != nil {
return nil, 0, err
}
// it means the reference node is a legacy node
if tree.root.nodeKey.nonce == 0 {
if tree.root.isLegacy {
// it will update the legacy node to the new format
// which ensures the reference node is not a legacy node
tree.root.nodeKey.nonce = 1
tree.root.isLegacy = false
if err := tree.ndb.SaveNode(tree.root); err != nil {
return nil, 0, fmt.Errorf("failed to save the reference legacy node: %w", err)
}
Expand Down Expand Up @@ -801,7 +801,7 @@ func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool
if err := tree.saveFastNodeRemovals(); err != nil {
return err
}
return tree.ndb.setFastStorageVersionToBatch(latestVersion)
return tree.ndb.SetFastStorageVersionToBatch(latestVersion)
}

func (tree *MutableTree) getUnsavedFastNodeAdditions() map[string]*fastnode.Node {
Expand Down Expand Up @@ -884,8 +884,7 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
}

// DeleteVersionsTo removes versions upto the given version from the MutableTree.
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
// It will not block the SaveVersion() call, instead it will be queued and executed deferred.
func (tree *MutableTree) DeleteVersionsTo(toVersion int64) error {
if err := tree.ndb.DeleteVersionsTo(toVersion); err != nil {
return err
Expand Down
Loading

0 comments on commit ca1d22d

Please sign in to comment.