From 05e7af9949fe0e26b176b7716cd8473b9cb08e10 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Thu, 10 Sep 2020 16:48:51 +0530
Subject: [PATCH] fix(replay) - Update head for LSM entries also (#1456) (#1516)

https://github.com/dgraph-io/badger/pull/1372 tried to fix the
`replay from start` issue, but the fix was only partial. The head was not
being updated when all the entries were inserted only into the LSM tree, so
a restart would still replay the value log from the beginning. This commit
fixes that.

(cherry picked from commit 4c8fe7fd63e36f9e39b788c084e18e293413a71d)
---
 backup_test.go | 22 ++++++++++++++++++++++
 db.go          | 34 +++++++++++++++++++++++-----------
 db_test.go     |  5 +++--
 3 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/backup_test.go b/backup_test.go
index 802616558..53d5db2ce 100644
--- a/backup_test.go
+++ b/backup_test.go
@@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) {
         return nil
     })
     require.NoError(t, err)
+    require.Equal(t, db.orc.nextTs(), uint64(3))
 }

 func TestBackupRestore2(t *testing.T) {
@@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) {
     err = db2.Load(&backup, 16)
     require.NoError(t, err)

+    // Check nextTs is correctly set.
+    require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs())
+
     for i := byte(1); i < N; i++ {
         err = db2.View(func(tx *Txn) error {
             k := append(key1, i)
@@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) {
     err = db3.Load(&backup, 16)
     require.NoError(t, err)

+    // Check nextTs is correctly set.
+    require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs())
+
     for i := byte(1); i < N; i++ {
         err = db3.View(func(tx *Txn) error {
             k := append(key1, i)
@@ -319,6 +326,7 @@ func TestBackupRestore3(t *testing.T) {
     N := 1000
     entries := createEntries(N)

+    var db1NextTs uint64
     // backup
     {
         db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1")))
@@ -329,6 +337,8 @@

         _, err = db1.Backup(&bb, 0)
         require.NoError(t, err)
+
+        db1NextTs = db1.orc.nextTs()
         require.NoError(t, db1.Close())
     }
     require.True(t, len(entries) == N)
@@ -339,7 +349,9 @@
     require.NoError(t, err)
     defer db2.Close()

+    require.NotEqual(t, db1NextTs, db2.orc.nextTs())
     require.NoError(t, db2.Load(&bb, 16))
+    require.Equal(t, db1NextTs, db2.orc.nextTs())

     // verify
     err = db2.View(func(txn *Txn) error {
@@ -377,6 +389,7 @@ func TestBackupLoadIncremental(t *testing.T) {
     updates := make(map[int]byte)
     var bb bytes.Buffer

+    var db1NextTs uint64
     // backup
     {
         db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2")))
@@ -433,6 +446,9 @@
         require.NoError(t, err)
         _, err = db1.Backup(&bb, since)
         require.NoError(t, err)
+
+        db1NextTs = db1.orc.nextTs()
+
         require.NoError(t, db1.Close())
     }
     require.True(t, len(entries) == N)
@@ -444,7 +460,9 @@

     defer db2.Close()

+    require.NotEqual(t, db1NextTs, db2.orc.nextTs())
     require.NoError(t, db2.Load(&bb, 16))
+    require.Equal(t, db1NextTs, db2.orc.nextTs())

     // verify
     actual := make(map[int]byte)
@@ -511,6 +529,8 @@
     _, err = db.Backup(bak, 0)
     require.NoError(t, err)
     require.NoError(t, bak.Close())
+
+    oldValue := db.orc.nextTs()
     require.NoError(t, db.Close())

     opt = getTestOptions(dir)
@@ -524,6 +544,8 @@
     defer bak.Close()

     require.NoError(t, db.Load(bak, 16))
+    // Ensure nextTs is still the same.
+    require.Equal(t, oldValue, db.orc.nextTs())

     require.NoError(t, db.View(func(txn *Txn) error {
         e, err := txn.Get(key)

diff --git a/db.go b/db.go
index 0b6754077..d1598c375 100644
--- a/db.go
+++ b/db.go
@@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
             nv = make([]byte, vptrSize)
             vp.Encode(nv)
             meta = meta | bitValuePointer
-            // Update vhead. If the crash happens while replay was in progess
-            // and the head is not updated, we will end up replaying all the
-            // files again.
-            db.updateHead([]valuePointer{vp})
         }
+        // Update vhead. If the crash happens while replay was in progress
+        // and the head is not updated, we will end up replaying all the
+        // files starting from file zero again.
+        db.updateHead([]valuePointer{vp})

         v := y.ValueStruct{
             Value: nv,
@@ -871,17 +871,14 @@ type flushTask struct {
     dropPrefix []byte
 }

-// handleFlushTask must be run serially.
-func (db *DB) handleFlushTask(ft flushTask) error {
-    // There can be a scnerio, when empty memtable is flushed. For example, memtable is empty and
-    // after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
-    if ft.mt.Empty() {
-        return nil
+func (db *DB) pushHead(ft flushTask) error {
+    // Ensure we never push a zero-valued head pointer.
+    if ft.vptr.IsZero() {
+        return errors.New("Head should not be zero")
     }

     // Store badger head even if vptr is zero, need it for readTs
     db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
-    db.elog.Printf("Storing offset: %+v\n", ft.vptr)
     offset := make([]byte, vptrSize)
     ft.vptr.Encode(offset)

@@ -890,6 +887,21 @@
     headTs := y.KeyWithTs(head, db.orc.nextTs())
     ft.mt.Put(headTs, y.ValueStruct{Value: offset})

+    return nil
+}
+
+// handleFlushTask must be run serially.
+func (db *DB) handleFlushTask(ft flushTask) error {
+    // There can be a scenario where an empty memtable is flushed. For example, the memtable is
+    // empty and, after writing a request to the value log, the rotation count exceeds db.LogRotatesToFlush.
+    if ft.mt.Empty() {
+        return nil
+    }
+
+    if err := db.pushHead(ft); err != nil {
+        return err
+    }
+
     fileID := db.lc.reserveFileID()
     fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
     if err != nil {
diff --git a/db_test.go b/db_test.go
index b32abfb00..e3526f2b5 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1861,8 +1861,9 @@ func TestNoCrash(t *testing.T) {
     }

     db.Lock()
-    // make head to point to first file
-    db.vhead = valuePointer{0, 0, 0}
+    // Make the head point to the second file. We cannot make it point to the
+    // first vlog file because we cannot push a zero head pointer.
+    db.vhead = valuePointer{1, 0, 0}
     db.Unlock()
     db.Close()
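
Note: the behavioural change in replayFunction is easiest to see in isolation.
Below is a minimal, self-contained Go sketch of the replay bookkeeping, under
the assumption of simplified stand-in types: valuePointer, entry, replayHead,
and threshold are illustrative names, not Badger's actual types or APIs. It
shows why a head that only advances for value-pointer entries never moves when
every value is small enough to be stored directly in the LSM tree.

package main

import "fmt"

// valuePointer mimics the shape of Badger's value pointer: which vlog file a
// record was written to, its length, and its offset within that file.
type valuePointer struct {
	Fid, Len, Offset uint32
}

// entry is a simplified replayed record. Its size decides whether the value
// is folded into the LSM tree or left in the vlog behind a pointer.
type entry struct {
	key  string
	size int
	vp   valuePointer
}

// threshold stands in for the configurable value-size threshold.
const threshold = 32

// replayHead walks records in vlog order and returns the head: the last vlog
// position that is safely reflected in the LSM tree after replay.
func replayHead(entries []entry, updateForLSMEntries bool) (head valuePointer) {
	for _, e := range entries {
		if e.size >= threshold {
			// Value stays in the vlog; the LSM tree keeps a pointer to it.
			head = e.vp
			continue
		}
		// Small value goes straight into the LSM tree. The fix advances the
		// head here too; the old code only did so in the branch above.
		if updateForLSMEntries {
			head = e.vp
		}
	}
	return head
}

func main() {
	entries := []entry{
		{key: "a", size: 8, vp: valuePointer{Fid: 1, Len: 20, Offset: 0}},
		{key: "b", size: 8, vp: valuePointer{Fid: 1, Len: 20, Offset: 20}},
		{key: "c", size: 8, vp: valuePointer{Fid: 1, Len: 20, Offset: 40}},
	}
	fmt.Println("old head:", replayHead(entries, false)) // {0 0 0}: replay restarts from file zero
	fmt.Println("new head:", replayHead(entries, true))  // {1 20 40}: resumes past replayed data
}

Under the old logic the head stays at the zero pointer, so the next open
replays every vlog file from file zero; with the fix, the head tracks the last
replayed position regardless of where the value itself lives. That same zero
pointer is what the new pushHead guard rejects, which is why TestNoCrash now
parks db.vhead on the second vlog file instead of the first.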