diff --git a/value.go b/value.go index c7bc13ef5..5ce999010 100644 --- a/value.go +++ b/value.go @@ -762,7 +762,7 @@ func (vlog *valueLog) dropAll() (int, error) { // lfDiscardStats keeps track of the amount of data that could be discarded for // a given logfile. type lfDiscardStats struct { - sync.Mutex + sync.RWMutex m map[uint32]int64 flushChan chan map[uint32]int64 closer *y.Closer @@ -1440,7 +1440,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi fid uint32 discard int64 }{math.MaxUint32, 0} - vlog.lfDiscardStats.Lock() + vlog.lfDiscardStats.RLock() for _, fid := range fids { if fid >= head.Fid { break @@ -1450,7 +1450,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi candidate.discard = vlog.lfDiscardStats.m[fid] } } - vlog.lfDiscardStats.Unlock() + vlog.lfDiscardStats.RUnlock() if candidate.fid != math.MaxUint32 { // Found a candidate tr.LazyPrintf("Found candidate via discard stats: %v", candidate) @@ -1691,7 +1691,7 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) { func (vlog *valueLog) flushDiscardStats() { defer vlog.lfDiscardStats.closer.Done() - mergeStats := func(stats map[uint32]int64) (encodedDS []byte) { + mergeStats := func(stats map[uint32]int64) ([]byte, error) { vlog.lfDiscardStats.Lock() defer vlog.lfDiscardStats.Unlock() for fid, count := range stats { @@ -1700,20 +1700,20 @@ func (vlog *valueLog) flushDiscardStats() { } if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold { - var err error - encodedDS, err = json.Marshal(vlog.lfDiscardStats.m) + encodedDS, err := json.Marshal(vlog.lfDiscardStats.m) if err != nil { - return + return nil, err } vlog.lfDiscardStats.updatesSinceFlush = 0 + return encodedDS, nil } - return + return nil, nil } process := func(stats map[uint32]int64) error { - encodedDS := mergeStats(stats) - if encodedDS == nil { - return nil + encodedDS, err := mergeStats(stats) + if err != nil || encodedDS == nil { + return err } entries := []*Entry{{ @@ -1721,19 +1721,18 @@ func (vlog *valueLog) flushDiscardStats() { Value: encodedDS, }} req, err := vlog.db.sendToWriteCh(entries) - if err == ErrBlockedWrites { - // Writes will be blocked if DropAll() call is made, to avoid crash ignore - // ErrBlockedWrites https://github.com/dgraph-io/badger/issues/970. - return nil - } else if err != nil { + // No special handling of ErrBlockedWrites is required as err is just logged in + // for loop below. + if err != nil { return errors.Wrapf(err, "failed to push discard stats to write channel") } return req.Wait() } + closer := vlog.lfDiscardStats.closer for { select { - case <-vlog.lfDiscardStats.closer.HasBeenClosed(): + case <-closer.HasBeenClosed(): // For simplicity just return without processing already present in stats in flushChan. return case stats := <-vlog.lfDiscardStats.flushChan: diff --git a/value_test.go b/value_test.go index 54d6ccc68..ca4cfcf15 100644 --- a/value_test.go +++ b/value_test.go @@ -493,10 +493,10 @@ func TestPersistLFDiscardStats(t *testing.T) { require.NoError(t, err) defer db.Close() time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats(). - db.vlog.lfDiscardStats.Lock() + db.vlog.lfDiscardStats.RLock() require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m), "Discard maps are not equal") - db.vlog.lfDiscardStats.Unlock() + db.vlog.lfDiscardStats.RUnlock() } func TestChecksums(t *testing.T) { @@ -1108,9 +1108,9 @@ func TestDiscardStatsMove(t *testing.T) { // to vlog.lfDiscardStats.flushChan. Hence wait for some time, for discard stats to be updated. time.Sleep(1 * time.Second) require.NoError(t, err) - db.vlog.lfDiscardStats.Lock() + db.vlog.lfDiscardStats.RLock() require.Equal(t, stat, db.vlog.lfDiscardStats.m) - db.vlog.lfDiscardStats.Unlock() + db.vlog.lfDiscardStats.RUnlock() require.NoError(t, db.Close()) }