Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): improve incremental stream writer #1901

Merged
merged 1 commit into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ func (sw *StreamWriter) PrepareIncremental() error {
}
sw.done = func() { once.Do(done) }

mts, decr := sw.db.getMemTables()
defer decr()
for _, m := range mts {
if !m.sl.Empty() {
return fmt.Errorf("Unable to do incremental writes because MemTable has data")
}
}

isEmptyDB := true
for _, level := range sw.db.Levels() {
if level.NumTables > 0 {
Expand All @@ -117,7 +125,13 @@ func (sw *StreamWriter) PrepareIncremental() error {
return nil
}
if sw.prevLevel == 0 {
return fmt.Errorf("Unable to do incremental writes because L0 has data")
// It seems that data is present in all levels from Lmax to L0. If we call flatten
// on the tree, all the data will go to Lmax. All the levels above will be empty
// after flatten call. Now, we should be able to use incremental stream writer again.
if err := sw.db.Flatten(3); err != nil {
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrapf(err, "error during flatten in StreamWriter")
}
sw.prevLevel = len(sw.db.Levels()) - 1
}
return nil
}
Expand Down
87 changes: 81 additions & 6 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
}

func TestStreamWriterIncremental(t *testing.T) {
addIncremtal := func(t *testing.T, db *DB, keys [][]byte) {
addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
for _, key := range keys {
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

addIncremtal(t, db, [][]byte{[]byte("key-2")})
addIncremental(t, db, [][]byte{[]byte("key-2")})

txn := db.NewTransaction(false)
defer txn.Discard()
Expand All @@ -646,7 +646,7 @@ func TestStreamWriterIncremental(t *testing.T) {

t.Run("incremental on empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("key-1")})
addIncremental(t, db, [][]byte{[]byte("key-1")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
Expand All @@ -656,9 +656,9 @@ func TestStreamWriterIncremental(t *testing.T) {

t.Run("multiple incremental", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")})
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
Expand All @@ -675,4 +675,79 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, err)
})
})

t.Run("write between incremental writes", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("a3"), []byte("c3"))
}))

sw := db.NewStreamWriter()
defer sw.Cancel()
require.EqualError(t, sw.PrepareIncremental(), "Unable to do incremental writes because MemTable has data")

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
})
})

t.Run("incremental writes > #levels", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
addIncremental(t, db, [][]byte{[]byte("a4"), []byte("c4")})
addIncremental(t, db, [][]byte{[]byte("a5"), []byte("c5")})
addIncremental(t, db, [][]byte{[]byte("a6"), []byte("c6")})
addIncremental(t, db, [][]byte{[]byte("a7"), []byte("c7")})
addIncremental(t, db, [][]byte{[]byte("a8"), []byte("c8")})
addIncremental(t, db, [][]byte{[]byte("a9"), []byte("c9")})

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a2"))
require.NoError(t, err)
_, err = txn.Get([]byte("c2"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
_, err = txn.Get([]byte("c3"))
require.NoError(t, err)
_, err = txn.Get([]byte("a4"))
require.NoError(t, err)
_, err = txn.Get([]byte("c4"))
require.NoError(t, err)
_, err = txn.Get([]byte("a5"))
require.NoError(t, err)
_, err = txn.Get([]byte("c5"))
require.NoError(t, err)
_, err = txn.Get([]byte("a6"))
require.NoError(t, err)
_, err = txn.Get([]byte("c6"))
require.NoError(t, err)
_, err = txn.Get([]byte("a7"))
require.NoError(t, err)
_, err = txn.Get([]byte("c7"))
require.NoError(t, err)
_, err = txn.Get([]byte("a8"))
require.NoError(t, err)
_, err = txn.Get([]byte("c8"))
require.NoError(t, err)
_, err = txn.Get([]byte("a9"))
require.NoError(t, err)
_, err = txn.Get([]byte("c9"))
require.NoError(t, err)
})
})
}