Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backport the pruning fix PR to v1.1.x #910

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
// around batch that flushes batch's data to disk
// as soon as the configurable limit is reached.
type BatchWithFlusher struct {
mtx sync.Mutex
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

mtx sync.Mutex
flushThreshold int // The threshold to flush the batch to disk.
}

Expand Down Expand Up @@ -57,13 +57,11 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Set(key, value)
}
Expand All @@ -81,26 +79,47 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
b.mtx.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come there is a need for the extra mutexs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is related to the legacy node pruning, it runs in background

if err := b.Write(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Delete(key)
}

func (b *BatchWithFlusher) Write() error {
return b.batch.Write()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) WriteSync() error {
return b.batch.WriteSync()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.WriteSync(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) Close() error {
b.mtx.Lock()
defer b.mtx.Unlock()

return b.batch.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion import.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (i *Importer) Commit() error {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKey(i.stack[0].nodeKey.GetKey())); err != nil {
return err
}
}
Expand Down
156 changes: 147 additions & 9 deletions migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"os/exec"
"path"
"testing"
"time"

"cosmossdk.io/log"
"github.com/stretchr/testify/require"

dbm "github.com/cosmos/iavl/db"
iavlrand "github.com/cosmos/iavl/internal/rand"
)

const (
dbType = "goleveldb"
)

func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string, error) {
func createLegacyTree(t *testing.T, dbDir string, version int) (string, error) {
relateDir := path.Join(t.TempDir(), dbDir)
if _, err := os.Stat(relateDir); err == nil {
err := os.RemoveAll(relateDir)
Expand Down Expand Up @@ -48,10 +50,10 @@ func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string,
func TestLazySet(t *testing.T) {
legacyVersion := 1000
dbDir := fmt.Sprintf("legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -91,10 +93,10 @@ func TestLazySet(t *testing.T) {
func TestLegacyReferenceNode(t *testing.T) {
legacyVersion := 20
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand All @@ -111,6 +113,7 @@ func TestLegacyReferenceNode(t *testing.T) {
// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)
legacyLatestVersion := tree.root.nodeKey.version

// Commit new versions without updates
_, _, err = tree.SaveVersion()
Expand All @@ -123,17 +126,17 @@ func TestLegacyReferenceNode(t *testing.T) {
_, err = newTree.LoadVersion(version - 1)
require.NoError(t, err)
// Check if the reference node is refactored
require.Equal(t, newTree.root.nodeKey.nonce, uint32(1))
require.Equal(t, newTree.root.nodeKey.version, int64(legacyVersion))
require.Equal(t, newTree.root.nodeKey.nonce, uint32(0))
require.Equal(t, newTree.root.nodeKey.version, legacyLatestVersion)
}

func TestDeleteVersions(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -205,3 +208,138 @@ func TestDeleteVersions(t *testing.T) {
pVersions = tree.AvailableVersions()
require.Equal(t, postVersions/2, len(pVersions))
}

func TestPruning(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

// Load the latest version
tree := NewMutableTree(db, 1000, false, log.NewNopLogger())
_, err = tree.Load()
require.NoError(t, err)

// Save 10 versions without updates
for i := 0; i < 10; i++ {
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

// Save 990 versions
leavesCount := 10
toVersion := int64(990)
pruningInterval := int64(20)
for i := int64(0); i < toVersion; i++ {
for j := 0; j < leavesCount; j++ {
_, err := tree.Set([]byte(fmt.Sprintf("key%d", j)), []byte(fmt.Sprintf("value%d", j)))
require.NoError(t, err)
}
_, v, err := tree.SaveVersion()
require.NoError(t, err)
if v%pruningInterval == 0 {
err = tree.DeleteVersionsTo(v - pruningInterval/2)
require.NoError(t, err)
}
}

// Wait for pruning to finish
for i := 0; i < 200; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion))
require.NoError(t, err)
if !isLeacy {
break
}
// Simulate the consensus state update
time.Sleep(100 * time.Millisecond)
}
// Reload the tree
tree = NewMutableTree(db, 0, false, log.NewNopLogger())
versions := tree.AvailableVersions()
require.Equal(t, versions[0], int(toVersion)+legacyVersion+1)
for _, v := range versions {
_, err := tree.LoadVersion(int64(v))
require.NoError(t, err)
}
// Check if the legacy nodes are pruned
_, err = tree.Load()
require.NoError(t, err)
itr, err := NewNodeIterator(tree.root.GetKey(), tree.ndb)
require.NoError(t, err)
legacyNodes := make(map[string]*Node)
for ; itr.Valid(); itr.Next(false) {
node := itr.GetNode()
if node.nodeKey.nonce == 0 {
legacyNodes[string(node.hash)] = node
}
}

lNodes, err := tree.ndb.legacyNodes()
require.NoError(t, err)
require.Len(t, lNodes, len(legacyNodes))
for _, node := range lNodes {
_, ok := legacyNodes[string(node.hash)]
require.True(t, ok)
}
}

func TestRandomSet(t *testing.T) {
legacyVersion := 50
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

tree := NewMutableTree(db, 10000, false, log.NewNopLogger())

// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)

// Commit new versions
postVersions := 1000
emptyVersions := 10
for i := 0; i < emptyVersions; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
}
for i := 0; i < postVersions-emptyVersions; i++ {
leafCount := rand.Intn(50)
for j := 0; j < leafCount; j++ {
key := iavlrand.RandBytes(10)
value := iavlrand.RandBytes(10)
_, err = tree.Set(key, value)
require.NoError(t, err)
}
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

err = tree.DeleteVersionsTo(int64(legacyVersion + postVersions - 1))
require.NoError(t, err)
}
17 changes: 9 additions & 8 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ func (tree *MutableTree) LoadVersion(targetVersion int64) (int64, error) {

if firstVersion == 0 {
if targetVersion <= 0 {
tree.version = int64(tree.ndb.opts.InitialVersion)

if !tree.skipFastStorageUpgrade {
tree.mtx.Lock()
defer tree.mtx.Unlock()
Expand Down Expand Up @@ -624,7 +626,7 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
return err
}

if err = tree.ndb.setFastStorageVersionToBatch(latestVersion); err != nil {
if err = tree.ndb.SetFastStorageVersionToBatch(latestVersion); err != nil {
return err
}

Expand Down Expand Up @@ -756,15 +758,15 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
}
} else {
if tree.root.nodeKey != nil {
// it means there is no updated
if err := tree.ndb.SaveRoot(version, tree.root.nodeKey.version); err != nil {
// it means there are no updated nodes
if err := tree.ndb.SaveRoot(version, tree.root.nodeKey); err != nil {
return nil, 0, err
}
// it means the reference node is a legacy node
if tree.root.nodeKey.nonce == 0 {
if tree.root.isLegacy {
// it will update the legacy node to the new format
// which ensures the reference node is not a legacy node
tree.root.nodeKey.nonce = 1
tree.root.isLegacy = false
if err := tree.ndb.SaveNode(tree.root); err != nil {
return nil, 0, fmt.Errorf("failed to save the reference legacy node: %w", err)
}
Expand Down Expand Up @@ -801,7 +803,7 @@ func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool
if err := tree.saveFastNodeRemovals(); err != nil {
return err
}
return tree.ndb.setFastStorageVersionToBatch(latestVersion)
return tree.ndb.SetFastStorageVersionToBatch(latestVersion)
}

func (tree *MutableTree) getUnsavedFastNodeAdditions() map[string]*fastnode.Node {
Expand Down Expand Up @@ -884,8 +886,7 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
}

// DeleteVersionsTo removes versions upto the given version from the MutableTree.
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
// It will not block the SaveVersion() call, instead it will be queued and executed deferred.
func (tree *MutableTree) DeleteVersionsTo(toVersion int64) error {
if err := tree.ndb.DeleteVersionsTo(toVersion); err != nil {
return err
Expand Down
Loading
Loading