From 249754651c508303dbdbca347b19aa761c3ee116 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 9 Sep 2020 15:23:09 -0400 Subject: [PATCH] db: close previous WAL before linking next WAL In #871, Open started interpreting any WAL older than the most recent one more strictly, erroring if it encountered any invalid chunks while reading the log. This was enabled by a new EOF trailer chunk that's written when a WAL is closed, ensuring that previous WALs end cleanly and unambiguously. However, during a WAL rotation, a crash before the previous WAL was cleanly closed could leave the previous WAL with recycled garbage at its tail. This change updates WAL rotation to close the previous WAL first, then create/rename the next WAL. --- db.go | 41 +++++++++++++---------- error_test.go | 66 +++++++++++++++++++++++++++++++++++++ internal/errorfs/errorfs.go | 9 +++-- testdata/checkpoint | 12 +++---- testdata/cleaner | 8 ++--- testdata/event_listener | 12 +++---- 6 files changed, 112 insertions(+), 36 deletions(-) diff --git a/db.go b/db.go index 06c254736e..8182eae95a 100644 --- a/db.go +++ b/db.go @@ -1315,6 +1315,14 @@ func (d *DB) makeRoomForWrite(b *Batch) error { d.mu.mem.switching = true d.mu.Unlock() + // Close the previous log first. This writes an EOF trailer + // signifying the end of the file and syncs it to disk. We must + // close the previous log before linking the new log file, + // otherwise a crash could leave both logs with unclean tails, and + // Open will treat the previous log as corrupt. + prevLogSize = uint64(d.mu.log.Size()) + err = d.mu.log.Close() + newLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, newLogNum) // Try to use a recycled log file. Recycling log files is an important @@ -1323,12 +1331,15 @@ func (d *DB) makeRoomForWrite(b *Batch) error { // time. This is due to the need to sync file metadata when a file is // being written for the first time. Note this is true even if file // preallocation is performed (e.g. fallocate). - recycleLogNum := d.logRecycler.peek() - if recycleLogNum > 0 { - recycleLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, recycleLogNum) - newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName) - } else { - newLogFile, err = d.opts.FS.Create(newLogName) + var recycleLogNum base.FileNum + if err == nil { + recycleLogNum = d.logRecycler.peek() + if recycleLogNum > 0 { + recycleLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, recycleLogNum) + newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName) + } else { + newLogFile, err = d.opts.FS.Create(newLogName) + } } if err == nil { @@ -1337,17 +1348,13 @@ func (d *DB) makeRoomForWrite(b *Batch) error { err = d.walDir.Sync() } - if err == nil { - prevLogSize = uint64(d.mu.log.Size()) - err = d.mu.log.Close() - if err != nil { - newLogFile.Close() - } else { - newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{ - BytesPerSync: d.opts.BytesPerSync, - PreallocateSize: d.walPreallocateSize(), - }) - } + if err != nil && newLogFile != nil { + newLogFile.Close() + } else if err == nil { + newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{ + BytesPerSync: d.opts.BytesPerSync, + PreallocateSize: d.walPreallocateSize(), + }) } if recycleLogNum > 0 { diff --git a/error_test.go b/error_test.go index 588fb0d446..6b5eb4d9ff 100644 --- a/error_test.go +++ b/error_test.go @@ -6,6 +6,7 @@ package pebble import ( "bytes" + "math" "strings" "sync/atomic" "testing" @@ -329,3 +330,68 @@ func TestCorruptReadError(t *testing.T) { } } } + +func TestDBWALRotationCrash(t *testing.T) { + memfs := vfs.NewStrictMem() + + var index int32 + inj := errorfs.InjectorFunc(func(op errorfs.Op) error { + if op == errorfs.OpWrite && atomic.AddInt32(&index, -1) == -1 { + memfs.SetIgnoreSyncs(true) + } + return nil + }) + triggered := func() bool { return atomic.LoadInt32(&index) < 0 } + + run := func(fs *errorfs.FS, k int32) (err error) { + opts := &Options{ + FS: fs, + Logger: panicLogger{}, + MemTableSize: 1024, + } + opts.private.disableTableStats = true + d, err := Open("", opts) + if err != nil || triggered() { + return err + } + + // Write keys with the FS set up to simulate a crash by ignoring + // syncs on the k-th write operation. + atomic.StoreInt32(&index, k) + key := []byte("test") + for i := 0; i < 10; i++ { + v := []byte(strings.Repeat("b", i)) + err = d.Set(key, v, nil) + if err != nil || triggered() { + break + } + } + err = firstError(err, d.Close()) + return err + } + + fs := errorfs.Wrap(memfs, inj) + for k := int32(0); ; k++ { + // Run, simulating a crash by ignoring syncs after the k-th write + // operation after Open. + atomic.StoreInt32(&index, math.MaxInt32) + err := run(fs, k) + if !triggered() { + // Stop when we reach a value of k greater than the number of + // write operations performed during `run`. + t.Logf("No crash at write operation %d\n", k) + if err != nil { + t.Fatalf("Filesystem did not 'crash', but error returned: %s", err) + } + break + } + t.Logf("Crashed at write operation % 2d, error: %v\n", k, err) + + // Reset the filesystem to its state right before the simulated + // "crash", restore syncs, and run again without crashing. + memfs.ResetToSyncedState() + memfs.SetIgnoreSyncs(false) + atomic.StoreInt32(&index, math.MaxInt32) + require.NoError(t, run(fs, k)) + } +} diff --git a/internal/errorfs/errorfs.go b/internal/errorfs/errorfs.go index f1fcf25d50..e294b3cf52 100644 --- a/internal/errorfs/errorfs.go +++ b/internal/errorfs/errorfs.go @@ -62,7 +62,7 @@ func (ii *InjectIndex) MaybeError(op Op) error { func WithProbability(op Op, p float64) Injector { mu := new(sync.Mutex) rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - return injectorFunc(func(currOp Op) error { + return InjectorFunc(func(currOp Op) error { mu.Lock() defer mu.Unlock() if currOp == op && rnd.Float64() < p { @@ -72,9 +72,12 @@ func WithProbability(op Op, p float64) Injector { }) } -type injectorFunc func(Op) error +// InjectorFunc implements the Injector interface for a function with +// MaybeError's signature. +type InjectorFunc func(Op) error -func (f injectorFunc) MaybeError(op Op) error { return f(op) } +// MaybeError implements the Injector interface. +func (f InjectorFunc) MaybeError(op Op) error { return f(op) } // Injector injects errors into FS operations. type Injector interface { diff --git a/testdata/checkpoint b/testdata/checkpoint index 86c0cd3c3f..0910238174 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -27,10 +27,10 @@ sync: db/000002.log flush db ---- -create: db/000004.log -sync: db sync: db/000002.log close: db/000002.log +create: db/000004.log +sync: db create: db/000005.sst sync: db/000005.sst close: db/000005.sst @@ -46,10 +46,10 @@ sync: db/000004.log flush db ---- -reuseForWrite: db/000002.log -> db/000006.log -sync: db sync: db/000004.log close: db/000004.log +reuseForWrite: db/000002.log -> db/000006.log +sync: db create: db/000007.sst sync: db/000007.sst close: db/000007.sst @@ -88,10 +88,10 @@ checkpoint checkpoint1: file already exists compact db ---- -reuseForWrite: db/000004.log -> db/000008.log -sync: db sync: db/000006.log close: db/000006.log +reuseForWrite: db/000004.log -> db/000008.log +sync: db create: db/000009.sst sync: db/000009.sst close: db/000009.sst diff --git a/testdata/cleaner b/testdata/cleaner index 3a330735b8..67ff670085 100644 --- a/testdata/cleaner +++ b/testdata/cleaner @@ -29,10 +29,10 @@ sync: wal/000002.log flush db ---- -create: wal/000004.log -sync: wal sync: wal/000002.log close: wal/000002.log +create: wal/000004.log +sync: wal create: db/000005.sst sync: db/000005.sst close: db/000005.sst @@ -48,10 +48,10 @@ sync: wal/000004.log compact db ---- -create: wal/000006.log -sync: wal sync: wal/000004.log close: wal/000004.log +create: wal/000006.log +sync: wal create: db/000007.sst sync: db/000007.sst close: db/000007.sst diff --git a/testdata/event_listener b/testdata/event_listener index 9220fddfc7..5a38783da2 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -34,10 +34,10 @@ sync: db flush ---- sync: wal/000002.log -create: wal/000005.log -sync: wal sync: wal/000002.log close: wal/000002.log +create: wal/000005.log +sync: wal [JOB 2] WAL created 000005 [JOB 3] flushing 1 memtable to L0 create: db/000006.sst @@ -60,10 +60,10 @@ sync: db compact ---- sync: wal/000005.log -reuseForWrite: wal/000002.log -> wal/000008.log -sync: wal sync: wal/000005.log close: wal/000005.log +reuseForWrite: wal/000002.log -> wal/000008.log +sync: wal [JOB 4] WAL created 000008 (recycled 000002) [JOB 5] flushing 1 memtable to L0 create: db/000009.sst @@ -108,10 +108,10 @@ disable-file-deletions flush ---- sync: wal/000008.log -reuseForWrite: wal/000005.log -> wal/000013.log -sync: wal sync: wal/000008.log close: wal/000008.log +reuseForWrite: wal/000005.log -> wal/000013.log +sync: wal [JOB 7] WAL created 000013 (recycled 000005) [JOB 8] flushing 1 memtable to L0 create: db/000014.sst