Fix deadlock in discard stats #1070
Changes from 3 commits
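For context, the change visible in the diff below moves discard-stat flushing off the compaction path: updates are encoded and handed to a buffered channel with a non-blocking send, and a dedicated goroutine (startFlushDiscard) performs the actual write. A toy, self-contained sketch of that hand-off pattern (illustrative names only, not badger APIs):

package main

import (
	"fmt"
	"time"
)

// Toy illustration (not badger code) of the pattern this PR adopts: producers
// hand work to a buffered channel with a non-blocking send, and a dedicated
// goroutine does the actual (possibly blocking) write.
func main() {
	flushChan := make(chan []byte, 16)
	done := make(chan struct{})

	// Dedicated flusher, analogous to startFlushDiscard.
	go func() {
		for payload := range flushChan {
			time.Sleep(10 * time.Millisecond) // stand-in for a blocking write
			fmt.Printf("flushed %d bytes\n", len(payload))
		}
		close(done)
	}()

	// Producer, analogous to updateDiscardStats: never blocks, drops when full.
	for i := 0; i < 100; i++ {
		select {
		case flushChan <- []byte("stats"):
		default:
			fmt.Println("flushChan full, skipping this flush")
		}
	}

	close(flushChan)
	<-done
}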
@@ -764,6 +764,9 @@ func (vlog *valueLog) dropAll() (int, error) {
 type lfDiscardStats struct {
 	sync.Mutex
 	m map[uint32]int64
+	flushChan chan []byte
+	flushChanClosed bool
+	closer *y.Closer
 	updatesSinceFlush int
 }
@@ -838,6 +841,7 @@ func (lf *logFile) open(path string, flags uint32) error {
 	if lf.fd, err = y.OpenExistingFile(path, flags); err != nil {
 		return y.Wrapf(err, "Error while opening file in logfile %s", path)
 	}
+
 	fi, err := lf.fd.Stat()
 	if err != nil {
 		return errFile(err, lf.path, "Unable to run file.Stat")
@@ -999,10 +1003,15 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
 		vlog.elog = trace.NewEventLog("Badger", "Valuelog")
 	}
 	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
-	vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
+	vlog.lfDiscardStats = &lfDiscardStats{
+		m: make(map[uint32]int64),
+		closer: y.NewCloser(1),
+		flushChan: make(chan []byte, 16),
+	}
 	if err := vlog.populateFilesMap(); err != nil {
 		return err
 	}
+	go vlog.startFlushDiscard()
 	// If no files are found, then create a new file.
 	if len(vlog.filesMap) == 0 {
 		_, err := vlog.createVlogFile(0)
@@ -1668,58 +1677,101 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
 	}
 }

-func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error {
+func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
 	vlog.lfDiscardStats.Lock()
+	defer vlog.lfDiscardStats.Unlock()
+
+	// Compaction is closed after closing of flushing of discard stats.
+	// Hence check if flushChan is already closed.
+	if vlog.lfDiscardStats.flushChanClosed {
+		vlog.opt.Warningf("updateDiscardStats called: discard stats flush channel closed, " +
+			"returning without updating")
+		return
+	}
+
 	for fid, sz := range stats {
 		vlog.lfDiscardStats.m[fid] += sz
 		vlog.lfDiscardStats.updatesSinceFlush++
 	}
 	if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
-		vlog.lfDiscardStats.Unlock()
-		// flushDiscardStats also acquires lock. So, we need to unlock here.
-		return vlog.flushDiscardStats()
+		select {
+		case vlog.lfDiscardStats.flushChan <- vlog.encodedDiscardStats():
+			vlog.lfDiscardStats.updatesSinceFlush = 0
+		default:
+			vlog.opt.Warningf("updateDiscardStats called: discard stats flushChan full, " +
+				"returning without pushing to flushChan")
+		}
 	}
-	vlog.lfDiscardStats.Unlock()
-	return nil
 }

-// flushDiscardStats inserts discard stats into badger. Returns error on failure.
-func (vlog *valueLog) flushDiscardStats() error {
-	vlog.lfDiscardStats.Lock()
-	defer vlog.lfDiscardStats.Unlock()
+func (vlog *valueLog) startFlushDiscard() {
+	defer vlog.lfDiscardStats.closer.Done()

-	if len(vlog.lfDiscardStats.m) == 0 {
-		return nil
+	process := func(encodedDS []byte) error {
+		if len(encodedDS) == 0 {
+			return nil
+		}
+		entries := []*Entry{{
+			Key: y.KeyWithTs(lfDiscardStatsKey, 1),
+			Value: encodedDS,
+		}}
+		req, err := vlog.db.sendToWriteCh(entries)
+		if err == ErrBlockedWrites {
+			// We'll block writes while closing db. When L0 compaction in close may push discard
+			// stats. So ignoring it. https://github.com/dgraph-io/badger/issues/970
+			return nil
+		} else if err != nil {
+			return errors.Wrapf(err, "failed to push discard stats to write channel")
+		}
+		return req.Wait()
+	}
-	entries := []*Entry{{
-		Key: y.KeyWithTs(lfDiscardStatsKey, 1),
-		Value: vlog.encodedDiscardStats(),
-	}}
-	req, err := vlog.db.sendToWriteCh(entries)
-	if err == ErrBlockedWrites {
-		// We'll block write while closing db.
-		// When L0 compaction in close may push discard stats.
-		// So ignoring it.
-		// https://github.com/dgraph-io/badger/issues/970
-		return nil
-	} else if err != nil {
-		return errors.Wrapf(err, "failed to push discard stats to write channel")

+LOOP:
[Review comment] Great idea to provide labels 👍
||
for { | ||
select { | ||
case <-vlog.lfDiscardStats.closer.HasBeenClosed(): | ||
vlog.lfDiscardStats.Lock() | ||
vlog.lfDiscardStats.flushChanClosed = true | ||
[Review comment] to prevent scattering
+			close(vlog.lfDiscardStats.flushChan)
+			vlog.lfDiscardStats.Unlock()
+			break LOOP
+
+		case encodedDS := <-vlog.lfDiscardStats.flushChan:
+			if err := process(encodedDS); err != nil {
+				vlog.opt.Errorf("unable to process discardstats with error: %s", err)
+			}
+		}
+	}
+
+	// Process already buffered discard stats after flushChan is closed.
+	for encodedDS := range vlog.lfDiscardStats.flushChan {
+		if err := process(encodedDS); err != nil {
+			vlog.opt.Errorf("unable to process discardstats with error: %s", err)
+		}
+	}
-	vlog.lfDiscardStats.updatesSinceFlush = 0
-	return req.Wait()
 }

-// encodedDiscardStats returns []byte representation of lfDiscardStats
-// This will be called while storing stats in BadgerDB
-// caller should acquire lock before encoding the stats.
+func (vlog *valueLog) closeFlushDiscardStats() {
[Review comment] This is just using lfDiscardStats APIs, and doesn't have any valueLog internal logic. Push this into the lfDiscardStats interface.
+	// Before closing, flush latest discardStats also.
+	vlog.lfDiscardStats.Lock()
+	if len(vlog.lfDiscardStats.m) > 0 {
+		encodedDS := vlog.encodedDiscardStats()
+		vlog.lfDiscardStats.flushChan <- encodedDS
+	}
+	vlog.lfDiscardStats.Unlock()
[Review comment] It's a good idea to …
+
+	vlog.lfDiscardStats.closer.SignalAndWait()
+}

+// encodedDiscardStats returns []byte representation of lfDiscardStats. This will be called while
+// storing stats in BadgerDB caller should acquire lock before encoding the stats.
 func (vlog *valueLog) encodedDiscardStats() []byte {
[Review comment] Same as above, this should be part of the …
 	encodedStats, _ := json.Marshal(vlog.lfDiscardStats.m)
 	return encodedStats
 }
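Following the two review comments above, a rough sketch of that refactor (hypothetical method names, assuming the fields added to lfDiscardStats in this PR) might look like:

// Hypothetical sketch: the same logic as closeFlushDiscardStats and
// encodedDiscardStats, expressed as methods on lfDiscardStats so that
// valueLog no longer reaches into its fields directly.
func (s *lfDiscardStats) encode() []byte {
	// Caller should hold the lock.
	encoded, _ := json.Marshal(s.m)
	return encoded
}

func (s *lfDiscardStats) flushAndClose() {
	s.Lock()
	if len(s.m) > 0 {
		// Queue the last snapshot for the flush goroutine before shutting it down.
		s.flushChan <- s.encode()
	}
	s.Unlock()

	// Signal the flush goroutine and wait until it drains flushChan and exits.
	s.closer.SignalAndWait()
}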

-// populateDiscardStats populates vlog.lfDiscardStats
-// This function will be called while initializing valueLog
+// populateDiscardStats populates vlog.lfDiscardStats.
+// This function will be called while initializing valueLog.
 func (vlog *valueLog) populateDiscardStats() error {
 	key := y.KeyWithTs(lfDiscardStatsKey, math.MaxUint64)
 	var statsMap map[uint32]int64
@@ -1771,6 +1823,11 @@ func (vlog *valueLog) populateDiscardStats() error {
 		return errors.Wrapf(err, "failed to unmarshal discard stats")
 	}
 	vlog.opt.Debugf("Value Log Discard stats: %v", statsMap)
-	vlog.lfDiscardStats = &lfDiscardStats{m: statsMap}
+	vlog.lfDiscardStats.Lock()
+	// TODO: since we are populating discardStats after replay, it might be possible we have
+	// generated discardStats from compaction. So instead of direct assignment just merge both
+	// the maps.
+	vlog.lfDiscardStats.m = statsMap
+	vlog.lfDiscardStats.Unlock()
 	return nil
 }
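A minimal sketch of the merge that the TODO above describes (so stats already accumulated by compaction during replay are not overwritten) could be:

vlog.lfDiscardStats.Lock()
// Merge persisted stats into whatever compaction has accumulated so far,
// instead of replacing the map wholesale.
for fid, sz := range statsMap {
	vlog.lfDiscardStats.m[fid] += sz
}
vlog.lfDiscardStats.Unlock()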
@@ -410,6 +410,7 @@ func TestValueGC4(t *testing.T) {

 	err = kv.vlog.Close()
 	require.NoError(t, err)
+	kv.vlog.closeFlushDiscardStats() // Close flushDiscardStats goroutine also.

 	err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction())
 	require.NoError(t, err)
@@ -630,9 +631,9 @@ func TestPartialAppendToValueLog(t *testing.T) {
 	kv, err = Open(opts)
 	require.NoError(t, err)
 	checkKeys(t, kv, [][]byte{k3})

 	// Replay value log from beginning, badger head is past k2.
 	require.NoError(t, kv.vlog.Close())
+	kv.vlog.closeFlushDiscardStats() // Close flushDiscardStats goroutine also.
 	require.NoError(t,
 		kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()))
 	require.NoError(t, kv.Close())
@@ -1003,10 +1004,7 @@ func TestTruncatedDiscardStat(t *testing.T) {
 	for i := uint32(0); i < uint32(20); i++ {
 		stat[i] = 0
 	}
-	// Set discard stats.
-	db.vlog.lfDiscardStats = &lfDiscardStats{
-		m: stat,
-	}
+	db.vlog.lfDiscardStats.m = stat
 	entries := []*Entry{{
 		Key: y.KeyWithTs(lfDiscardStatsKey, 1),
 		// Insert truncated discard stats. This is important.
@@ -1059,10 +1057,7 @@ func TestDiscardStatsMove(t *testing.T) {
 		stat[i] = 0
 	}

-	// Set discard stats.
-	db.vlog.lfDiscardStats = &lfDiscardStats{
-		m: stat,
-	}
+	db.vlog.lfDiscardStats.m = stat
 	entries := []*Entry{{
 		Key: y.KeyWithTs(lfDiscardStatsKey, 1),
 		// The discard stat value is more than value threshold.
@@ -1109,11 +1104,14 @@ func TestBlockedDiscardStats(t *testing.T) {
 	db, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	// Set discard stats.
-	db.vlog.lfDiscardStats = &lfDiscardStats{
-		m: map[uint32]int64{0: 0},
-	}
+	db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
 	db.blockWrite()
-	require.NoError(t, db.vlog.flushDiscardStats())
+	// Here insert discardStats more than the cap(vlog.lfDiscardStats.flushChan), that will ensure
+	// if we have tried to write at least one discard stats entry while writes were blocked.
+	encodedDS := db.vlog.encodedDiscardStats()
+	for i := 0; i < cap(db.vlog.lfDiscardStats.flushChan)+2; i++ {
[Review comment] Why …
+		db.vlog.lfDiscardStats.flushChan <- encodedDS
+	}
 	db.unblockWrite()
 	require.NoError(t, db.Close())
 }
@@ -1126,11 +1124,8 @@ func TestBlockedDiscardStatsOnClose(t *testing.T) {

 	db, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
-	// Set discard stats.
-	db.vlog.lfDiscardStats = &lfDiscardStats{
-		m: map[uint32]int64{0: 0},
-	}
-	// This is important. Set updateSinceFlush to discardStatsFlushThresold so
+	db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
+	// This is important. Set updateSinceFlush to discardStatsFlushThreshold so
[Review comment] Could we also add a test that replicates the deadlock, and make sure it does not deadlock with this change?
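A possible shape for such a regression test (hypothetical name; it only reuses helpers that already appear in this diff, such as getTestOptions, blockWrite, unblockWrite and discardStatsFlushThreshold) might be:

// Hypothetical regression test sketch: writes are blocked (as during Close), and
// updateDiscardStats is called well past the flush threshold. Before this PR that
// path could block forever; with the buffered flushChan it must return promptly.
func TestNoDeadlockOnBlockedDiscardStats(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	db, err := Open(getTestOptions(dir))
	require.NoError(t, err)

	db.blockWrite()
	done := make(chan struct{})
	go func() {
		// Push enough updates to cross discardStatsFlushThreshold several times.
		for i := 0; i < 5*discardStatsFlushThreshold; i++ {
			db.vlog.updateDiscardStats(map[uint32]int64{0: 1})
		}
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(10 * time.Second):
		t.Fatal("updateDiscardStats appears to be deadlocked")
	}
	db.unblockWrite()
	require.NoError(t, db.Close())
}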
 	// that the next update call flushes the discard stats.
 	db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
 	require.NoError(t, db.Close())
[Review comment] Does this no longer return an error?