Skip to content

Commit

Permalink
Update head while replaying value log (#1372)
Browse files Browse the repository at this point in the history
Fixes #1363

The head pointer is not updated when we perform replays. The head
pointer would be updated only when the replay completes. If badger crashes
between the point when replay started and replay finished, we would end up
replaying all the value log files. This PR fixes this issue.
  • Loading branch information
Ibrahim Jarif authored Jun 30, 2020
1 parent e013bfd commit 509de73
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
10 changes: 8 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
} else {
nv = vp.Encode()
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}

v := y.ValueStruct{
Expand Down Expand Up @@ -671,6 +675,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
return db.lc.get(key, maxVs, 0)
}

// updateHead should not be called without the db.Lock() since db.vhead is used
// by the writer go routines and memtable flushing goroutine.
func (db *DB) updateHead(ptrs []valuePointer) {
var ptr valuePointer
for i := len(ptrs) - 1; i >= 0; i-- {
Expand All @@ -684,8 +690,6 @@ func (db *DB) updateHead(ptrs []valuePointer) {
return
}

db.Lock()
defer db.Unlock()
y.AssertTrue(!ptr.Less(db.vhead))
db.vhead = ptr
}
Expand Down Expand Up @@ -784,7 +788,9 @@ func (db *DB) writeRequests(reqs []*request) error {
done(err)
return errors.Wrap(err, "writeRequests")
}
db.Lock()
db.updateHead(b.Ptrs)
db.Unlock()
}
done(nil)
db.opt.Debugf("%d entries written", count)
Expand Down
13 changes: 8 additions & 5 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ func TestValueGC4(t *testing.T) {

kv, err := Open(opt)
require.NoError(t, err)
defer kv.Close()

sz := 128 << 10 // 5 entries per value log file.
txn := kv.NewTransaction(true)
Expand Down Expand Up @@ -409,11 +408,9 @@ func TestValueGC4(t *testing.T) {
kv.vlog.rewrite(lf0, tr)
kv.vlog.rewrite(lf1, tr)

err = kv.vlog.Close()
require.NoError(t, err)
require.NoError(t, kv.Close())

kv.vlog.init(kv)
err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction())
kv, err = Open(opt)
require.NoError(t, err)

for i := 0; i < 8; i++ {
Expand All @@ -435,6 +432,7 @@ func TestValueGC4(t *testing.T) {
return nil
}))
}
require.NoError(t, kv.Close())
}

func TestPersistLFDiscardStats(t *testing.T) {
Expand Down Expand Up @@ -646,6 +644,11 @@ func TestPartialAppendToValueLog(t *testing.T) {
// Replay value log from beginning, badger head is past k2.
require.NoError(t, kv.vlog.Close())

// clean up the current db.vhead so that we can replay from the beginning.
// If we don't clear the current vhead, badger will error out since new
// head passed while opening vlog is zero in the following lines.
kv.vhead = valuePointer{}

kv.vlog.init(kv)
require.NoError(
t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()),
Expand Down

0 comments on commit 509de73

Please sign in to comment.