Fix deadlock in discard stats #1070

Merged
merged 8 commits into from
Oct 16, 2019

Changes from 7 commits
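
Summary, as inferred from the diff below: previously, compaction called updateDiscardStats, which took the lfDiscardStats mutex and, once updatesSinceFlush crossed discardStatsFlushThreshold, synchronously called flushDiscardStats; that function pushes an entry onto the write channel and waits on it, so a flush racing against Close() (which also called flushDiscardStats directly) could deadlock. The fix makes updateDiscardStats a non-blocking send on a buffered flushChan and moves all merging and persisting into a dedicated flushDiscardStats goroutine, which valueLog.Close() stops via a y.Closer.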
4 changes: 0 additions & 4 deletions db.go
@@ -380,10 +380,6 @@ func (db *DB) Close() error {
func (db *DB) close() (err error) {
db.elog.Printf("Closing database")

if err := db.vlog.flushDiscardStats(); err != nil {
return errors.Wrap(err, "failed to flush discard stats")
}

atomic.StoreInt32(&db.blockWrites, 1)

// Stop value GC first.
10 changes: 4 additions & 6 deletions db2_test.go
@@ -369,22 +369,20 @@ func TestDiscardMapTooBig(t *testing.T) {
defer os.RemoveAll(dir)

db, err := Open(DefaultOptions(dir))
require.NoError(t, err, "error while openning db")
require.NoError(t, err, "error while opening db")

// Add some data so that memtable flush happens on close
// Add some data so that memtable flush happens on close.
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("foo"), []byte("bar"))
}))

// overwrite discardstat with large value
db.vlog.lfDiscardStats = &lfDiscardStats{
m: createDiscardStats(),
}
db.vlog.lfDiscardStats.m = createDiscardStats()

require.NoError(t, db.Close())
// reopen the same DB
db, err = Open(DefaultOptions(dir))
require.NoError(t, err, "error while openning db")
require.NoError(t, err, "error while opening db")
require.NoError(t, db.Close())
}

4 changes: 1 addition & 3 deletions levels.go
@@ -638,9 +638,7 @@ func (s *levelsController) compactBuildTables(
sort.Slice(newTables, func(i, j int) bool {
return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
})
if err := s.kv.vlog.updateDiscardStats(discardStats); err != nil {
return nil, nil, errors.Wrap(err, "failed to update discard stats")
}
s.kv.vlog.updateDiscardStats(discardStats)
s.kv.opt.Debugf("Discard stats: %v", discardStats)
return newTables, func() error { return decrRefs(newTables) }, nil
}
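
The call site above no longer checks an error because updateDiscardStats is now fire-and-forget. A minimal, self-contained sketch of the non-blocking send it performs (illustrative names only, not the badger code itself):

```go
package main

import "fmt"

func main() {
	// Buffered channel standing in for lfDiscardStats.flushChan.
	flushChan := make(chan map[uint32]int64, 2)

	for i := 0; i < 4; i++ {
		stats := map[uint32]int64{uint32(i): 100}
		select {
		case flushChan <- stats:
			fmt.Println("queued update", i)
		default:
			// Channel full: drop the update instead of blocking, so the
			// compaction goroutine can never stall on discard stats.
			fmt.Println("dropped update", i)
		}
	}
}
```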
118 changes: 72 additions & 46 deletions value.go
@@ -764,6 +764,8 @@ func (vlog *valueLog) dropAll() (int, error) {
type lfDiscardStats struct {
sync.Mutex
m map[uint32]int64
flushChan chan map[uint32]int64
closer *y.Closer
updatesSinceFlush int
}

@@ -838,6 +840,7 @@ func (lf *logFile) open(path string, flags uint32) error {
if lf.fd, err = y.OpenExistingFile(path, flags); err != nil {
return y.Wrapf(err, "Error while opening file in logfile %s", path)
}

fi, err := lf.fd.Stat()
if err != nil {
return errFile(err, lf.path, "Unable to run file.Stat")
@@ -999,7 +1002,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
vlog.elog = trace.NewEventLog("Badger", "Valuelog")
}
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
vlog.lfDiscardStats = &lfDiscardStats{
m: make(map[uint32]int64),
closer: y.NewCloser(1),
flushChan: make(chan map[uint32]int64, 16),
}
go vlog.flushDiscardStats()
if err := vlog.populateFilesMap(); err != nil {
return err
}
@@ -1131,6 +1139,9 @@ func (lf *logFile) init() error {
}

func (vlog *valueLog) Close() error {
// close flushDiscardStats.
vlog.lfDiscardStats.closer.SignalAndWait()

vlog.elog.Printf("Stopping garbage collection of values.")
defer vlog.elog.Finish()

@@ -1217,7 +1228,7 @@ func (reqs requests) DecrRef() {
// sync function syncs content of latest value log file to disk. Syncing of value log directory is
// not required here as it happens every time a value log file rotation happens(check createVlogFile
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
// if fid >= vlog.maxFid. In some cases such as replay(while openning db), it might be called with
// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
// fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
func (vlog *valueLog) sync(fid uint32) error {
if vlog.opt.SyncWrites {
@@ -1668,58 +1679,73 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
}
}

func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error {
vlog.lfDiscardStats.Lock()

for fid, sz := range stats {
vlog.lfDiscardStats.m[fid] += sz
vlog.lfDiscardStats.updatesSinceFlush++
}
if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
vlog.lfDiscardStats.Unlock()
// flushDiscardStats also acquires lock. So, we need to unlock here.
return vlog.flushDiscardStats()
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
select {
case vlog.lfDiscardStats.flushChan <- stats:
default:
vlog.opt.Warningf("updateDiscardStats called: discard stats flushChan full, " +
"returning without pushing to flushChan")
}
vlog.lfDiscardStats.Unlock()
return nil
}

// flushDiscardStats inserts discard stats into badger. Returns error on failure.
func (vlog *valueLog) flushDiscardStats() error {
vlog.lfDiscardStats.Lock()
defer vlog.lfDiscardStats.Unlock()
func (vlog *valueLog) flushDiscardStats() {
defer vlog.lfDiscardStats.closer.Done()

if len(vlog.lfDiscardStats.m) == 0 {
return nil
mergeStats := func(stats map[uint32]int64) (encodedDS []byte) {
vlog.lfDiscardStats.Lock()
defer vlog.lfDiscardStats.Unlock()
for fid, count := range stats {
vlog.lfDiscardStats.m[fid] += count
vlog.lfDiscardStats.updatesSinceFlush++
}

if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
var err error
encodedDS, err = json.Marshal(vlog.lfDiscardStats.m)
if err != nil {
return
}
vlog.lfDiscardStats.updatesSinceFlush = 0
}
return
}
entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: vlog.encodedDiscardStats(),
}}
req, err := vlog.db.sendToWriteCh(entries)
if err == ErrBlockedWrites {
// We'll block write while closing db.
// When L0 compaction in close may push discard stats.
// So ignoring it.
// https://github.com/dgraph-io/badger/issues/970
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")

process := func(stats map[uint32]int64) error {
encodedDS := mergeStats(stats)
if encodedDS == nil {
return nil
}

entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: encodedDS,
}}
req, err := vlog.db.sendToWriteCh(entries)
if err == ErrBlockedWrites {
// Writes will be blocked if DropAll() call is made, to avoid crash ignore
// ErrBlockedWrites https://github.com/dgraph-io/badger/issues/970.
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")
}
return req.Wait()
}
vlog.lfDiscardStats.updatesSinceFlush = 0
return req.Wait()
}

// encodedDiscardStats returns []byte representation of lfDiscardStats
// This will be called while storing stats in BadgerDB
// caller should acquire lock before encoding the stats.
func (vlog *valueLog) encodedDiscardStats() []byte {
encodedStats, _ := json.Marshal(vlog.lfDiscardStats.m)
return encodedStats
for {
select {
case <-vlog.lfDiscardStats.closer.HasBeenClosed():
// For simplicity, just return without processing the stats already present in flushChan.
return
case stats := <-vlog.lfDiscardStats.flushChan:
if err := process(stats); err != nil {
vlog.opt.Errorf("unable to process discardstats with error: %s", err)
}
}
}
}

// populateDiscardStats populates vlog.lfDiscardStats
// This function will be called while initializing valueLog
// populateDiscardStats populates vlog.lfDiscardStats.
// This function will be called while initializing valueLog.
func (vlog *valueLog) populateDiscardStats() error {
key := y.KeyWithTs(lfDiscardStatsKey, math.MaxUint64)
var statsMap map[uint32]int64
@@ -1771,6 +1797,6 @@ func (vlog *valueLog) populateDiscardStats() error {
return errors.Wrapf(err, "failed to unmarshal discard stats")
}
vlog.opt.Debugf("Value Log Discard stats: %v", statsMap)
vlog.lfDiscardStats = &lfDiscardStats{m: statsMap}
vlog.lfDiscardStats.flushChan <- statsMap
return nil
}
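
Taken together, the value.go changes replace "whoever crosses the threshold flushes synchronously under the mutex" with a single goroutine that owns merging and persisting. A condensed, self-contained sketch of that pattern, with a plain done channel plus WaitGroup standing in for y.Closer (all names here are illustrative, not badger's):

```go
package main

import (
	"fmt"
	"sync"
)

// statsFlusher sketches the pattern value.go adopts: one goroutine owns
// merging and persisting; producers only ever send deltas.
type statsFlusher struct {
	mu   sync.Mutex
	m    map[uint32]int64
	ch   chan map[uint32]int64
	done chan struct{}
	wg   sync.WaitGroup
}

func newStatsFlusher() *statsFlusher {
	f := &statsFlusher{
		m:    make(map[uint32]int64),
		ch:   make(chan map[uint32]int64, 16),
		done: make(chan struct{}),
	}
	f.wg.Add(1)
	go f.loop()
	return f
}

// update mirrors updateDiscardStats: it never blocks the caller.
func (f *statsFlusher) update(stats map[uint32]int64) {
	select {
	case f.ch <- stats:
	default: // buffer full: drop rather than block the producer
	}
}

func (f *statsFlusher) loop() {
	defer f.wg.Done()
	for {
		select {
		case <-f.done:
			// Like closer.HasBeenClosed(): return without draining ch.
			return
		case stats := <-f.ch:
			f.mu.Lock()
			for fid, n := range stats {
				f.m[fid] += n
			}
			f.mu.Unlock()
			// The real code JSON-encodes the merged map and writes it back
			// to badger once updatesSinceFlush crosses a threshold.
		}
	}
}

// close mirrors closer.SignalAndWait() in valueLog.Close().
func (f *statsFlusher) close() {
	close(f.done)
	f.wg.Wait()
}

func main() {
	f := newStatsFlusher()
	f.update(map[uint32]int64{1: 100})
	f.close()
	// May print map[1:100] or map[], depending on whether the flusher
	// consumed the update before shutdown won the select.
	fmt.Println(f.m)
}
```

Dropping updates when the buffer is full trades a little accuracy in the stats for the guarantee that compaction and Close() can never block on each other.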
58 changes: 35 additions & 23 deletions value_test.go
@@ -18,6 +18,7 @@ package badger

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
@@ -470,23 +471,32 @@ func TestPersistLFDiscardStats(t *testing.T) {
require.NoError(t, err)
}

// wait for compaction to complete
time.Sleep(1 * time.Second)
time.Sleep(1 * time.Second) // wait for compaction to complete

persistedMap := make(map[uint32]int64)
db.vlog.lfDiscardStats.Lock()
require.True(t, len(db.vlog.lfDiscardStats.m) > 0, "some discardStats should be generated")
for k, v := range db.vlog.lfDiscardStats.m {
persistedMap[k] = v
}
db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
db.vlog.lfDiscardStats.Unlock()

// db.vlog.lfDiscardStats.updatesSinceFlush is already > discardStatsFlushThreshold,
// send empty map to flushChan, so that latest discardStats map can be persisted.
db.vlog.lfDiscardStats.flushChan <- map[uint32]int64{}
time.Sleep(1 * time.Second) // Wait for map to be persisted.
err = db.Close()
require.NoError(t, err)

db, err = Open(opt)
require.NoError(t, err)
defer db.Close()
time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
db.vlog.lfDiscardStats.Lock()
require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m),
"Discard maps are not equal")
db.vlog.lfDiscardStats.Unlock()
}

func TestChecksums(t *testing.T) {
@@ -630,7 +640,6 @@ func TestPartialAppendToValueLog(t *testing.T) {
kv, err = Open(opts)
require.NoError(t, err)
checkKeys(t, kv, [][]byte{k3})

// Replay value log from beginning, badger head is past k2.
require.NoError(t, kv.vlog.Close())
require.NoError(t,
@@ -1003,14 +1012,12 @@ func TestTruncatedDiscardStat(t *testing.T) {
for i := uint32(0); i < uint32(20); i++ {
stat[i] = 0
}
// Set discard stats.
db.vlog.lfDiscardStats = &lfDiscardStats{
m: stat,
}
db.vlog.lfDiscardStats.m = stat
encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m)
entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
// Insert truncated discard stats. This is important.
Value: db.vlog.encodedDiscardStats()[:10],
Value: encodedDS[:10],
}}
// Push discard stats entry to the write channel.
req, err := db.sendToWriteCh(entries)
@@ -1059,14 +1066,14 @@ func TestDiscardStatsMove(t *testing.T) {
stat[i] = 0
}

// Set discard stats.
db.vlog.lfDiscardStats = &lfDiscardStats{
m: stat,
}
db.vlog.lfDiscardStats.Lock()
db.vlog.lfDiscardStats.m = stat
encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m)
db.vlog.lfDiscardStats.Unlock()
entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
// The discard stat value is more than value threshold.
Value: db.vlog.encodedDiscardStats(),
Value: encodedDS,
}}
// Push discard stats entry to the write channel.
req, err := db.sendToWriteCh(entries)
@@ -1076,7 +1083,9 @@
// Unset discard stats. We've already pushed the stats. If we don't unset it then it will be
// pushed again on DB close. Also, the first insertion was in vlog file 1, this insertion would
// be in value log file 3.
db.vlog.lfDiscardStats.Lock()
db.vlog.lfDiscardStats.m = nil
db.vlog.lfDiscardStats.Unlock()

// Push more entries so that we get more than 1 value log files.
require.NoError(t, db.Update(func(txn *Txn) error {
Expand All @@ -1086,7 +1095,6 @@ func TestDiscardStatsMove(t *testing.T) {
require.NoError(t, db.Update(func(txn *Txn) error {
e := NewEntry([]byte("ff"), []byte("1"))
return txn.SetEntry(e)

}))

tr := trace.New("Badger.ValueLog", "GC")
@@ -1096,8 +1104,13 @@ func TestDiscardStatsMove(t *testing.T) {
require.NoError(t, db.Close())

db, err = Open(ops)
// discardStats will be populated by vlog.populateDiscardStats(), which pushes discard stats
// to vlog.lfDiscardStats.flushChan. Hence, wait for some time for discard stats to be updated.
time.Sleep(1 * time.Second)
require.NoError(t, err)
db.vlog.lfDiscardStats.Lock()
require.Equal(t, stat, db.vlog.lfDiscardStats.m)
db.vlog.lfDiscardStats.Unlock()
require.NoError(t, db.Close())
}

@@ -1109,11 +1122,13 @@ func TestBlockedDiscardStats(t *testing.T) {
db, err := Open(getTestOptions(dir))
require.NoError(t, err)
// Set discard stats.
db.vlog.lfDiscardStats = &lfDiscardStats{
m: map[uint32]int64{0: 0},
}
db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
db.blockWrite()
require.NoError(t, db.vlog.flushDiscardStats())
// Push discard stats more than the capacity of flushChan. This ensures at least one flush
// operation completes successfully after the writes were blocked.
for i := 0; i < cap(db.vlog.lfDiscardStats.flushChan)+2; i++ {
db.vlog.lfDiscardStats.flushChan <- db.vlog.lfDiscardStats.m
}

Review comment (on the loop above): Why 2 here?

db.unblockWrite()
require.NoError(t, db.Close())
}
@@ -1126,11 +1141,8 @@ func TestBlockedDiscardStatsOnClose(t *testing.T) {

db, err := Open(getTestOptions(dir))
require.NoError(t, err)
// Set discard stats.
db.vlog.lfDiscardStats = &lfDiscardStats{
m: map[uint32]int64{0: 0},
}
// This is important. Set updateSinceFlush to discardStatsFlushThresold so
db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
// This is important. Set updatesSinceFlush to discardStatsFlushThreshold so
// that the next update call flushes the discard stats.
db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
require.NoError(t, db.Close())

Review comment: Could we also add a test that replicates the deadlock, and make sure it does not deadlock with this change?
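
Following up on the review question above, a hypothetical regression test for the deadlock might look like the sketch below. It assumes the value_test.go context: the imports and helpers (getTestOptions, discardStatsFlushThreshold, flushChan) come from the diff, while the test name and the 10-second watchdog are invented here.

```go
// Hypothetical sketch (not part of this PR): assert that Close() returns
// even while a flush is pending, by racing it against a timeout.
func TestNoDeadlockInDiscardStatsOnClose(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	db, err := Open(getTestOptions(dir))
	require.NoError(t, err)

	// Cross the flush threshold and queue one more update so the background
	// flusher has work in flight while Close() runs.
	db.vlog.lfDiscardStats.Lock()
	db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
	db.vlog.lfDiscardStats.Unlock()
	db.vlog.lfDiscardStats.flushChan <- map[uint32]int64{0: 0}

	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := db.Close(); err != nil {
			t.Error(err)
		}
	}()
	select {
	case <-done:
		// Close() returned: no deadlock.
	case <-time.After(10 * time.Second):
		t.Fatal("db.Close() did not return; possible deadlock in discard stats")
	}
}
```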