From 59020cb4c9bdc41ccfcd396dabda48c348568a76 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 1 Sep 2020 12:56:41 -0400 Subject: [PATCH] internal/record: write an EOF trailer on LogWriter Close While closing a LogWriter, write a special EOF record with an incremented log number signifying the end of the log. This EOF record will overwrite trailing garbage from the recycled log file, allowing us to detect corruption errors in any logs earlier than the most recent one. In Open return an error if any WAL other than the most recent one encounters an invalid chunk. --- internal/record/log_writer.go | 17 ++++++++++ internal/record/record_test.go | 38 +++++++++++++++++++++-- open.go | 18 ++++++----- open_test.go | 57 ++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 9 deletions(-) diff --git a/internal/record/log_writer.go b/internal/record/log_writer.go index e3ce92af0d..91e9409df8 100644 --- a/internal/record/log_writer.go +++ b/internal/record/log_writer.go @@ -534,6 +534,11 @@ func (w *LogWriter) queueBlock() { func (w *LogWriter) Close() error { f := &w.flusher + // Emit an EOF trailer signifying the end of this log. This helps readers + // differentiate between a corrupted entry in the middle of a log from + // garbage at the tail from a recycled log file. + w.emitEOFTrailer() + // Signal the flush loop to close. f.Lock() f.close = true @@ -613,6 +618,18 @@ func (w *LogWriter) Size() int64 { return w.blockNum*blockSize + int64(w.block.written) } +func (w *LogWriter) emitEOFTrailer() { + // Write a recyclable chunk header with a different log number. Readers + // will treat the header as EOF when the log number does not match. + b := w.block + i := b.written + binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC + binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size + b.buf[i+6] = recyclableFullChunkType + binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number + atomic.StoreInt32(&b.written, i+int32(recyclableHeaderSize)) +} + func (w *LogWriter) emitFragment(n int, p []byte) []byte { b := w.block i := b.written diff --git a/internal/record/record_test.go b/internal/record/record_test.go index d16414e769..0d503eb125 100644 --- a/internal/record/record_test.go +++ b/internal/record/record_test.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "os" "strings" "testing" @@ -948,22 +949,55 @@ func TestTruncatedLog(t *testing.T) { } func TestRecycleLogWithPartialBlock(t *testing.T) { - backing := make([]byte, 16) + backing := make([]byte, 27) w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1)) // Will write a chunk with 11 byte header + 5 byte payload. _, err := w.WriteRecord([]byte("aaaaa")) require.NoError(t, err) + // Close will write a 11-byte EOF chunk. require.NoError(t, w.Close()) + w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2)) // Will write a chunk with 11 byte header + 1 byte payload. _, err = w.WriteRecord([]byte("a")) require.NoError(t, err) + // Close will write a 11-byte EOF chunk. require.NoError(t, w.Close()) + r := NewReader(bytes.NewReader(backing), base.FileNum(2)) _, err = r.Next() require.NoError(t, err) // 4 bytes left, which are not enough for even the legacy header. - if _, err = r.Next(); err != ErrInvalidChunk && err != io.EOF { + if _, err = r.Next(); err != io.EOF { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestRecycleLogNumberOverflow(t *testing.T) { + // We truncate log numbers to 32-bits when writing to the WAL. Test log + // recycling at the wraparound point, ensuring that EOF chunks are + // interpreted correctly. + + backing := make([]byte, 27) + w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32)) + // Will write a chunk with 11 byte header + 5 byte payload. + _, err := w.WriteRecord([]byte("aaaaa")) + require.NoError(t, err) + // Close will write a 11-byte EOF chunk. + require.NoError(t, w.Close()) + + w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1)) + // Will write a chunk with 11 byte header + 1 byte payload. + _, err = w.WriteRecord([]byte("a")) + require.NoError(t, err) + // Close will write a 11-byte EOF chunk. + require.NoError(t, w.Close()) + + r := NewReader(bytes.NewReader(backing), base.FileNum(math.MaxUint32+1)) + _, err = r.Next() + require.NoError(t, err) + // 4 bytes left, which are not enough for even the legacy header. + if _, err = r.Next(); err != io.EOF { t.Fatalf("unexpected error: %v", err) } } diff --git a/open.go b/open.go index 4808bd992a..1101cd7b55 100644 --- a/open.go +++ b/open.go @@ -264,8 +264,9 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { }) var ve versionEdit - for _, lf := range logFiles { - maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num) + for i, lf := range logFiles { + lastWAL := i == len(logFiles)-1 + maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num, lastWAL) if err != nil { return nil, err } @@ -427,7 +428,7 @@ func GetVersion(dir string, fs vfs.FS) (string, error) { // d.mu must be held when calling this, but the mutex may be dropped and // re-acquired during the course of this method. func (d *DB) replayWAL( - jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, + jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, lastWAL bool, ) (maxSeqNum uint64, err error) { file, err := fs.Open(filename) if err != nil { @@ -492,10 +493,13 @@ func (d *DB) replayWAL( } if err != nil { // It is common to encounter a zeroed or invalid chunk due to WAL - // preallocation and WAL recycling. We need to distinguish these errors - // from EOF in order to recognize that the record was truncated, but want + // preallocation and WAL recycling. We need to distinguish these + // errors from EOF in order to recognize that the record was + // truncated and to avoid replaying subsequent WALs, but want // to otherwise treat them like EOF. - if err == io.EOF || record.IsInvalidRecord(err) { + if err == io.EOF { + break + } else if record.IsInvalidRecord(err) && lastWAL { break } return 0, errors.Wrap(err, "pebble: error when replaying WAL") @@ -565,7 +569,7 @@ func (d *DB) replayWAL( toFlush[i].readerUnref() } } - return maxSeqNum, nil + return maxSeqNum, err } func checkOptions(opts *Options, path string) error { diff --git a/open_test.go b/open_test.go index 26b4ed6480..cc93350c55 100644 --- a/open_test.go +++ b/open_test.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "reflect" "sort" "strconv" @@ -506,6 +507,62 @@ func TestOpenWALReplay2(t *testing.T) { } } +// TestTwoWALReplayCorrupt tests WAL-replay behavior when the first of the two +// WALs is corrupted with an sstable checksum error. Replay must stop at the +// first WAL because otherwise we may violate point-in-time recovery +// semantics. See #864. +func TestTwoWALReplayCorrupt(t *testing.T) { + // Use the real filesystem so that we can seek and overwrite WAL data + // easily. + dir, err := ioutil.TempDir("", "wal-replay") + require.NoError(t, err) + defer os.RemoveAll(dir) + + d, err := Open(dir, &Options{ + MemTableStopWritesThreshold: 4, + MemTableSize: 2048, + }) + require.NoError(t, err) + d.mu.Lock() + d.mu.compact.flushing = true + d.mu.Unlock() + require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil)) + require.NoError(t, d.Set([]byte("2"), nil, nil)) + d.mu.Lock() + d.mu.compact.flushing = false + d.mu.Unlock() + require.NoError(t, d.Close()) + + // We should have two WALs. + var logs []string + ls, err := vfs.Default.List(dir) + require.NoError(t, err) + for _, name := range ls { + if filepath.Ext(name) == ".log" { + logs = append(logs, name) + } + } + sort.Strings(logs) + if len(logs) < 2 { + t.Fatalf("expected at least two log files, found %d", len(logs)) + } + + // Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end + // of the file. + f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm) + require.NoError(t, err) + off, err := f.Seek(-100, 2) + require.NoError(t, err) + _, err = f.Write([]byte{0, 0, 0, 0}) + require.NoError(t, err) + require.NoError(t, f.Close()) + t.Logf("zeored four bytes in %s at offset %d\n", logs[len(logs)-2], off) + + // Re-opening the database should detect and report the corruption. + d, err = Open(dir, nil) + require.Error(t, err, "pebble: corruption") +} + // TestOpenWALReplayReadOnlySeqNums tests opening a database: // * in read-only mode // * with multiple unflushed log files that must replayed