Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix replay for LSM entries #1456

Merged
merged 1 commit into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) {
return nil
})
require.NoError(t, err)
require.Equal(t, db.orc.nextTs(), uint64(3))
}

func TestBackupRestore2(t *testing.T) {
Expand Down Expand Up @@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) {
err = db2.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db2.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) {
err = db3.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db3.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -325,6 +332,7 @@ func TestBackupRestore3(t *testing.T) {
N := 1000
entries := createEntries(N)

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1")))
Expand All @@ -335,6 +343,8 @@ func TestBackupRestore3(t *testing.T) {

_, err = db1.Backup(&bb, 0)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()
require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -345,7 +355,9 @@ func TestBackupRestore3(t *testing.T) {
require.NoError(t, err)

defer db2.Close()
require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
err = db2.View(func(txn *Txn) error {
Expand Down Expand Up @@ -383,6 +395,7 @@ func TestBackupLoadIncremental(t *testing.T) {
updates := make(map[int]byte)
var bb bytes.Buffer

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2")))
Expand Down Expand Up @@ -439,6 +452,9 @@ func TestBackupLoadIncremental(t *testing.T) {
require.NoError(t, err)
_, err = db1.Backup(&bb, since)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()

require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -450,7 +466,9 @@ func TestBackupLoadIncremental(t *testing.T) {

defer db2.Close()

require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
actual := make(map[int]byte)
Expand Down Expand Up @@ -517,6 +535,8 @@ func TestBackupBitClear(t *testing.T) {
_, err = db.Backup(bak, 0)
require.NoError(t, err)
require.NoError(t, bak.Close())

oldValue := db.orc.nextTs()
require.NoError(t, db.Close())

opt = getTestOptions(dir)
Expand All @@ -530,6 +550,8 @@ func TestBackupBitClear(t *testing.T) {
defer bak.Close()

require.NoError(t, db.Load(bak, 16))
// Ensure nextTs is still the same.
require.Equal(t, oldValue, db.orc.nextTs())

require.NoError(t, db.View(func(txn *Txn) error {
e, err := txn.Get(key)
Expand Down
37 changes: 27 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
} else {
nv = vp.Encode()
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progress
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}
// Update vhead. If the crash happens while replay was in progress
// and the head is not updated, we will end up replaying all the
// files starting from file zero, again.
db.updateHead([]valuePointer{vp})

v := y.ValueStruct{
Value: nv,
Expand Down Expand Up @@ -994,24 +994,41 @@ type flushTask struct {
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
// pushHead records the value log head pointer in the memtable that is about
// to be flushed, keyed with the oracle's nextTs, so that on restart replay
// can resume from this head and the read ts is restored correctly.
// Returns an error if the head pointer is zero-valued.
func (db *DB) pushHead(ft flushTask) error {
// We don't need to store the head pointer in in-memory mode since we will
// never replay anything.
if db.opt.InMemory {
return nil
}
// Ensure we never push a zero valued head pointer.
if ft.vptr.IsZero() {
return errors.New("Head should not be zero")
}

// NOTE(review): the two Debugf lines below appear to be a flattened diff
// (old message removed, new message added) — confirm only one belongs here.
db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
db.opt.Debugf("Storing offset: %+v\n", ft.vptr)
val := ft.vptr.Encode()

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: val})

return nil
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario where an empty memtable is flushed. For example, the memtable is
// empty and, after writing a request to the value log, the rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
}

if err := db.pushHead(ft); err != nil {
return err
}

dk, err := db.registry.latestDataKey()
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
Expand Down
5 changes: 3 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,8 +2001,9 @@ func TestNoCrash(t *testing.T) {
}

db.Lock()
// make head to point to first file
db.vhead = valuePointer{0, 0, 0}
// Make head point to the second file. We cannot make it point to the first
// vlog file because we cannot push a zero head pointer.
db.vhead = valuePointer{1, 0, 0}
db.Unlock()
db.Close()

Expand Down