feat(stream): add support for incremental stream writer (#1722)
This PR adds support for writing a stream incrementally to the DB.
It adds a new API: StreamWriter.PrepareIncremental.

This PR also includes the bug fix from PR #1723.
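Below is a minimal sketch of how the new API is used, following the test pattern added in this PR. It assumes an already-open *badger.DB named db; the key, value, and version are illustrative, and KVToBuffer is the helper from the tests below.

	buf := z.NewBuffer(1<<20, "example")
	defer func() { _ = buf.Release() }()
	KVToBuffer(&pb.KV{Key: []byte("new-key"), Value: []byte("val"), Version: 1}, buf)

	sw := db.NewStreamWriter()
	if err := sw.PrepareIncremental(); err != nil { // unlike Prepare, keeps existing data
		return err
	}
	if err := sw.Write(buf); err != nil {
		return err
	}
	return sw.Flush()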
mangalaman93 committed Mar 6, 2023
1 parent eae48ae commit a9a5761
Showing 4 changed files with 159 additions and 7 deletions.
1 change: 1 addition & 0 deletions badger/cmd/stream.go
@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
WithValueDir(so.outDir).
WithNumVersionsToKeep(so.numVersions).
WithCompression(options.CompressionType(so.compressionType)).
WithEncryptionKey(encKey).
WithReadOnly(false)
err = inDB.StreamDB(outOpt)

2 changes: 1 addition & 1 deletion db.go
@@ -1623,7 +1623,7 @@ func (db *DB) prepareToDrop() (func(), error) {
// write it to the DB. Then, flush all the pending memtables so that we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
-		return nil, err
return func() {}, err
}
reqs := make([]*request, 0, 10)
for {
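The one-line change above matters because PrepareIncremental (added in this PR) stores the returned cleanup function even on the error path. A hedged sketch of that calling pattern, mirroring the code in stream_writer.go below:

	f, err := sw.db.prepareToDrop()
	if err != nil {
		// f is wrapped unconditionally, so it must be callable:
		// returning nil here would make done() panic later.
		sw.done = func() { once.Do(f) }
		return err
	}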
64 changes: 58 additions & 6 deletions stream_writer.go
@@ -48,6 +48,7 @@ type StreamWriter struct {
throttle *y.Throttle
maxVersion uint64
writers map[uint32]*sortedWriter
prevLevel int
}

// NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -67,18 +68,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
// Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
// existing DB, stops compactions and any writes being done by other means. Be very careful when
// calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
// in a corrupt Badger instance. Use PrepareIncremental for an incremental stream write.
func (sw *StreamWriter) Prepare() error {
sw.writeLock.Lock()
defer sw.writeLock.Unlock()

done, err := sw.db.dropAll()
// Ensure that done() is never called more than once.
var once sync.Once
sw.done = func() { once.Do(done) }
return err
}

// PrepareIncremental should be called before incrementally writing any entry to the StreamWriter.
// In an incremental stream write, tables are written one level above the current base level.
func (sw *StreamWriter) PrepareIncremental() error {
sw.writeLock.Lock()
defer sw.writeLock.Unlock()

// Ensure that done() is never called more than once.
var once sync.Once

// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
// Before we start writing, we'll stop the compactions because no one else should be writing to
// the same level as the stream writer is writing to.
f, err := sw.db.prepareToDrop()
if err != nil {
sw.done = func() { once.Do(f) }
return err
}
sw.db.stopCompactions()
done := func() {
sw.db.startCompactions()
f()
}
sw.done = func() { once.Do(done) }

-	return err
	// Scan from the top: the first non-empty level becomes prevLevel, and
	// incremental writes will target the level just above it.
	isEmptyDB := true
for _, level := range sw.db.Levels() {
if level.NumTables > 0 {
sw.prevLevel = level.Level
isEmptyDB = false
break
}
}
	if isEmptyDB {
		// If the DB is empty, an incremental stream write is always safe.
		return nil
	}
	if sw.prevLevel == 0 {
		return fmt.Errorf("unable to do incremental writes because L0 has data")
}
return nil
}

// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -110,16 +151,25 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
}

	sw.writeLock.Lock()
	// maxVersion and prevLevel are shared across concurrent calls to Write,
	// so update them while holding writeLock.
	if sw.maxVersion < kv.Version {
		sw.maxVersion = kv.Version
	}
	if sw.prevLevel == 0 {
		// If prevLevel is 0, we have not written anything yet. newWriter
		// writes to prevLevel - 1, so setting prevLevel to len(levels)
		// makes the tables land on the bottom level.
		sw.prevLevel = len(sw.db.lc.levels)
	}
	sw.writeLock.Unlock()

var meta, userMeta byte
if len(kv.Meta) > 0 {
meta = kv.Meta[0]
}
if len(kv.UserMeta) > 0 {
userMeta = kv.UserMeta[0]
}
-	if sw.maxVersion < kv.Version {
-		sw.maxVersion = kv.Version
-	}
e := &Entry{
Key: y.KeyWithTs(kv.Key, kv.Version),
Value: y.Copy(kv.Value),
@@ -285,6 +335,7 @@ type sortedWriter struct {

builder *table.Builder
lastKey []byte
level int
streamID uint32
reqCh chan *request
// Have separate closer for each writer, as it can be closed at any time.
@@ -304,6 +355,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
builder: table.NewTableBuilder(bopts),
reqCh: make(chan *request, 3),
closer: z.NewCloser(1),
level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
}

go w.handleRequests()
@@ -435,7 +487,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
}
lc := w.db.lc

-	lhandler := lc.levels[len(lc.levels)-1]
lhandler := lc.levels[w.level]
// Now that table can be opened successfully, let's add this to the MANIFEST.
change := &pb.ManifestChange{
Id: tbl.ID(),
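To make the level arithmetic concrete, here is a hypothetical walk-through (the level numbers are illustrative, not from the diff):

	// Levels are numbered top-down, L0 being the top.
	//
	//   levels:            L0  L1  L2  L3  L4  L5
	//   first non-empty:                       L5  => prevLevel = 5
	//   newWriter target:  prevLevel - 1   =>  L4  (empty, safe to write into)
	//
	// On an empty DB, Write sets prevLevel = len(levels) = 6, so the new
	// tables land on the bottom level, L5.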
99 changes: 99 additions & 0 deletions stream_writer_test.go
@@ -577,3 +577,102 @@ func TestStreamWriterEncrypted(t *testing.T) {
require.NoError(t, db.Close())

}

// Test that the stream writer does not crash with large values in managed mode.
func TestStreamWriterWithLargeValue(t *testing.T) {
opts := DefaultOptions("")
opts.managedTxns = true
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
val := make([]byte, 10<<20)
_, err := rand.Read(val)
require.NoError(t, err)
KVToBuffer(&pb.KV{
Key: []byte("key"),
Value: val,
Version: 1,
}, buf)

sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")
})
}

func TestStreamWriterIncremental(t *testing.T) {
	addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
for _, key := range keys {
KVToBuffer(&pb.KV{
Key: key,
Value: []byte("val"),
Version: 1,
}, buf)
}
// Now do an incremental stream write.
sw := db.NewStreamWriter()
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")
}

t.Run("incremental on non-empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Value: []byte("val"),
Version: 1,
}, buf)
sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

			addIncremental(t, db, [][]byte{[]byte("key-2")})

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
_, err = txn.Get([]byte("key-2"))
require.NoError(t, err)
})
})

t.Run("incremental on empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
			addIncremental(t, db, [][]byte{[]byte("key-1")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
})
})

t.Run("multiple incremental", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a2"))
require.NoError(t, err)
_, err = txn.Get([]byte("c2"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
_, err = txn.Get([]byte("c3"))
require.NoError(t, err)
})
})
}
