From 4c8fe7fd63e36f9e39b788c084e18e293413a71d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 17 Aug 2020 17:14:55 +0530 Subject: [PATCH] fix(replay) - Update head for LSM entires also (#1456) https://github.com/dgraph-io/badger/pull/1372 tried to fix the `replay from start` issue but it partially fixed the issue. The head was not being updated in case all the entries are inserted only in the LSM tree. This commit fixes it. --- backup_test.go | 22 ++++++++++++++++++++++ db.go | 37 +++++++++++++++++++++++++++---------- db_test.go | 5 +++-- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/backup_test.go b/backup_test.go index b35e01fc7..ee14a187d 100644 --- a/backup_test.go +++ b/backup_test.go @@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) { return nil }) require.NoError(t, err) + require.Equal(t, db.orc.nextTs(), uint64(3)) } func TestBackupRestore2(t *testing.T) { @@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) { err = db2.Load(&backup, 16) require.NoError(t, err) + // Check nextTs is correctly set. + require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs()) + for i := byte(1); i < N; i++ { err = db2.View(func(tx *Txn) error { k := append(key1, i) @@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) { err = db3.Load(&backup, 16) require.NoError(t, err) + // Check nextTs is correctly set. + require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs()) + for i := byte(1); i < N; i++ { err = db3.View(func(tx *Txn) error { k := append(key1, i) @@ -325,6 +332,7 @@ func TestBackupRestore3(t *testing.T) { N := 1000 entries := createEntries(N) + var db1NextTs uint64 // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1"))) @@ -335,6 +343,8 @@ func TestBackupRestore3(t *testing.T) { _, err = db1.Backup(&bb, 0) require.NoError(t, err) + + db1NextTs = db1.orc.nextTs() require.NoError(t, db1.Close()) } require.True(t, len(entries) == N) @@ -345,7 +355,9 @@ func TestBackupRestore3(t *testing.T) { require.NoError(t, err) defer db2.Close() + require.NotEqual(t, db1NextTs, db2.orc.nextTs()) require.NoError(t, db2.Load(&bb, 16)) + require.Equal(t, db1NextTs, db2.orc.nextTs()) // verify err = db2.View(func(txn *Txn) error { @@ -383,6 +395,7 @@ func TestBackupLoadIncremental(t *testing.T) { updates := make(map[int]byte) var bb bytes.Buffer + var db1NextTs uint64 // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2"))) @@ -439,6 +452,9 @@ func TestBackupLoadIncremental(t *testing.T) { require.NoError(t, err) _, err = db1.Backup(&bb, since) require.NoError(t, err) + + db1NextTs = db1.orc.nextTs() + require.NoError(t, db1.Close()) } require.True(t, len(entries) == N) @@ -450,7 +466,9 @@ func TestBackupLoadIncremental(t *testing.T) { defer db2.Close() + require.NotEqual(t, db1NextTs, db2.orc.nextTs()) require.NoError(t, db2.Load(&bb, 16)) + require.Equal(t, db1NextTs, db2.orc.nextTs()) // verify actual := make(map[int]byte) @@ -517,6 +535,8 @@ func TestBackupBitClear(t *testing.T) { _, err = db.Backup(bak, 0) require.NoError(t, err) require.NoError(t, bak.Close()) + + oldValue := db.orc.nextTs() require.NoError(t, db.Close()) opt = getTestOptions(dir) @@ -530,6 +550,8 @@ func TestBackupBitClear(t *testing.T) { defer bak.Close() require.NoError(t, db.Load(bak, 16)) + // Ensure nextTs is still the same. + require.Equal(t, oldValue, db.orc.nextTs()) require.NoError(t, db.View(func(txn *Txn) error { e, err := txn.Get(key) diff --git a/db.go b/db.go index d5dd2a3c6..e3416515d 100644 --- a/db.go +++ b/db.go @@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { } else { nv = vp.Encode() meta = meta | bitValuePointer - // Update vhead. If the crash happens while replay was in progess - // and the head is not updated, we will end up replaying all the - // files again. - db.updateHead([]valuePointer{vp}) } + // Update vhead. If the crash happens while replay was in progess + // and the head is not updated, we will end up replaying all the + // files starting from file zero, again. + db.updateHead([]valuePointer{vp}) v := y.ValueStruct{ Value: nv, @@ -994,17 +994,19 @@ type flushTask struct { dropPrefixes [][]byte } -// handleFlushTask must be run serially. -func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scenario, when empty memtable is flushed. For example, memtable is empty and - // after writing request to value log, rotation count exceeds db.LogRotatesToFlush. - if ft.mt.Empty() { +func (db *DB) pushHead(ft flushTask) error { + // We don't need to store head pointer in the in-memory mode since we will + // never be replay anything. + if db.opt.InMemory { return nil } + // Ensure we never push a zero valued head pointer. + if ft.vptr.IsZero() { + return errors.New("Head should not be zero") + } // Store badger head even if vptr is zero, need it for readTs db.opt.Debugf("Storing value log head: %+v\n", ft.vptr) - db.opt.Debugf("Storing offset: %+v\n", ft.vptr) val := ft.vptr.Encode() // Pick the max commit ts, so in case of crash, our read ts would be higher than all the @@ -1012,6 +1014,21 @@ func (db *DB) handleFlushTask(ft flushTask) error { headTs := y.KeyWithTs(head, db.orc.nextTs()) ft.mt.Put(headTs, y.ValueStruct{Value: val}) + return nil +} + +// handleFlushTask must be run serially. +func (db *DB) handleFlushTask(ft flushTask) error { + // There can be a scenario, when empty memtable is flushed. For example, memtable is empty and + // after writing request to value log, rotation count exceeds db.LogRotatesToFlush. + if ft.mt.Empty() { + return nil + } + + if err := db.pushHead(ft); err != nil { + return err + } + dk, err := db.registry.latestDataKey() if err != nil { return y.Wrapf(err, "failed to get datakey in db.handleFlushTask") diff --git a/db_test.go b/db_test.go index 92c73a995..78b4f2932 100644 --- a/db_test.go +++ b/db_test.go @@ -2001,8 +2001,9 @@ func TestNoCrash(t *testing.T) { } db.Lock() - // make head to point to first file - db.vhead = valuePointer{0, 0, 0} + // make head to point to second file. We cannot make it point to the first + // vlog file because we cannot push a zero head pointer. + db.vhead = valuePointer{1, 0, 0} db.Unlock() db.Close()