Skip to content

Commit

Permalink
Introduce StreamDone in Stream Writer (#1061)
Browse files Browse the repository at this point in the history
This PR introduces a way to tell StreamWriter to close a stream.
Previously, streams were always kept open until Flush was called on the
StreamWriter. This resulted in high memory utilisation, because of the
underlying TableBuilder held by each sortedWriter. Also, closing all
sorted writers in a single call resulted in more memory allocation
(during Flush()). Closing streams early can be useful in some cases,
such as the bulk loader in Dgraph, where only one stream is active at a time.

(cherry picked from commit 385da91)
  • Loading branch information
ashish-goswami authored and Ibrahim Jarif committed Mar 12, 2020
1 parent ae5d81a commit ea19351
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 43 deletions.
4 changes: 2 additions & 2 deletions contrib/cover.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ TMP=$(mktemp /tmp/badger-coverage-XXXXX.txt)
BUILD=$1
OUT=$2

set -e
set -ex

pushd $SRC &> /dev/null

# create coverage output
echo 'mode: atomic' > $OUT
for PKG in $(go list ./...|grep -v -E 'vendor'); do
go test -covermode=atomic -coverprofile=$TMP $PKG
go test -v -covermode=atomic -coverprofile=$TMP $PKG
tail -n +2 $TMP >> $OUT
done

Expand Down
93 changes: 68 additions & 25 deletions pb/pb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pb/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ message KV {

// Stream id is used to identify which stream the KV came from.
uint32 stream_id = 10;
// Stream done is used to indicate end of stream.
bool stream_done = 11;
}

message KVList {
Expand Down
91 changes: 75 additions & 16 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package badger

import (
"fmt"
"math"
"sync"

Expand Down Expand Up @@ -47,7 +48,7 @@ type StreamWriter struct {
throttle *y.Throttle
maxVersion uint64
writers map[uint32]*sortedWriter
closer *y.Closer
maxHead valuePointer
}

// NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
Expand All @@ -61,7 +62,6 @@ func (db *DB) NewStreamWriter() *StreamWriter {
// concurrent streams being processed.
throttle: y.NewThrottle(16),
writers: make(map[uint32]*sortedWriter),
closer: y.NewCloser(0),
}
}

Expand All @@ -85,8 +85,23 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
if len(kvs.GetKv()) == 0 {
return nil
}

// closedStreams keeps track of all streams which are going to be marked as done. We are
// keeping track of all streams so that we can close them at the end, after inserting all
// the valid kvs.
closedStreams := make(map[uint32]struct{})
streamReqs := make(map[uint32]*request)
for _, kv := range kvs.Kv {
if kv.StreamDone {
closedStreams[kv.StreamId] = struct{}{}
continue
}

// Panic if some kv comes after stream has been marked as closed.
if _, ok := closedStreams[kv.StreamId]; ok {
panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
}

var meta, userMeta byte
if len(kv.Meta) > 0 {
meta = kv.Meta[0]
Expand All @@ -104,7 +119,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
ExpiresAt: kv.ExpiresAt,
meta: meta,
}
// If the value can be colocated with the key in LSM tree, we can skip
// If the value can be collocated with the key in LSM tree, we can skip
// writing the value to value log.
e.skipVlog = sw.db.shouldWriteValueToLSM(*e)
req := streamReqs[kv.StreamId]
Expand All @@ -121,6 +136,10 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {

sw.writeLock.Lock()
defer sw.writeLock.Unlock()

// We are writing all requests to vlog even if some request belongs to already closed stream.
// It is safe to do because we are panicking while writing to sorted writer, which will be nil
// for closed stream. At restart, stream writer will drop all the data in Prepare function.
if err := sw.db.vlog.write(all); err != nil {
return err
}
Expand All @@ -131,8 +150,35 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
writer = sw.newWriter(streamId)
sw.writers[streamId] = writer
}

if writer == nil {
panic(fmt.Sprintf("write performed on closed stream: %d", streamId))
}

writer.reqCh <- req
}

// Now we can close any streams if required. We will make writer for
// the closed streams as nil.
for streamId := range closedStreams {
writer, ok := sw.writers[streamId]
if !ok {
sw.db.opt.Logger.Warningf("Trying to close stream: %d, but no sorted "+
"writer found for it", streamId)
continue
}

writer.closer.SignalAndWait()
if err := writer.Done(); err != nil {
return err
}

if sw.maxHead.Less(writer.head) {
sw.maxHead = writer.head
}

sw.writers[streamId] = nil
}
return nil
}

Expand All @@ -144,20 +190,27 @@ func (sw *StreamWriter) Flush() error {

defer sw.done()

sw.closer.SignalAndWait()
var maxHead valuePointer
for _, writer := range sw.writers {
if writer != nil {
writer.closer.SignalAndWait()
}
}

for _, writer := range sw.writers {
if writer == nil {
continue
}
if err := writer.Done(); err != nil {
return err
}
if maxHead.Less(writer.head) {
maxHead = writer.head
if sw.maxHead.Less(writer.head) {
sw.maxHead = writer.head
}
}

// Encode and write the value log head into a new table.
data := make([]byte, vptrSize)
maxHead.Encode(data)
data = sw.maxHead.Encode(data)
headWriter := sw.newWriter(headStreamId)
if err := headWriter.Add(
y.KeyWithTs(head, sw.maxVersion),
Expand Down Expand Up @@ -205,6 +258,8 @@ type sortedWriter struct {
streamId uint32
reqCh chan *request
head valuePointer
// Have separate closer for each writer, as it can be closed at any time.
closer *y.Closer
}

func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter {
Expand All @@ -214,17 +269,18 @@ func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter {
throttle: sw.throttle,
builder: table.NewTableBuilder(),
reqCh: make(chan *request, 3),
closer: y.NewCloser(1),
}
sw.closer.AddRunning(1)
go w.handleRequests(sw.closer)

go w.handleRequests()
return w
}

// ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add.
var ErrUnsortedKey = errors.New("Keys not in sorted order")

func (w *sortedWriter) handleRequests(closer *y.Closer) {
defer closer.Done()
func (w *sortedWriter) handleRequests() {
defer w.closer.Done()

process := func(req *request) {
for i, e := range req.Entries {
Expand Down Expand Up @@ -261,7 +317,7 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) {
select {
case req := <-w.reqCh:
process(req)
case <-closer.HasBeenClosed():
case <-w.closer.HasBeenClosed():
close(w.reqCh)
for req := range w.reqCh {
process(req)
Expand All @@ -280,7 +336,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
sameKey := y.SameKey(key, w.lastKey)
// Same keys should go into the same SSTable.
if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) {
if err := w.send(); err != nil {
if err := w.send(false); err != nil {
return err
}
}
Expand All @@ -290,7 +346,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
return nil
}

func (w *sortedWriter) send() error {
func (w *sortedWriter) send(done bool) error {
if err := w.throttle.Do(); err != nil {
return err
}
Expand All @@ -307,9 +363,12 @@ func (w *sortedWriter) send() error {
// to sortedWriter. It completes writing current SST to disk.
func (w *sortedWriter) Done() error {
if w.builder.Empty() {
// Assign builder as nil, so that underlying memory can be garbage collected.
w.builder = nil
return nil
}
return w.send()

return w.send(true)
}

func (w *sortedWriter) createTable(data []byte) error {
Expand Down
Loading

0 comments on commit ea19351

Please sign in to comment.