Skip to content

Commit

Permalink
Store discardMap in vlog file when it's too large (#858)
Browse files Browse the repository at this point in the history
The discardMap stores the discard status for all value log files in the
db and this could get too large when there are many value log files in
badger. When there are many vlog files, the size of discard map could be
more than what we can keep in the LSM tree.

With this commit, we insert the discardMap into badger using the write
channel which would automatically put the discard map into the value log
file and use it's pointer in the LSM tree if the LSM tree cannot hold it.
  • Loading branch information
Ibrahim Jarif authored Jun 18, 2019
1 parent 853a823 commit fa0679c
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 15 deletions.
9 changes: 5 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ func (db *DB) Close() error {

func (db *DB) close() (err error) {
db.elog.Printf("Closing database")

if err := db.vlog.flushDiscardStats(); err != nil {
return errors.Wrap(err, "failed to flush discard stats")
}

atomic.StoreInt32(&db.blockWrites, 1)

// Stop value GC first.
Expand Down Expand Up @@ -882,10 +887,6 @@ func (db *DB) handleFlushTask(ft flushTask) error {
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})

// Also store lfDiscardStats before flushing memtables
discardStatsKey := y.KeyWithTs(lfDiscardStatsKey, 1)
ft.mt.Put(discardStatsKey, y.ValueStruct{Value: db.vlog.encodedDiscardStats()})

fileID := db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,39 @@ func TestPushValueLogLimit(t *testing.T) {
}
})
}

// Regression test for https://github.com/dgraph-io/badger/issues/830
func TestDiscardMapTooBig(t *testing.T) {
createDiscardStats := func() map[uint32]int64 {
stat := map[uint32]int64{}
for i := uint32(0); i < 8000; i++ {
stat[i] = 0
}
return stat
}
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer os.RemoveAll(dir)

ops := DefaultOptions
ops.Dir = dir
ops.ValueDir = dir
db, err := Open(ops)
require.NoError(t, err, "error while openning db")

// Add some data so that memtable flush happens on close
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("foo"), []byte("bar"))
}))

// overwrite discardstat with large value
db.vlog.lfDiscardStats = &lfDiscardStats{
m: createDiscardStats(),
}

require.NoError(t, db.Close())
// reopen the same DB
db, err = Open(ops)
require.NoError(t, err, "error while openning db")
require.NoError(t, db.Close())
}
4 changes: 3 additions & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,9 @@ func (s *levelsController) compactBuildTables(
sort.Slice(newTables, func(i, j int) bool {
return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
})
s.kv.vlog.updateDiscardStats(discardStats)
if err := s.kv.vlog.updateDiscardStats(discardStats); err != nil {
return nil, nil, errors.Wrap(err, "failed to update discard stats")
}
s.kv.opt.Debugf("Discard stats: %v", discardStats)
return newTables, func() error { return decrRefs(newTables) }, nil
}
Expand Down
60 changes: 50 additions & 10 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ const (
bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

mi int64 = 1 << 20

// The number of updates after which discard map should be flushed into badger.
discardStatsFlushThreshold = 100
)

type logFile struct {
Expand Down Expand Up @@ -606,7 +609,8 @@ func (vlog *valueLog) dropAll() (int, error) {
// a given logfile.
type lfDiscardStats struct {
sync.Mutex
m map[uint32]int64
m map[uint32]int64
updatesSinceFlush int
}

type valueLog struct {
Expand Down Expand Up @@ -764,11 +768,7 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
vlog.db = db
vlog.elog = trace.NewEventLog("Badger", "Valuelog")
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.

if err := vlog.populateDiscardStats(); err != nil {
return err
}

vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
if err := vlog.populateFilesMap(); err != nil {
return err
}
Expand Down Expand Up @@ -852,6 +852,9 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
if err = last.mmap(2 * opt.ValueLogFileSize); err != nil {
return errFile(err, last.path, "Map log file")
}
if err := vlog.populateDiscardStats(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -1372,12 +1375,36 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
}
}

func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error {
vlog.lfDiscardStats.Lock()
for fid, sz := range stats {
vlog.lfDiscardStats.m[fid] += sz
vlog.lfDiscardStats.updatesSinceFlush++
}
vlog.lfDiscardStats.Unlock()
if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
if err := vlog.flushDiscardStats(); err != nil {
return err
}
vlog.lfDiscardStats.updatesSinceFlush = 0
}
return nil
}

// flushDiscardStats inserts discard stats into badger. Returns error on failure.
func (vlog *valueLog) flushDiscardStats() error {
if len(vlog.lfDiscardStats.m) == 0 {
return nil
}
entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: vlog.encodedDiscardStats(),
}}
req, err := vlog.db.sendToWriteCh(entries)
if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")
}
return req.Wait()
}

// encodedDiscardStats returns []byte representation of lfDiscardStats
Expand All @@ -1401,13 +1428,26 @@ func (vlog *valueLog) populateDiscardStats() error {

// check if value is Empty
if vs.Value == nil || len(vs.Value) == 0 {
vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
return nil
}

var statsMap map[uint32]int64
if err := json.Unmarshal(vs.Value, &statsMap); err != nil {
return err
// discard map is stored in the vlog file.
if vs.Meta&bitValuePointer > 0 {
var vp valuePointer
vp.Decode(vs.Value)
result, cb, err := vlog.Read(vp, new(y.Slice))
if err != nil {
return errors.Wrapf(err, "failed to read value pointer from vlog file: %+v", vp)
}
defer runCallback(cb)
if err := json.Unmarshal(result, &statsMap); err != nil {
return errors.Wrapf(err, "failed to unmarshal discard stats")
}
} else {
if err := json.Unmarshal(vs.Value, &statsMap); err != nil {
return errors.Wrapf(err, "failed to unmarshal discard stats")
}
}
vlog.opt.Debugf("Value Log Discard stats: %v", statsMap)
vlog.lfDiscardStats = &lfDiscardStats{m: statsMap}
Expand Down

0 comments on commit fa0679c

Please sign in to comment.