Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ashish-goswami committed Oct 14, 2019
1 parent 054ac4e commit ecadd42
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
33 changes: 16 additions & 17 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (vlog *valueLog) dropAll() (int, error) {
// lfDiscardStats keeps track of the amount of data that could be discarded for
// a given logfile.
type lfDiscardStats struct {
sync.Mutex
sync.RWMutex
m map[uint32]int64
flushChan chan map[uint32]int64
closer *y.Closer
Expand Down Expand Up @@ -1440,7 +1440,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
fid uint32
discard int64
}{math.MaxUint32, 0}
vlog.lfDiscardStats.Lock()
vlog.lfDiscardStats.RLock()
for _, fid := range fids {
if fid >= head.Fid {
break
Expand All @@ -1450,7 +1450,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
candidate.discard = vlog.lfDiscardStats.m[fid]
}
}
vlog.lfDiscardStats.Unlock()
vlog.lfDiscardStats.RUnlock()

if candidate.fid != math.MaxUint32 { // Found a candidate
tr.LazyPrintf("Found candidate via discard stats: %v", candidate)
Expand Down Expand Up @@ -1691,7 +1691,7 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
func (vlog *valueLog) flushDiscardStats() {
defer vlog.lfDiscardStats.closer.Done()

mergeStats := func(stats map[uint32]int64) (encodedDS []byte) {
mergeStats := func(stats map[uint32]int64) ([]byte, error) {
vlog.lfDiscardStats.Lock()
defer vlog.lfDiscardStats.Unlock()
for fid, count := range stats {
Expand All @@ -1700,40 +1700,39 @@ func (vlog *valueLog) flushDiscardStats() {
}

if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
var err error
encodedDS, err = json.Marshal(vlog.lfDiscardStats.m)
encodedDS, err := json.Marshal(vlog.lfDiscardStats.m)
if err != nil {
return
return nil, err
}
vlog.lfDiscardStats.updatesSinceFlush = 0
return encodedDS, nil
}
return
return nil, nil
}

process := func(stats map[uint32]int64) error {
encodedDS := mergeStats(stats)
if encodedDS == nil {
return nil
encodedDS, err := mergeStats(stats)
if err != nil || encodedDS == nil {
return err
}

entries := []*Entry{{
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
Value: encodedDS,
}}
req, err := vlog.db.sendToWriteCh(entries)
if err == ErrBlockedWrites {
// Writes will be blocked if DropAll() call is made, to avoid crash ignore
// ErrBlockedWrites https://github.com/dgraph-io/badger/issues/970.
return nil
} else if err != nil {
// No special handling of ErrBlockedWrites is required as err is just logged in
// for loop below.
if err != nil {
return errors.Wrapf(err, "failed to push discard stats to write channel")
}
return req.Wait()
}

closer := vlog.lfDiscardStats.closer
for {
select {
case <-vlog.lfDiscardStats.closer.HasBeenClosed():
case <-closer.HasBeenClosed():
// For simplicity just return without processing already present in stats in flushChan.
return
case stats := <-vlog.lfDiscardStats.flushChan:
Expand Down
8 changes: 4 additions & 4 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,10 @@ func TestPersistLFDiscardStats(t *testing.T) {
require.NoError(t, err)
defer db.Close()
time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
db.vlog.lfDiscardStats.Lock()
db.vlog.lfDiscardStats.RLock()
require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m),
"Discard maps are not equal")
db.vlog.lfDiscardStats.Unlock()
db.vlog.lfDiscardStats.RUnlock()
}

func TestChecksums(t *testing.T) {
Expand Down Expand Up @@ -1108,9 +1108,9 @@ func TestDiscardStatsMove(t *testing.T) {
// to vlog.lfDiscardStats.flushChan. Hence wait for some time, for discard stats to be updated.
time.Sleep(1 * time.Second)
require.NoError(t, err)
db.vlog.lfDiscardStats.Lock()
db.vlog.lfDiscardStats.RLock()
require.Equal(t, stat, db.vlog.lfDiscardStats.m)
db.vlog.lfDiscardStats.Unlock()
db.vlog.lfDiscardStats.RUnlock()
require.NoError(t, db.Close())
}

Expand Down

0 comments on commit ecadd42

Please sign in to comment.