Skip to content

Commit

Permalink
internal/record: write an EOF trailer on LogWriter Close
Browse files Browse the repository at this point in the history
While closing a LogWriter, write a special EOF record with an
incremented log number signifying the end of the log. This EOF record
will overwrite trailing garbage from the recycled log file, allowing us
to detect corruption errors in any logs earlier than the most recent
one.

In Open return an error if any WAL other than the most recent one
encounters an invalid chunk.
  • Loading branch information
jbowens committed Sep 2, 2020
1 parent e6a9f9a commit 59020cb
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 9 deletions.
17 changes: 17 additions & 0 deletions internal/record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ func (w *LogWriter) queueBlock() {
func (w *LogWriter) Close() error {
f := &w.flusher

// Emit an EOF trailer signifying the end of this log. This helps readers
// differentiate between a corrupted entry in the middle of a log from
// garbage at the tail from a recycled log file.
w.emitEOFTrailer()

// Signal the flush loop to close.
f.Lock()
f.close = true
Expand Down Expand Up @@ -613,6 +618,18 @@ func (w *LogWriter) Size() int64 {
return w.blockNum*blockSize + int64(w.block.written)
}

func (w *LogWriter) emitEOFTrailer() {
// Write a recyclable chunk header with a different log number. Readers
// will treat the header as EOF when the log number does not match.
b := w.block
i := b.written
binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
b.buf[i+6] = recyclableFullChunkType
binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
atomic.StoreInt32(&b.written, i+int32(recyclableHeaderSize))
}

func (w *LogWriter) emitFragment(n int, p []byte) []byte {
b := w.block
i := b.written
Expand Down
38 changes: 36 additions & 2 deletions internal/record/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -948,22 +949,55 @@ func TestTruncatedLog(t *testing.T) {
}

func TestRecycleLogWithPartialBlock(t *testing.T) {
backing := make([]byte, 16)
backing := make([]byte, 27)
w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1))
// Will write a chunk with 11 byte header + 5 byte payload.
_, err := w.WriteRecord([]byte("aaaaa"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2))
// Will write a chunk with 11 byte header + 1 byte payload.
_, err = w.WriteRecord([]byte("a"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

r := NewReader(bytes.NewReader(backing), base.FileNum(2))
_, err = r.Next()
require.NoError(t, err)
// 4 bytes left, which are not enough for even the legacy header.
if _, err = r.Next(); err != ErrInvalidChunk && err != io.EOF {
if _, err = r.Next(); err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}
}

func TestRecycleLogNumberOverflow(t *testing.T) {
// We truncate log numbers to 32-bits when writing to the WAL. Test log
// recycling at the wraparound point, ensuring that EOF chunks are
// interpreted correctly.

backing := make([]byte, 27)
w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32))
// Will write a chunk with 11 byte header + 5 byte payload.
_, err := w.WriteRecord([]byte("aaaaa"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1))
// Will write a chunk with 11 byte header + 1 byte payload.
_, err = w.WriteRecord([]byte("a"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

r := NewReader(bytes.NewReader(backing), base.FileNum(math.MaxUint32+1))
_, err = r.Next()
require.NoError(t, err)
// 4 bytes left, which are not enough for even the legacy header.
if _, err = r.Next(); err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}
}
Expand Down
18 changes: 11 additions & 7 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,9 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
})

var ve versionEdit
for _, lf := range logFiles {
maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num)
for i, lf := range logFiles {
lastWAL := i == len(logFiles)-1
maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num, lastWAL)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -427,7 +428,7 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayWAL(
jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum,
jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, lastWAL bool,
) (maxSeqNum uint64, err error) {
file, err := fs.Open(filename)
if err != nil {
Expand Down Expand Up @@ -492,10 +493,13 @@ func (d *DB) replayWAL(
}
if err != nil {
// It is common to encounter a zeroed or invalid chunk due to WAL
// preallocation and WAL recycling. We need to distinguish these errors
// from EOF in order to recognize that the record was truncated, but want
// preallocation and WAL recycling. We need to distinguish these
// errors from EOF in order to recognize that the record was
// truncated and to avoid replaying subsequent WALs, but want
// to otherwise treat them like EOF.
if err == io.EOF || record.IsInvalidRecord(err) {
if err == io.EOF {
break
} else if record.IsInvalidRecord(err) && lastWAL {
break
}
return 0, errors.Wrap(err, "pebble: error when replaying WAL")
Expand Down Expand Up @@ -565,7 +569,7 @@ func (d *DB) replayWAL(
toFlush[i].readerUnref()
}
}
return maxSeqNum, nil
return maxSeqNum, err
}

func checkOptions(opts *Options, path string) error {
Expand Down
57 changes: 57 additions & 0 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
Expand Down Expand Up @@ -506,6 +507,62 @@ func TestOpenWALReplay2(t *testing.T) {
}
}

// TestTwoWALReplayCorrupt tests WAL-replay behavior when the first of the two
// WALs is corrupted with an sstable checksum error. Replay must stop at the
// first WAL because otherwise we may violate point-in-time recovery
// semantics. See #864.
func TestTwoWALReplayCorrupt(t *testing.T) {
// Use the real filesystem so that we can seek and overwrite WAL data
// easily.
dir, err := ioutil.TempDir("", "wal-replay")
require.NoError(t, err)
defer os.RemoveAll(dir)

d, err := Open(dir, &Options{
MemTableStopWritesThreshold: 4,
MemTableSize: 2048,
})
require.NoError(t, err)
d.mu.Lock()
d.mu.compact.flushing = true
d.mu.Unlock()
require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil))
require.NoError(t, d.Set([]byte("2"), nil, nil))
d.mu.Lock()
d.mu.compact.flushing = false
d.mu.Unlock()
require.NoError(t, d.Close())

// We should have two WALs.
var logs []string
ls, err := vfs.Default.List(dir)
require.NoError(t, err)
for _, name := range ls {
if filepath.Ext(name) == ".log" {
logs = append(logs, name)
}
}
sort.Strings(logs)
if len(logs) < 2 {
t.Fatalf("expected at least two log files, found %d", len(logs))
}

// Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end
// of the file.
f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm)
require.NoError(t, err)
off, err := f.Seek(-100, 2)
require.NoError(t, err)
_, err = f.Write([]byte{0, 0, 0, 0})
require.NoError(t, err)
require.NoError(t, f.Close())
t.Logf("zeored four bytes in %s at offset %d\n", logs[len(logs)-2], off)

// Re-opening the database should detect and report the corruption.
d, err = Open(dir, nil)
require.Error(t, err, "pebble: corruption")
}

// TestOpenWALReplayReadOnlySeqNums tests opening a database:
// * in read-only mode
// * with multiple unflushed log files that must replayed
Expand Down

0 comments on commit 59020cb

Please sign in to comment.