From fc9ee65ab18bb5f2d10fd7a38d258d73d04f9e83 Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Wed, 16 Oct 2019 16:08:10 +0530 Subject: [PATCH] Fix deadlock in discard stats (#1070) Fixes #1032 Currently discardStats flow is as follows: * Discard Stats are generated during compaction. At the end, compaction routine updates these stats in vlog(vlog maintains all discard stats). If number of updates exceeds a threshold, a new request is generated and sent to write channel. Routine waits for request to complete(request.Wait()). * Requests are consumed from write channel and written to vlog first and then to memtable. * If memtable is full, it is flushed to flush channel. *From flush channel, memtables are written to L0 only if there are less than or equal to NumLevelZeroTablesStall tables already. Events which can lead to deadlock: Compaction is running on L0 which has NumLevelZeroTablesStall tables currently and tries to flush discard stats to write channel. After pushing stats to write channel, it waits for write request to complete, which cannot be completed due to cyclic dependency. Fix: This PR introduces a flush channel(buffered) for discardStats. Compaction routine, will push generated discard stats to flush channel, if channel is full it just returns. This decouples compaction and writes. We have a separate routine for consuming stats from flush chan. (cherry picked from commit c1cf0d7e3d9f893561c847f0e5a62f759adfb4bb) --- db.go | 4 -- db2_test.go | 10 ++-- levels.go | 4 +- value.go | 130 ++++++++++++++++++++++++++++++-------------------- value_test.go | 78 ++++++++++++++++++++---------- 5 files changed, 137 insertions(+), 89 deletions(-) diff --git a/db.go b/db.go index 4ac797bb7..e187b3579 100644 --- a/db.go +++ b/db.go @@ -358,10 +358,6 @@ func (db *DB) Close() error { func (db *DB) close() (err error) { db.elog.Printf("Closing database") - if err := db.vlog.flushDiscardStats(); err != nil { - return errors.Wrap(err, "failed to flush discard stats") - } - atomic.StoreInt32(&db.blockWrites, 1) // Stop value GC first. diff --git a/db2_test.go b/db2_test.go index fb3c9602b..f109973b5 100644 --- a/db2_test.go +++ b/db2_test.go @@ -349,22 +349,20 @@ func TestDiscardMapTooBig(t *testing.T) { defer os.RemoveAll(dir) db, err := Open(DefaultOptions(dir)) - require.NoError(t, err, "error while openning db") + require.NoError(t, err, "error while opening db") - // Add some data so that memtable flush happens on close + // Add some data so that memtable flush happens on close. require.NoError(t, db.Update(func(txn *Txn) error { return txn.Set([]byte("foo"), []byte("bar")) })) // overwrite discardstat with large value - db.vlog.lfDiscardStats = &lfDiscardStats{ - m: createDiscardStats(), - } + db.vlog.lfDiscardStats.m = createDiscardStats() require.NoError(t, db.Close()) // reopen the same DB db, err = Open(DefaultOptions(dir)) - require.NoError(t, err, "error while openning db") + require.NoError(t, err, "error while opening db") require.NoError(t, db.Close()) } diff --git a/levels.go b/levels.go index ca1b112da..99d966d10 100644 --- a/levels.go +++ b/levels.go @@ -619,9 +619,7 @@ func (s *levelsController) compactBuildTables( sort.Slice(newTables, func(i, j int) bool { return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0 }) - if err := s.kv.vlog.updateDiscardStats(discardStats); err != nil { - return nil, nil, errors.Wrap(err, "failed to update discard stats") - } + s.kv.vlog.updateDiscardStats(discardStats) s.kv.opt.Debugf("Discard stats: %v", discardStats) return newTables, func() error { return decrRefs(newTables) }, nil } diff --git a/value.go b/value.go index 831f99d09..75b232b92 100644 --- a/value.go +++ b/value.go @@ -87,7 +87,7 @@ func (lf *logFile) mmap(size int64) (err error) { } func (lf *logFile) munmap() (err error) { - if lf.loadingMode != options.MemoryMap { + if lf.loadingMode != options.MemoryMap || len(lf.fmap)== 0{ // Nothing to do return nil } @@ -611,8 +611,10 @@ func (vlog *valueLog) dropAll() (int, error) { // lfDiscardStats keeps track of the amount of data that could be discarded for // a given logfile. type lfDiscardStats struct { - sync.Mutex + sync.RWMutex m map[uint32]int64 + flushChan chan map[uint32]int64 + closer *y.Closer updatesSinceFlush int } @@ -762,7 +764,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { vlog.elog = trace.NewEventLog("Badger", "Valuelog") } vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time. - vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)} + vlog.lfDiscardStats = &lfDiscardStats{ + m: make(map[uint32]int64), + closer: y.NewCloser(1), + flushChan: make(chan map[uint32]int64, 16), + } + go vlog.flushDiscardStats() if err := vlog.populateFilesMap(); err != nil { return err } @@ -786,10 +793,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { flags |= y.Sync } + var err error // Open log file "lf" in read-write mode. - if err := lf.open(vlog.fpath(lf.fid), flags); err != nil { - return err + if lf.fd, err = y.OpenExistingFile(vlog.fpath(fid), flags); err != nil { + return y.Wrapf(err, "Error while opening file in logfile %s", vlog.fpath(fid)) } + // This file is before the value head pointer. So, we don't need to // replay it, and can just open it in readonly mode. if fid < ptr.Fid { @@ -873,6 +882,9 @@ func (lf *logFile) open(filename string, flags uint32) error { } func (vlog *valueLog) Close() error { + // close flushDiscardStats. + vlog.lfDiscardStats.closer.SignalAndWait() + vlog.elog.Printf("Stopping garbage collection of values.") defer vlog.elog.Finish() @@ -959,7 +971,7 @@ func (reqs requests) DecrRef() { // sync function syncs content of latest value log file to disk. Syncing of value log directory is // not required here as it happens every time a value log file rotation happens(check createVlogFile // function). During rotation, previous value log file also gets synced to disk. It only syncs file -// if fid >= vlog.maxFid. In some cases such as replay(while openning db), it might be called with +// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32. func (vlog *valueLog) sync(fid uint32) error { if vlog.opt.SyncWrites { @@ -1185,7 +1197,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi fid uint32 discard int64 }{math.MaxUint32, 0} - vlog.lfDiscardStats.Lock() + vlog.lfDiscardStats.RLock() for _, fid := range fids { if fid >= head.Fid { break @@ -1195,7 +1207,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi candidate.discard = vlog.lfDiscardStats.m[fid] } } - vlog.lfDiscardStats.Unlock() + vlog.lfDiscardStats.RUnlock() if candidate.fid != math.MaxUint32 { // Found a candidate tr.LazyPrintf("Found candidate via discard stats: %v", candidate) @@ -1418,58 +1430,72 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error { } } -func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error { - vlog.lfDiscardStats.Lock() - - for fid, sz := range stats { - vlog.lfDiscardStats.m[fid] += sz - vlog.lfDiscardStats.updatesSinceFlush++ - } - if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold { - vlog.lfDiscardStats.Unlock() - // flushDiscardStats also acquires lock. So, we need to unlock here. - return vlog.flushDiscardStats() +func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) { + select { + case vlog.lfDiscardStats.flushChan <- stats: + default: + vlog.opt.Warningf("updateDiscardStats called: discard stats flushChan full, " + + "returning without pushing to flushChan") } - vlog.lfDiscardStats.Unlock() - return nil } -// flushDiscardStats inserts discard stats into badger. Returns error on failure. -func (vlog *valueLog) flushDiscardStats() error { - vlog.lfDiscardStats.Lock() - defer vlog.lfDiscardStats.Unlock() +func (vlog *valueLog) flushDiscardStats() { + defer vlog.lfDiscardStats.closer.Done() - if len(vlog.lfDiscardStats.m) == 0 { - return nil + mergeStats := func(stats map[uint32]int64) ([]byte, error) { + vlog.lfDiscardStats.Lock() + defer vlog.lfDiscardStats.Unlock() + for fid, count := range stats { + vlog.lfDiscardStats.m[fid] += count + vlog.lfDiscardStats.updatesSinceFlush++ + } + + if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold { + encodedDS, err := json.Marshal(vlog.lfDiscardStats.m) + if err != nil { + return nil, err + } + vlog.lfDiscardStats.updatesSinceFlush = 0 + return encodedDS, nil + } + return nil, nil } - entries := []*Entry{{ - Key: y.KeyWithTs(lfDiscardStatsKey, 1), - Value: vlog.encodedDiscardStats(), - }} - req, err := vlog.db.sendToWriteCh(entries) - if err == ErrBlockedWrites { - // We'll block write while closing db. - // When L0 compaction in close may push discard stats. - // So ignoring it. - // https://github.com/dgraph-io/badger/issues/970 - return nil - } else if err != nil { - return errors.Wrapf(err, "failed to push discard stats to write channel") + + process := func(stats map[uint32]int64) error { + encodedDS, err := mergeStats(stats) + if err != nil || encodedDS == nil { + return err + } + + entries := []*Entry{{ + Key: y.KeyWithTs(lfDiscardStatsKey, 1), + Value: encodedDS, + }} + req, err := vlog.db.sendToWriteCh(entries) + // No special handling of ErrBlockedWrites is required as err is just logged in + // for loop below. + if err != nil { + return errors.Wrapf(err, "failed to push discard stats to write channel") + } + return req.Wait() } - vlog.lfDiscardStats.updatesSinceFlush = 0 - return req.Wait() -} -// encodedDiscardStats returns []byte representation of lfDiscardStats -// This will be called while storing stats in BadgerDB -// caller should acquire lock before encoding the stats. -func (vlog *valueLog) encodedDiscardStats() []byte { - encodedStats, _ := json.Marshal(vlog.lfDiscardStats.m) - return encodedStats + closer := vlog.lfDiscardStats.closer + for { + select { + case <-closer.HasBeenClosed(): + // For simplicity just return without processing already present in stats in flushChan. + return + case stats := <-vlog.lfDiscardStats.flushChan: + if err := process(stats); err != nil { + vlog.opt.Errorf("unable to process discardstats with error: %s", err) + } + } + } } -// populateDiscardStats populates vlog.lfDiscardStats -// This function will be called while initializing valueLog +// populateDiscardStats populates vlog.lfDiscardStats. +// This function will be called while initializing valueLog. func (vlog *valueLog) populateDiscardStats() error { key := y.KeyWithTs(lfDiscardStatsKey, math.MaxUint64) var statsMap map[uint32]int64 @@ -1521,6 +1547,6 @@ func (vlog *valueLog) populateDiscardStats() error { return errors.Wrapf(err, "failed to unmarshal discard stats") } vlog.opt.Debugf("Value Log Discard stats: %v", statsMap) - vlog.lfDiscardStats = &lfDiscardStats{m: statsMap} + vlog.lfDiscardStats.flushChan <- statsMap return nil } diff --git a/value_test.go b/value_test.go index 38ee7a5d4..f43dd8313 100644 --- a/value_test.go +++ b/value_test.go @@ -17,6 +17,7 @@ package badger import ( + "encoding/json" "fmt" "io/ioutil" "math/rand" @@ -464,23 +465,32 @@ func TestPersistLFDiscardStats(t *testing.T) { require.NoError(t, err) } - // wait for compaction to complete - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) // wait for compaction to complete persistedMap := make(map[uint32]int64) db.vlog.lfDiscardStats.Lock() + require.True(t, len(db.vlog.lfDiscardStats.m) > 0, "some discardStats should be generated") for k, v := range db.vlog.lfDiscardStats.m { persistedMap[k] = v } + db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1 db.vlog.lfDiscardStats.Unlock() + + // db.vlog.lfDiscardStats.updatesSinceFlush is already > discardStatsFlushThreshold, + // send empty map to flushChan, so that latest discardStats map can be persisted. + db.vlog.lfDiscardStats.flushChan <- map[uint32]int64{} + time.Sleep(1 * time.Second) // Wait for map to be persisted. err = db.Close() require.NoError(t, err) db, err = Open(opt) require.NoError(t, err) defer db.Close() + time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats(). + db.vlog.lfDiscardStats.RLock() require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m), "Discard maps are not equal") + db.vlog.lfDiscardStats.RUnlock() } func TestChecksums(t *testing.T) { @@ -624,7 +634,6 @@ func TestPartialAppendToValueLog(t *testing.T) { kv, err = Open(opts) require.NoError(t, err) checkKeys(t, kv, [][]byte{k3}) - // Replay value log from beginning, badger head is past k2. require.NoError(t, kv.vlog.Close()) require.NoError(t, @@ -997,15 +1006,13 @@ func TestDiscardStatsMove(t *testing.T) { for i := uint32(0); i < uint32(ops.ValueThreshold+10); i++ { stat[i] = 0 } - - // Set discard stats. - db.vlog.lfDiscardStats = &lfDiscardStats{ - m: stat, - } + db.vlog.lfDiscardStats.m = stat + encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m) entries := []*Entry{{ Key: y.KeyWithTs(lfDiscardStatsKey, 1), // The discard stat value is more than value threshold. - Value: db.vlog.encodedDiscardStats(), + + Value: encodedDS, }} // Push discard stats entry to the write channel. req, err := db.sendToWriteCh(entries) @@ -1055,14 +1062,15 @@ func TestTruncatedDiscardStat(t *testing.T) { for i := uint32(0); i < uint32(20); i++ { stat[i] = 0 } - // Set discard stats. - db.vlog.lfDiscardStats = &lfDiscardStats{ - m: stat, - } + + db.vlog.lfDiscardStats.Lock() + db.vlog.lfDiscardStats.m = stat + encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m) + db.vlog.lfDiscardStats.Unlock() entries := []*Entry{{ Key: y.KeyWithTs(lfDiscardStatsKey, 1), // Insert truncated discard stats. This is important. - Value: db.vlog.encodedDiscardStats()[:10], + Value: encodedDS[:10], }} // Push discard stats entry to the write channel. req, err := db.sendToWriteCh(entries) @@ -1070,13 +1078,36 @@ func TestTruncatedDiscardStat(t *testing.T) { req.Wait() // Unset discard stats. We've already pushed the stats. If we don't unset it then it will be - // pushed again on DB close. + // pushed again on DB close. Also, the first insertion was in vlog file 1, this insertion would + // be in value log file 3. + db.vlog.lfDiscardStats.Lock() db.vlog.lfDiscardStats.m = nil + db.vlog.lfDiscardStats.Unlock() + + // Push more entries so that we get more than 1 value log files. + require.NoError(t, db.Update(func(txn *Txn) error { + e := NewEntry([]byte("f"), []byte("1")) + return txn.SetEntry(e) + })) + require.NoError(t, db.Update(func(txn *Txn) error { + e := NewEntry([]byte("ff"), []byte("1")) + return txn.SetEntry(e) + })) + tr := trace.New("Badger.ValueLog", "GC") + // Use first value log file for GC. This value log file contains the discard stats. + lf := db.vlog.filesMap[0] + require.NoError(t, db.vlog.rewrite(lf, tr)) require.NoError(t, db.Close()) db, err = Open(ops) + // discardStats will be populate using vlog.populateDiscardStats(), which pushes discard stats + // to vlog.lfDiscardStats.flushChan. Hence wait for some time, for discard stats to be updated. + time.Sleep(1 * time.Second) require.NoError(t, err) + db.vlog.lfDiscardStats.RLock() + require.Equal(t, stat, db.vlog.lfDiscardStats.m) + db.vlog.lfDiscardStats.RUnlock() require.NoError(t, db.Close()) } @@ -1088,11 +1119,13 @@ func TestBlockedDiscardStats(t *testing.T) { db, err := Open(getTestOptions(dir)) require.NoError(t, err) // Set discard stats. - db.vlog.lfDiscardStats = &lfDiscardStats{ - m: map[uint32]int64{0: 0}, - } + db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0} db.blockWrite() - require.NoError(t, db.vlog.flushDiscardStats()) + // Push discard stats more than the capacity of flushChan. This ensures at least one flush + // operation completes successfully after the writes were blocked. + for i := 0; i < cap(db.vlog.lfDiscardStats.flushChan)+2; i++ { + db.vlog.lfDiscardStats.flushChan <- db.vlog.lfDiscardStats.m + } db.unblockWrite() require.NoError(t, db.Close()) } @@ -1105,11 +1138,8 @@ func TestBlockedDiscardStatsOnClose(t *testing.T) { db, err := Open(getTestOptions(dir)) require.NoError(t, err) - // Set discard stats. - db.vlog.lfDiscardStats = &lfDiscardStats{ - m: map[uint32]int64{0: 0}, - } - // This is important. Set updateSinceFlush to discardStatsFlushThresold so + db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0} + // This is important. Set updateSinceFlush to discardStatsFlushThreshold so // that the next update call flushes the discard stats. db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1 require.NoError(t, db.Close())