Fix deadlock in discard stats (#1070)
Fixes #1032

Currently, the discardStats flow is as follows:
* Discard stats are generated during compaction. At the end, the compaction routine updates these stats in the vlog (the vlog maintains all discard stats). If the number of updates exceeds a threshold, a new request is generated and sent to the write channel, and the routine waits for the request to complete (request.Wait()).
* Requests are consumed from the write channel and written first to the vlog and then to the memtable.
* If the memtable is full, it is handed off to the flush channel.
* From the flush channel, memtables are written to L0 only if L0 currently holds fewer than NumLevelZeroTablesStall tables.

Events that can lead to deadlock:
A compaction is running on L0, which currently holds NumLevelZeroTablesStall tables, and tries to flush discard stats to the write channel. After pushing the stats, it waits for the write request to complete (request.Wait()). That request can never complete: completing it requires a memtable flush, the flush is stalled on L0, and L0 cannot accept a new table until this very compaction finishes. This is a cyclic dependency, reduced to a runnable sketch below.
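The cycle can be reproduced in a few lines of Go. This is an illustrative reduction only, not Badger code; writeCh and compactionDone are hypothetical stand-ins for the write channel and for the L0 compaction completing:

package main

import "sync"

func main() {
	writeCh := make(chan *sync.WaitGroup)
	compactionDone := make(chan struct{})

	// Writer goroutine: it can only complete a request after the memtable
	// flush unblocks, which in turn waits for the L0 compaction to finish.
	go func() {
		req := <-writeCh
		<-compactionDone // flush stalled behind a full L0
		req.Done()
	}()

	// Compaction (here: main). It pushes its discard stats as a write
	// request and blocks in Wait(), so compactionDone is never closed.
	// The runtime aborts with:
	//   fatal error: all goroutines are asleep - deadlock!
	var req sync.WaitGroup
	req.Add(1)
	writeCh <- &req
	req.Wait()
	close(compactionDone)
}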

Fix:
This PR introduces a buffered flush channel for discardStats. The compaction routine pushes the generated discard stats to this channel; if the channel is full, it simply returns instead of blocking. This decouples compaction from writes. A separate routine consumes stats from the flush channel, as in the sketch below.
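A minimal sketch of the resulting pattern, assuming simplified names (statsFlusher, update, and run are illustrative stand-ins, not the actual Badger identifiers; the real implementation is in the value.go diff below):

package main

import "fmt"

type statsFlusher struct {
	flushChan chan map[uint32]int64 // buffered, so producers almost never wait
	done      chan struct{}
}

// update mirrors updateDiscardStats: called from the compaction path, it
// must never block, so it drops the batch with a warning when the
// consumer is behind.
func (f *statsFlusher) update(stats map[uint32]int64) {
	select {
	case f.flushChan <- stats:
	default:
		fmt.Println("flushChan full, skipping this batch of stats")
	}
}

// run mirrors the consumer goroutine: it is the only code that touches
// the (potentially stalled) write path.
func (f *statsFlusher) run(process func(map[uint32]int64) error) {
	for {
		select {
		case <-f.done:
			return
		case stats := <-f.flushChan:
			if err := process(stats); err != nil {
				fmt.Println("unable to process stats:", err)
			}
		}
	}
}

func main() {
	f := &statsFlusher{
		flushChan: make(chan map[uint32]int64, 16),
		done:      make(chan struct{}),
	}
	processed := make(chan struct{})
	go f.run(func(m map[uint32]int64) error {
		fmt.Println("flushed:", m)
		close(processed)
		return nil
	})
	f.update(map[uint32]int64{1: 4096}) // returns immediately either way
	<-processed
	close(f.done)
}

Dropping a batch when the channel is full is safe here because discard stats are an optimization hint for value log GC, not durable state; losing an update only delays space reclamation.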

(cherry picked from commit c1cf0d7)
ashish-goswami authored and Ibrahim Jarif committed Mar 12, 2020
1 parent dbd9cc9 commit fc9ee65
Showing 5 changed files with 137 additions and 89 deletions.
4 changes: 0 additions & 4 deletions db.go
@@ -358,10 +358,6 @@ func (db *DB) Close() error {
func (db *DB) close() (err error) {
db.elog.Printf("Closing database")

if err := db.vlog.flushDiscardStats(); err != nil {
return errors.Wrap(err, "failed to flush discard stats")
}

atomic.StoreInt32(&db.blockWrites, 1)

// Stop value GC first.
10 changes: 4 additions & 6 deletions db2_test.go
@@ -349,22 +349,20 @@ func TestDiscardMapTooBig(t *testing.T) {
defer os.RemoveAll(dir)

db, err := Open(DefaultOptions(dir))
require.NoError(t, err, "error while openning db")
require.NoError(t, err, "error while opening db")

// Add some data so that memtable flush happens on close
// Add some data so that memtable flush happens on close.
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("foo"), []byte("bar"))
}))

// overwrite discardstat with large value
db.vlog.lfDiscardStats = &lfDiscardStats{
m: createDiscardStats(),
}
db.vlog.lfDiscardStats.m = createDiscardStats()

require.NoError(t, db.Close())
// reopen the same DB
db, err = Open(DefaultOptions(dir))
require.NoError(t, err, "error while openning db")
require.NoError(t, err, "error while opening db")
require.NoError(t, db.Close())
}

4 changes: 1 addition & 3 deletions levels.go
@@ -619,9 +619,7 @@ func (s *levelsController) compactBuildTables(
sort.Slice(newTables, func(i, j int) bool {
return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
})
if err := s.kv.vlog.updateDiscardStats(discardStats); err != nil {
return nil, nil, errors.Wrap(err, "failed to update discard stats")
}
s.kv.vlog.updateDiscardStats(discardStats)
s.kv.opt.Debugf("Discard stats: %v", discardStats)
return newTables, func() error { return decrRefs(newTables) }, nil
}
130 changes: 78 additions & 52 deletions value.go
@@ -87,7 +87,7 @@ func (lf *logFile) mmap(size int64) (err error) {
}

func (lf *logFile) munmap() (err error) {
if lf.loadingMode != options.MemoryMap {
if lf.loadingMode != options.MemoryMap || len(lf.fmap) == 0 {
// Nothing to do
return nil
}
@@ -611,8 +611,10 @@ func (vlog *valueLog) dropAll() (int, error) {
// lfDiscardStats keeps track of the amount of data that could be discarded for
// a given logfile.
type lfDiscardStats struct {
sync.Mutex
sync.RWMutex
m map[uint32]int64
flushChan chan map[uint32]int64
closer *y.Closer
updatesSinceFlush int
}

@@ -762,7 +764,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
vlog.elog = trace.NewEventLog("Badger", "Valuelog")
}
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
vlog.lfDiscardStats = &lfDiscardStats{
m: make(map[uint32]int64),
closer: y.NewCloser(1),
flushChan: make(chan map[uint32]int64, 16),
}
go vlog.flushDiscardStats()
if err := vlog.populateFilesMap(); err != nil {
return err
}
@@ -786,10 +793,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
flags |= y.Sync
}

var err error
// Open log file "lf" in read-write mode.
if err := lf.open(vlog.fpath(lf.fid), flags); err != nil {
return err
if lf.fd, err = y.OpenExistingFile(vlog.fpath(fid), flags); err != nil {
return y.Wrapf(err, "Error while opening file in logfile %s", vlog.fpath(fid))
}

// This file is before the value head pointer. So, we don't need to
// replay it, and can just open it in readonly mode.
if fid < ptr.Fid {
@@ -873,6 +882,9 @@ func (lf *logFile) open(filename string, flags uint32) error {
}

func (vlog *valueLog) Close() error {
// Stop the flushDiscardStats goroutine and wait for it to exit.
vlog.lfDiscardStats.closer.SignalAndWait()

vlog.elog.Printf("Stopping garbage collection of values.")
defer vlog.elog.Finish()

@@ -959,7 +971,7 @@ func (reqs requests) DecrRef() {
// sync function syncs content of latest value log file to disk. Syncing of value log directory is
// not required here as it happens every time a value log file rotation happens(check createVlogFile
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
// if fid >= vlog.maxFid. In some cases such as replay(while openning db), it might be called with
// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
// fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
func (vlog *valueLog) sync(fid uint32) error {
if vlog.opt.SyncWrites {
@@ -1185,7 +1197,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
fid uint32
discard int64
}{math.MaxUint32, 0}
vlog.lfDiscardStats.Lock()
vlog.lfDiscardStats.RLock()
for _, fid := range fids {
if fid >= head.Fid {
break
Expand All @@ -1195,7 +1207,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
candidate.discard = vlog.lfDiscardStats.m[fid]
}
}
vlog.lfDiscardStats.Unlock()
vlog.lfDiscardStats.RUnlock()

if candidate.fid != math.MaxUint32 { // Found a candidate
tr.LazyPrintf("Found candidate via discard stats: %v", candidate)
@@ -1418,58 +1430,72 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
}
}

func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error {
vlog.lfDiscardStats.Lock()

for fid, sz := range stats {
vlog.lfDiscardStats.m[fid] += sz
vlog.lfDiscardStats.updatesSinceFlush++
}
if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
vlog.lfDiscardStats.Unlock()
// flushDiscardStats also acquires lock. So, we need to unlock here.
return vlog.flushDiscardStats()
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
select {
case vlog.lfDiscardStats.flushChan <- stats:
default:
vlog.opt.Warningf("updateDiscardStats called: discard stats flushChan full, " +
"returning without pushing to flushChan")
}
vlog.lfDiscardStats.Unlock()
return nil
}

// flushDiscardStats inserts discard stats into badger. Returns error on failure.
func (vlog *valueLog) flushDiscardStats() error {
vlog.lfDiscardStats.Lock()
defer vlog.lfDiscardStats.Unlock()
func (vlog *valueLog) flushDiscardStats() {
defer vlog.lfDiscardStats.closer.Done()

if len(vlog.lfDiscardStats.m) == 0 {
return nil
mergeStats := func(stats map[uint32]int64) ([]byte, error) {
vlog.lfDiscardStats.Lock()
defer vlog.lfDiscardStats.Unlock()
for fid, count := range stats {
vlog.lfDiscardStats.m[fid] += count
vlog.lfDiscardStats.updatesSinceFlush++
}

if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
encodedDS, err := json.Marshal(vlog.lfDiscardStats.m)
if err != nil {
return nil, err
}
vlog.lfDiscardStats.updatesSinceFlush = 0
return encodedDS, nil
}
return nil, nil
}
entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: vlog.encodedDiscardStats(),
}}
req, err := vlog.db.sendToWriteCh(entries)
if err == ErrBlockedWrites {
// We'll block write while closing db.
// When L0 compaction in close may push discard stats.
// So ignoring it.
// https://github.com/dgraph-io/badger/issues/970
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")

process := func(stats map[uint32]int64) error {
encodedDS, err := mergeStats(stats)
if err != nil || encodedDS == nil {
return err
}

entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: encodedDS,
}}
req, err := vlog.db.sendToWriteCh(entries)
// No special handling of ErrBlockedWrites is required here; the error is
// simply logged in the for loop below.
if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")
}
return req.Wait()
}
vlog.lfDiscardStats.updatesSinceFlush = 0
return req.Wait()
}

// encodedDiscardStats returns []byte representation of lfDiscardStats
// This will be called while storing stats in BadgerDB
// caller should acquire lock before encoding the stats.
func (vlog *valueLog) encodedDiscardStats() []byte {
encodedStats, _ := json.Marshal(vlog.lfDiscardStats.m)
return encodedStats
closer := vlog.lfDiscardStats.closer
for {
select {
case <-closer.HasBeenClosed():
// For simplicity, just return without draining the stats already queued in flushChan.
return
case stats := <-vlog.lfDiscardStats.flushChan:
if err := process(stats); err != nil {
vlog.opt.Errorf("unable to process discardstats with error: %s", err)
}
}
}
}

// populateDiscardStats populates vlog.lfDiscardStats
// This function will be called while initializing valueLog
// populateDiscardStats populates vlog.lfDiscardStats.
// This function will be called while initializing valueLog.
func (vlog *valueLog) populateDiscardStats() error {
key := y.KeyWithTs(lfDiscardStatsKey, math.MaxUint64)
var statsMap map[uint32]int64
@@ -1521,6 +1547,6 @@ func (vlog *valueLog) populateDiscardStats() error {
return errors.Wrapf(err, "failed to unmarshal discard stats")
}
vlog.opt.Debugf("Value Log Discard stats: %v", statsMap)
vlog.lfDiscardStats = &lfDiscardStats{m: statsMap}
vlog.lfDiscardStats.flushChan <- statsMap
return nil
}