From 1ad5160d3b6a3e15cc8362a3fd852a068d1def98 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 4 Oct 2019 21:46:27 +0530 Subject: [PATCH 1/4] Flush vlog buffer if it grows beyond threshold --- batch_test.go | 24 ++++++++++++++++++++++++ value.go | 12 ++++++++++++ 2 files changed, 36 insertions(+) diff --git a/batch_test.go b/batch_test.go index 6b062eb43..57653f1b6 100644 --- a/batch_test.go +++ b/batch_test.go @@ -18,6 +18,7 @@ package badger import ( "fmt" + "math/rand" "testing" "time" @@ -67,3 +68,26 @@ func TestWriteBatch(t *testing.T) { require.NoError(t, err) }) } + +// Regression test for https://github.com/dgraph-io/badger/issues/1062 +func TestWriteBatchOOM(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + wb := db.NewWriteBatch() + defer wb.Cancel() + // 1,000 entries each of 32 mb values + n := 1000 + vsize := 32000000 + key := make([]byte, 32) + val := make([]byte, vsize) + rand.Read(val) + for i := 0; i <= n; i++ { + _, err := rand.Read(key) + require.NoError(t, err) + require.NoError(t, wb.Set(key, val)) + if i%100 == 0 { + db.opt.Logger.Debugf("Written %d entries", i) + } + } + require.NoError(t, wb.Flush()) + }) +} diff --git a/value.go b/value.go index d4524d669..5a22182e0 100644 --- a/value.go +++ b/value.go @@ -1315,6 +1315,18 @@ func (vlog *valueLog) write(reqs []*request) error { p.Len = uint32(plen) b.Ptrs = append(b.Ptrs, p) written++ + + if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { + vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len()) + n, err := curlf.fd.Write(buf.Bytes()) + if err != nil { + return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path) + } + buf.Reset() + y.NumWrites.Add(1) + y.NumBytesWritten.Add(int64(n)) + atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) + } } vlog.numEntriesWritten += uint32(written) // We write to disk here so that all entries that are part of the same transaction are From e58721bf028c36ac756f9abc728176c00bb79339 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 7 Oct 2019 15:49:30 +0530 Subject: [PATCH 2/4] Remove unreliable test --- batch_test.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/batch_test.go b/batch_test.go index 57653f1b6..6b062eb43 100644 --- a/batch_test.go +++ b/batch_test.go @@ -18,7 +18,6 @@ package badger import ( "fmt" - "math/rand" "testing" "time" @@ -68,26 +67,3 @@ func TestWriteBatch(t *testing.T) { require.NoError(t, err) }) } - -// Regression test for https://github.com/dgraph-io/badger/issues/1062 -func TestWriteBatchOOM(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - wb := db.NewWriteBatch() - defer wb.Cancel() - // 1,000 entries each of 32 mb values - n := 1000 - vsize := 32000000 - key := make([]byte, 32) - val := make([]byte, vsize) - rand.Read(val) - for i := 0; i <= n; i++ { - _, err := rand.Read(key) - require.NoError(t, err) - require.NoError(t, wb.Set(key, val)) - if i%100 == 0 { - db.opt.Logger.Debugf("Written %d entries", i) - } - } - require.NoError(t, wb.Flush()) - }) -} From 60c77f011108371df00a7c9901ef04b37de704f9 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 9 Oct 2019 14:09:18 +0530 Subject: [PATCH 3/4] Address review comments --- value.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/value.go b/value.go index 5a22182e0..0b58cc16d 100644 --- a/value.go +++ b/value.go @@ -1264,7 +1264,7 @@ func (vlog *valueLog) write(reqs []*request) error { if buf.Len() == 0 { return nil } - vlog.elog.Printf("Flushing %d blocks of total size: %d", len(reqs), buf.Len()) + vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len()) n, err := curlf.fd.Write(buf.Bytes()) if err != nil { return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path) @@ -1274,11 +1274,15 @@ func (vlog *valueLog) write(reqs []*request) error { y.NumBytesWritten.Add(int64(n)) vlog.elog.Printf("Done") atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) - + return nil + } + flushWrites := func() error { + if err := toDisk(); err != nil { + return err + } if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) || vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries { - var err error - if err = curlf.doneWriting(vlog.woffset()); err != nil { + if err := curlf.doneWriting(vlog.woffset()); err != nil { return err } @@ -1316,16 +1320,14 @@ func (vlog *valueLog) write(reqs []*request) error { b.Ptrs = append(b.Ptrs, p) written++ + // It is possible that the size of the buffer grows beyond the max size of the value + // log (this happens when a transaction contains entries with large value sizes) and + // badger might run into out of memory errors. We flush the buffer here if it's size + // grows beyond the max value log size. if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { - vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len()) - n, err := curlf.fd.Write(buf.Bytes()) - if err != nil { - return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path) + if err := toDisk(); err != nil { + return err } - buf.Reset() - y.NumWrites.Add(1) - y.NumBytesWritten.Add(int64(n)) - atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) } } vlog.numEntriesWritten += uint32(written) @@ -1335,12 +1337,12 @@ func (vlog *valueLog) write(reqs []*request) error { vlog.woffset()+uint32(buf.Len()) > uint32(vlog.opt.ValueLogFileSize) || vlog.numEntriesWritten > uint32(vlog.opt.ValueLogMaxEntries) if writeNow { - if err := toDisk(); err != nil { + if err := flushWrites(); err != nil { return err } } } - return toDisk() + return flushWrites() } // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file From bd1c2b1441344bcbe0b702b5d70a77d0680d17c8 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 10 Oct 2019 13:28:08 +0530 Subject: [PATCH 4/4] Address review comments --- value.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/value.go b/value.go index 0b58cc16d..585345215 100644 --- a/value.go +++ b/value.go @@ -1260,7 +1260,7 @@ func (vlog *valueLog) write(reqs []*request) error { vlog.filesLock.RUnlock() var buf bytes.Buffer - toDisk := func() error { + flushWrites := func() error { if buf.Len() == 0 { return nil } @@ -1276,8 +1276,8 @@ func (vlog *valueLog) write(reqs []*request) error { atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) return nil } - flushWrites := func() error { - if err := toDisk(); err != nil { + toDisk := func() error { + if err := flushWrites(); err != nil { return err } if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) || @@ -1325,7 +1325,7 @@ func (vlog *valueLog) write(reqs []*request) error { // badger might run into out of memory errors. We flush the buffer here if it's size // grows beyond the max value log size. if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { - if err := toDisk(); err != nil { + if err := flushWrites(); err != nil { return err } } @@ -1337,12 +1337,12 @@ func (vlog *valueLog) write(reqs []*request) error { vlog.woffset()+uint32(buf.Len()) > uint32(vlog.opt.ValueLogFileSize) || vlog.numEntriesWritten > uint32(vlog.opt.ValueLogMaxEntries) if writeNow { - if err := flushWrites(); err != nil { + if err := toDisk(); err != nil { return err } } } - return flushWrites() + return toDisk() } // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file