Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(release/v1.6) Update head while replaying value log (#1372) #1506

Merged
merged 2 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
nv = make([]byte, vptrSize)
vp.Encode(nv)
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}

v := y.ValueStruct{
Expand Down Expand Up @@ -550,6 +554,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
return db.lc.get(key, maxVs)
}

// updateHead should not be called without the db.Lock() since db.vhead is used
// by the writer go routines and memtable flushing goroutine.
func (db *DB) updateHead(ptrs []valuePointer) {
var ptr valuePointer
for i := len(ptrs) - 1; i >= 0; i-- {
Expand All @@ -563,8 +569,6 @@ func (db *DB) updateHead(ptrs []valuePointer) {
return
}

db.Lock()
defer db.Unlock()
y.AssertTrue(!ptr.Less(db.vhead))
db.vhead = ptr
}
Expand Down Expand Up @@ -658,7 +662,9 @@ func (db *DB) writeRequests(reqs []*request) error {
done(err)
return errors.Wrap(err, "writeRequests")
}
db.Lock()
db.updateHead(b.Ptrs)
db.Unlock()
}
done(nil)
db.elog.Printf("%d entries written", count)
Expand Down
13 changes: 8 additions & 5 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ func TestValueGC4(t *testing.T) {

kv, err := Open(opt)
require.NoError(t, err)
defer kv.Close()

sz := 128 << 10 // 5 entries per value log file.
txn := kv.NewTransaction(true)
Expand Down Expand Up @@ -403,11 +402,9 @@ func TestValueGC4(t *testing.T) {
kv.vlog.rewrite(lf0, tr)
kv.vlog.rewrite(lf1, tr)

err = kv.vlog.Close()
require.NoError(t, err)
require.NoError(t, kv.Close())

kv.vlog.init(kv)
err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction())
kv, err = Open(opt)
require.NoError(t, err)

for i := 0; i < 8; i++ {
Expand All @@ -429,6 +426,7 @@ func TestValueGC4(t *testing.T) {
return nil
}))
}
require.NoError(t, kv.Close())
}

func TestPersistLFDiscardStats(t *testing.T) {
Expand Down Expand Up @@ -638,6 +636,11 @@ func TestPartialAppendToValueLog(t *testing.T) {
// Replay value log from beginning, badger head is past k2.
require.NoError(t, kv.vlog.Close())

// clean up the current db.vhead so that we can replay from the beginning.
// If we don't clear the current vhead, badger will error out since new
// head passed while opening vlog is zero in the following lines.
kv.vhead = valuePointer{}

kv.vlog.init(kv)
require.NoError(
t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()),
Expand Down