From aef68f01b10d741ab87576e7931c12bf657c3049 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Mon, 17 Aug 2020 17:14:55 +0530
Subject: [PATCH] fix(replay) - Update head for LSM entries also (#1456)

https://github.com/dgraph-io/badger/pull/1372 tried to fix the `replay from
start` issue but only partially fixed it. The head was not being updated when
all the entries were inserted only into the LSM tree. This commit fixes that.

(cherry picked from commit 4c8fe7fd63e36f9e39b788c084e18e293413a71d)
---
 backup_test.go | 22 ++++++++++++++++++++++
 db.go          | 37 +++++++++++++++++++++++++++----------
 db_test.go     |  5 +++--
 3 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/backup_test.go b/backup_test.go
index b35e01fc7..ee14a187d 100644
--- a/backup_test.go
+++ b/backup_test.go
@@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) {
 		return nil
 	})
 	require.NoError(t, err)
+	require.Equal(t, db.orc.nextTs(), uint64(3))
 }
 
 func TestBackupRestore2(t *testing.T) {
@@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) {
 	err = db2.Load(&backup, 16)
 	require.NoError(t, err)
 
+	// Check nextTs is correctly set.
+	require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs())
+
 	for i := byte(1); i < N; i++ {
 		err = db2.View(func(tx *Txn) error {
 			k := append(key1, i)
@@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) {
 	err = db3.Load(&backup, 16)
 	require.NoError(t, err)
 
+	// Check nextTs is correctly set.
+	require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs())
+
 	for i := byte(1); i < N; i++ {
 		err = db3.View(func(tx *Txn) error {
 			k := append(key1, i)
@@ -325,6 +332,7 @@ func TestBackupRestore3(t *testing.T) {
 	N := 1000
 	entries := createEntries(N)
 
+	var db1NextTs uint64
 	// backup
 	{
 		db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1")))
@@ -335,6 +343,8 @@ func TestBackupRestore3(t *testing.T) {
 
 		_, err = db1.Backup(&bb, 0)
 		require.NoError(t, err)
+
+		db1NextTs = db1.orc.nextTs()
 		require.NoError(t, db1.Close())
 	}
 	require.True(t, len(entries) == N)
@@ -345,7 +355,9 @@ func TestBackupRestore3(t *testing.T) {
 		require.NoError(t, err)
 		defer db2.Close()
 
+		require.NotEqual(t, db1NextTs, db2.orc.nextTs())
 		require.NoError(t, db2.Load(&bb, 16))
+		require.Equal(t, db1NextTs, db2.orc.nextTs())
 
 		// verify
 		err = db2.View(func(txn *Txn) error {
@@ -383,6 +395,7 @@ func TestBackupLoadIncremental(t *testing.T) {
 	updates := make(map[int]byte)
 	var bb bytes.Buffer
 
+	var db1NextTs uint64
 	// backup
 	{
 		db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2")))
@@ -439,6 +452,9 @@ func TestBackupLoadIncremental(t *testing.T) {
 		require.NoError(t, err)
 		_, err = db1.Backup(&bb, since)
 		require.NoError(t, err)
+
+		db1NextTs = db1.orc.nextTs()
+
 		require.NoError(t, db1.Close())
 	}
 	require.True(t, len(entries) == N)
@@ -450,7 +466,9 @@ func TestBackupLoadIncremental(t *testing.T) {
 
 		defer db2.Close()
 
+		require.NotEqual(t, db1NextTs, db2.orc.nextTs())
 		require.NoError(t, db2.Load(&bb, 16))
+		require.Equal(t, db1NextTs, db2.orc.nextTs())
 
 		// verify
 		actual := make(map[int]byte)
@@ -517,6 +535,8 @@ func TestBackupBitClear(t *testing.T) {
 	_, err = db.Backup(bak, 0)
 	require.NoError(t, err)
 	require.NoError(t, bak.Close())
+
+	oldValue := db.orc.nextTs()
 	require.NoError(t, db.Close())
 
 	opt = getTestOptions(dir)
@@ -530,6 +550,8 @@ func TestBackupBitClear(t *testing.T) {
 	defer bak.Close()
 
 	require.NoError(t, db.Load(bak, 16))
+	// Ensure nextTs is still the same.
+	require.Equal(t, oldValue, db.orc.nextTs())
 
 	require.NoError(t, db.View(func(txn *Txn) error {
 		e, err := txn.Get(key)
diff --git a/db.go b/db.go
index d5dd2a3c6..e3416515d 100644
--- a/db.go
+++ b/db.go
@@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 		} else {
 			nv = vp.Encode()
 			meta = meta | bitValuePointer
-			// Update vhead. If the crash happens while replay was in progess
-			// and the head is not updated, we will end up replaying all the
-			// files again.
-			db.updateHead([]valuePointer{vp})
 		}
+		// Update vhead. If the crash happens while replay was in progress
+		// and the head is not updated, we will end up replaying all the
+		// files starting from file zero, again.
+		db.updateHead([]valuePointer{vp})
 
 		v := y.ValueStruct{
 			Value: nv,
@@ -994,17 +994,19 @@ type flushTask struct {
 	dropPrefixes [][]byte
 }
 
-// handleFlushTask must be run serially.
-func (db *DB) handleFlushTask(ft flushTask) error {
-	// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
-	// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
-	if ft.mt.Empty() {
+func (db *DB) pushHead(ft flushTask) error {
+	// We don't need to store the head pointer in in-memory mode since we
+	// will never replay anything.
+	if db.opt.InMemory {
 		return nil
 	}
+	// Ensure we never push a zero valued head pointer.
+	if ft.vptr.IsZero() {
+		return errors.New("Head should not be zero")
+	}
 
 	// Store badger head even if vptr is zero, need it for readTs
 	db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
-	db.opt.Debugf("Storing offset: %+v\n", ft.vptr)
 	val := ft.vptr.Encode()
 
 	// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
@@ -1012,6 +1014,21 @@
 	headTs := y.KeyWithTs(head, db.orc.nextTs())
 	ft.mt.Put(headTs, y.ValueStruct{Value: val})
+	return nil
+}
+
+// handleFlushTask must be run serially.
+func (db *DB) handleFlushTask(ft flushTask) error {
+	// There can be a scenario when an empty memtable is flushed. For example, the memtable is empty
+	// and after writing a request to the value log, the rotation count exceeds db.LogRotatesToFlush.
+	if ft.mt.Empty() {
+		return nil
+	}
+
+	if err := db.pushHead(ft); err != nil {
+		return err
+	}
+
 	dk, err := db.registry.latestDataKey()
 	if err != nil {
 		return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
 	}
diff --git a/db_test.go b/db_test.go
index 92c73a995..78b4f2932 100644
--- a/db_test.go
+++ b/db_test.go
@@ -2001,8 +2001,9 @@ func TestNoCrash(t *testing.T) {
 	}
 
 	db.Lock()
-	// make head to point to first file
-	db.vhead = valuePointer{0, 0, 0}
+	// Make the head point to the second vlog file. We cannot make it point to
+	// the first vlog file because we cannot push a zero head pointer.
+	db.vhead = valuePointer{1, 0, 0}
 	db.Unlock()
 	db.Close()
 
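
Note (not part of the patch): the scenario this fix targets, entries whose values stay below the
value threshold and therefore live only in the LSM tree, can be exercised with Badger's public v2
API alone. The sketch below is illustrative rather than taken from the repository; the directory
path, threshold, loop bounds, and key names are assumptions. The nextTs/vhead checks that the tests
above perform are internal to the package, so this sketch only drives the write, close, reopen
(replay), and read path that the fix affects.

package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// Assumed, illustrative path and options.
	dir := "/tmp/badger-replay-demo"

	// Keep values below the value threshold so they are stored only in the
	// LSM tree and never in the value log, which is the case this patch fixes.
	opts := badger.DefaultOptions(dir).WithValueThreshold(1024)

	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 100; i++ {
		err := db.Update(func(txn *badger.Txn) error {
			key := []byte(fmt.Sprintf("key-%03d", i))
			// Well under the 1024-byte threshold, so LSM-only.
			return txn.Set(key, []byte("small value"))
		})
		if err != nil {
			log.Fatal(err)
		}
	}
	// Closing flushes the memtable, which is where the head pointer gets persisted.
	if err := db.Close(); err != nil {
		log.Fatal(err)
	}

	// Reopening replays the value log; with the fix, replay resumes from the
	// stored head instead of starting over from value-log file zero.
	db, err = badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte("key-042"))
		if err != nil {
			return err
		}
		val, err := item.ValueCopy(nil)
		if err != nil {
			return err
		}
		fmt.Printf("key-042 = %s\n", val)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

Before this change, a workload like the above that never touched the value log would leave the head
unset, so every reopen replayed the value log from the first file; after it, replay resumes from the
persisted head, which matters once the value log has rotated many times.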