Skip to content

Commit

Permalink
db: close previous WAL before linking next WAL
Browse files Browse the repository at this point in the history
In cockroachdb#871, Open started interpreting any WAL older than the most recent
one more strictly, erroring if it encountered any invalid chunks while
reading the log. This was enabled by a new EOF trailer chunk that's
written when a WAL is closed, ensuring that previous WALs end cleanly
and unambiguously.

However, during a WAL rotation, a crash before the previous WAL was
cleanly closed could leave the previous WAL with recycled garbage at its
tail.

This change updates WAL rotation to close the previous WAL first,
then create/rename the next WAL.
  • Loading branch information
jbowens committed Sep 10, 2020
1 parent 069f1e1 commit 2497546
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 36 deletions.
41 changes: 24 additions & 17 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,14 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
d.mu.mem.switching = true
d.mu.Unlock()

// Close the previous log first. This writes an EOF trailer
// signifying the end of the file and syncs it to disk. We must
// close the previous log before linking the new log file,
// otherwise a crash could leave both logs with unclean tails, and
// Open will treat the previous log as corrupt.
prevLogSize = uint64(d.mu.log.Size())
err = d.mu.log.Close()

newLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, newLogNum)

// Try to use a recycled log file. Recycling log files is an important
Expand All @@ -1323,12 +1331,15 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
// time. This is due to the need to sync file metadata when a file is
// being written for the first time. Note this is true even if file
// preallocation is performed (e.g. fallocate).
recycleLogNum := d.logRecycler.peek()
if recycleLogNum > 0 {
recycleLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, recycleLogNum)
newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName)
} else {
newLogFile, err = d.opts.FS.Create(newLogName)
var recycleLogNum base.FileNum
if err == nil {
recycleLogNum = d.logRecycler.peek()
if recycleLogNum > 0 {
recycleLogName := base.MakeFilename(d.opts.FS, d.walDirname, fileTypeLog, recycleLogNum)
newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName)
} else {
newLogFile, err = d.opts.FS.Create(newLogName)
}
}

if err == nil {
Expand All @@ -1337,17 +1348,13 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
err = d.walDir.Sync()
}

if err == nil {
prevLogSize = uint64(d.mu.log.Size())
err = d.mu.log.Close()
if err != nil {
newLogFile.Close()
} else {
newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
BytesPerSync: d.opts.BytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})
}
if err != nil && newLogFile != nil {
newLogFile.Close()
} else if err == nil {
newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
BytesPerSync: d.opts.BytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})
}

if recycleLogNum > 0 {
Expand Down
66 changes: 66 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pebble

import (
"bytes"
"math"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -329,3 +330,68 @@ func TestCorruptReadError(t *testing.T) {
}
}
}

// TestDBWALRotationCrash repeatedly opens a DB on a strict in-memory
// filesystem and writes a handful of keys while simulating a crash at
// successive write operations. A "crash" is modelled by telling the
// filesystem to ignore syncs once a countdown of write operations expires.
// After every simulated crash the filesystem is rolled back to its last
// synced state and the same workload is required to succeed.
func TestDBWALRotationCrash(t *testing.T) {
	memfs := vfs.NewStrictMem()

	// countdown is decremented on every write operation seen by the
	// injector. The write that takes it to -1 flips the filesystem into
	// ignoring syncs; any negative value means the crash has fired.
	var countdown int32
	injector := errorfs.InjectorFunc(func(op errorfs.Op) error {
		if op != errorfs.OpWrite {
			return nil
		}
		if atomic.AddInt32(&countdown, -1) == -1 {
			memfs.SetIgnoreSyncs(true)
		}
		return nil
	})
	crashed := func() bool {
		return atomic.LoadInt32(&countdown) < 0
	}

	// attempt opens the DB, arms the countdown with k, and performs up to
	// ten Sets, stopping early on an error or once the crash has fired.
	// It returns the first error from the writes or from closing the DB.
	attempt := func(fs *errorfs.FS, k int32) error {
		opts := &Options{
			FS:           fs,
			Logger:       panicLogger{},
			MemTableSize: 1024,
		}
		opts.private.disableTableStats = true

		db, err := Open("", opts)
		if err != nil || crashed() {
			return err
		}

		// Arm the crash only after Open has completed, so that k counts
		// write operations issued by the workload below.
		atomic.StoreInt32(&countdown, k)
		key := []byte("test")
		for n := 0; n < 10; n++ {
			err = db.Set(key, []byte(strings.Repeat("b", n)), nil)
			if err != nil || crashed() {
				break
			}
		}
		return firstError(err, db.Close())
	}

	fs := errorfs.Wrap(memfs, injector)
	for k := int32(0); ; k++ {
		// Park the countdown at a value that cannot expire during Open,
		// then run the workload with the crash armed at k.
		atomic.StoreInt32(&countdown, math.MaxInt32)
		err := attempt(fs, k)

		if crashed() {
			t.Logf("Crashed at write operation % 2d, error: %v\n", k, err)

			// Roll the filesystem back to what had been synced before the
			// simulated crash, re-enable syncs, and verify the workload
			// now runs cleanly with the same k.
			memfs.ResetToSyncedState()
			memfs.SetIgnoreSyncs(false)
			atomic.StoreInt32(&countdown, math.MaxInt32)
			require.NoError(t, attempt(fs, k))
			continue
		}

		// k exceeded the number of write operations the workload performs,
		// so every crash point has been covered.
		t.Logf("No crash at write operation %d\n", k)
		if err != nil {
			t.Fatalf("Filesystem did not 'crash', but error returned: %s", err)
		}
		return
	}
}
9 changes: 6 additions & 3 deletions internal/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (ii *InjectIndex) MaybeError(op Op) error {
func WithProbability(op Op, p float64) Injector {
mu := new(sync.Mutex)
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
return injectorFunc(func(currOp Op) error {
return InjectorFunc(func(currOp Op) error {
mu.Lock()
defer mu.Unlock()
if currOp == op && rnd.Float64() < p {
Expand All @@ -72,9 +72,12 @@ func WithProbability(op Op, p float64) Injector {
})
}

type injectorFunc func(Op) error
// InjectorFunc implements the Injector interface for a function with
// MaybeError's signature.
type InjectorFunc func(Op) error

func (f injectorFunc) MaybeError(op Op) error { return f(op) }
// MaybeError implements the Injector interface.
func (f InjectorFunc) MaybeError(op Op) error { return f(op) }

// Injector injects errors into FS operations.
type Injector interface {
Expand Down
12 changes: 6 additions & 6 deletions testdata/checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ sync: db/000002.log

flush db
----
create: db/000004.log
sync: db
sync: db/000002.log
close: db/000002.log
create: db/000004.log
sync: db
create: db/000005.sst
sync: db/000005.sst
close: db/000005.sst
Expand All @@ -46,10 +46,10 @@ sync: db/000004.log

flush db
----
reuseForWrite: db/000002.log -> db/000006.log
sync: db
sync: db/000004.log
close: db/000004.log
reuseForWrite: db/000002.log -> db/000006.log
sync: db
create: db/000007.sst
sync: db/000007.sst
close: db/000007.sst
Expand Down Expand Up @@ -88,10 +88,10 @@ checkpoint checkpoint1: file already exists

compact db
----
reuseForWrite: db/000004.log -> db/000008.log
sync: db
sync: db/000006.log
close: db/000006.log
reuseForWrite: db/000004.log -> db/000008.log
sync: db
create: db/000009.sst
sync: db/000009.sst
close: db/000009.sst
Expand Down
8 changes: 4 additions & 4 deletions testdata/cleaner
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ sync: wal/000002.log

flush db
----
create: wal/000004.log
sync: wal
sync: wal/000002.log
close: wal/000002.log
create: wal/000004.log
sync: wal
create: db/000005.sst
sync: db/000005.sst
close: db/000005.sst
Expand All @@ -48,10 +48,10 @@ sync: wal/000004.log

compact db
----
create: wal/000006.log
sync: wal
sync: wal/000004.log
close: wal/000004.log
create: wal/000006.log
sync: wal
create: db/000007.sst
sync: db/000007.sst
close: db/000007.sst
Expand Down
12 changes: 6 additions & 6 deletions testdata/event_listener
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ sync: db
flush
----
sync: wal/000002.log
create: wal/000005.log
sync: wal
sync: wal/000002.log
close: wal/000002.log
create: wal/000005.log
sync: wal
[JOB 2] WAL created 000005
[JOB 3] flushing 1 memtable to L0
create: db/000006.sst
Expand All @@ -60,10 +60,10 @@ sync: db
compact
----
sync: wal/000005.log
reuseForWrite: wal/000002.log -> wal/000008.log
sync: wal
sync: wal/000005.log
close: wal/000005.log
reuseForWrite: wal/000002.log -> wal/000008.log
sync: wal
[JOB 4] WAL created 000008 (recycled 000002)
[JOB 5] flushing 1 memtable to L0
create: db/000009.sst
Expand Down Expand Up @@ -108,10 +108,10 @@ disable-file-deletions
flush
----
sync: wal/000008.log
reuseForWrite: wal/000005.log -> wal/000013.log
sync: wal
sync: wal/000008.log
close: wal/000008.log
reuseForWrite: wal/000005.log -> wal/000013.log
sync: wal
[JOB 7] WAL created 000013 (recycled 000005)
[JOB 8] flushing 1 memtable to L0
create: db/000014.sst
Expand Down

0 comments on commit 2497546

Please sign in to comment.