diff --git a/value.go b/value.go index d4524d669..585345215 100644 --- a/value.go +++ b/value.go @@ -1260,11 +1260,11 @@ func (vlog *valueLog) write(reqs []*request) error { vlog.filesLock.RUnlock() var buf bytes.Buffer - toDisk := func() error { + flushWrites := func() error { if buf.Len() == 0 { return nil } - vlog.elog.Printf("Flushing %d blocks of total size: %d", len(reqs), buf.Len()) + vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len()) n, err := curlf.fd.Write(buf.Bytes()) if err != nil { return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path) @@ -1274,11 +1274,15 @@ func (vlog *valueLog) write(reqs []*request) error { y.NumBytesWritten.Add(int64(n)) vlog.elog.Printf("Done") atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) - + return nil + } + toDisk := func() error { + if err := flushWrites(); err != nil { + return err + } if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) || vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries { - var err error - if err = curlf.doneWriting(vlog.woffset()); err != nil { + if err := curlf.doneWriting(vlog.woffset()); err != nil { return err } @@ -1315,6 +1319,16 @@ func (vlog *valueLog) write(reqs []*request) error { p.Len = uint32(plen) b.Ptrs = append(b.Ptrs, p) written++ + + // It is possible that the size of the buffer grows beyond the max size of the value + // log (this happens when a transaction contains entries with large value sizes) and + // badger might run into out of memory errors. We flush the buffer here if it's size + // grows beyond the max value log size. + if int64(buf.Len()) > vlog.db.opt.ValueLogFileSize { + if err := flushWrites(); err != nil { + return err + } + } } vlog.numEntriesWritten += uint32(written) // We write to disk here so that all entries that are part of the same transaction are