Skip to content

Commit

Permalink
internal/record: write an EOF trailer on LogWriter Close
Browse files Browse the repository at this point in the history
While closing a LogWriter, write a special EOF record signifying the end
of the log. This EOF record will overwrite trailing garbage from the
recycled log file, allowing us to detect corruption errors in any logs
earlier than the most recent one.

In Open return an error if any WAL other than the most recent one
encounters an invalid chunk.
  • Loading branch information
jbowens committed Sep 1, 2020
1 parent e6a9f9a commit eb829fd
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 18 deletions.
15 changes: 15 additions & 0 deletions internal/record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ func (w *LogWriter) queueBlock() {
func (w *LogWriter) Close() error {
f := &w.flusher

// Emit an EOF trailer signifying the end of this log. This helps readers
// differentiate between a corrupted entry in the middle of a log from
// garbage at the tail from a recycled log file.
w.emitEOFTrailer()

// Signal the flush loop to close.
f.Lock()
f.close = true
Expand Down Expand Up @@ -613,6 +618,16 @@ func (w *LogWriter) Size() int64 {
return w.blockNum*blockSize + int64(w.block.written)
}

func (w *LogWriter) emitEOFTrailer() {
b := w.block
i := b.written
binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
b.buf[i+6] = recyclableEOFChunkType
binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
atomic.StoreInt32(&b.written, i+int32(recyclableHeaderSize))
}

func (w *LogWriter) emitFragment(n int, p []byte) []byte {
b := w.block
i := b.written
Expand Down
18 changes: 15 additions & 3 deletions internal/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@
//
// Recyclable chunks are distinguished from legacy chunks by the addition of 4
// extra "recyclable" chunk types that map directly to the legacy chunk types
// (i.e. full, first, middle, last). The CRC is computed over the type, log
// number, and payload.
// (i.e. full, first, middle, last), plus one special EOF chunk type. The CRC
// is computed over the type, log number, and payload.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
Expand Down Expand Up @@ -123,6 +123,7 @@ const (
recyclableFirstChunkType = 6
recyclableMiddleChunkType = 7
recyclableLastChunkType = 8
recyclableEOFChunkType = 9
)

const (
Expand Down Expand Up @@ -223,7 +224,7 @@ func (r *Reader) nextChunk(wantFirst bool) error {
}

headerSize := legacyHeaderSize
if chunkType >= recyclableFullChunkType && chunkType <= recyclableLastChunkType {
if chunkType >= recyclableFullChunkType && chunkType <= recyclableEOFChunkType {
headerSize = recyclableHeaderSize
if r.end+headerSize > r.n {
return ErrInvalidChunk
Expand All @@ -241,6 +242,17 @@ func (r *Reader) nextChunk(wantFirst bool) error {
return ErrInvalidChunk
}

// Look for the special EOF chunk that denotes the end of the WAL.
// These chunks prevent us from misinterpreting garbage at the
// tail of a WAL from recycling.
if chunkType == recyclableEOFChunkType {
// The EOF chunk must have a zeroed checksum and length.
if checksum != 0 || length != 0 {
return ErrInvalidChunk
}
return io.EOF
}

chunkType -= (recyclableFullChunkType - 1)
}

Expand Down
6 changes: 5 additions & 1 deletion internal/record/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,17 +948,21 @@ func TestTruncatedLog(t *testing.T) {
}

func TestRecycleLogWithPartialBlock(t *testing.T) {
backing := make([]byte, 16)
backing := make([]byte, 27)
w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1))
// Will write a chunk with 11 byte header + 5 byte payload.
_, err := w.WriteRecord([]byte("aaaaa"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2))
// Will write a chunk with 11 byte header + 1 byte payload.
_, err = w.WriteRecord([]byte("a"))
require.NoError(t, err)
// Close will write an 11-byte EOF chunk.
require.NoError(t, w.Close())

r := NewReader(bytes.NewReader(backing), base.FileNum(2))
_, err = r.Next()
require.NoError(t, err)
Expand Down
36 changes: 22 additions & 14 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
})

var ve versionEdit
for _, lf := range logFiles {
for i, lf := range logFiles {
maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num)
if err != nil {
// Invalid records are expected from recycled WALs, but only in the
// most recent WAL.
if err != nil && (!record.IsInvalidRecord(err) || i < len(logFiles)-1) {
return nil, err
}
d.mu.versions.markFileNumUsed(lf.num)
Expand Down Expand Up @@ -485,17 +487,22 @@ func (d *DB) replayWAL(
}
}
for {
var r io.Reader
offset = rr.Offset()
r, err := rr.Next()
r, err = rr.Next()
if err == nil {
_, err = io.Copy(&buf, r)
}
if err != nil {
// It is common to encounter a zeroed or invalid chunk due to WAL
// preallocation and WAL recycling. We need to distinguish these errors
// from EOF in order to recognize that the record was truncated, but want
// preallocation and WAL recycling. We need to distinguish these
// errors from EOF in order to recognize that the record was
// truncated and to avoid replaying any subsequent WALs, but want
// to otherwise treat them like EOF.
if err == io.EOF || record.IsInvalidRecord(err) {
if err == io.EOF {
err = nil
break
} else if record.IsInvalidRecord(err) {
break
}
return 0, errors.Wrap(err, "pebble: error when replaying WAL")
Expand Down Expand Up @@ -556,16 +563,17 @@ func (d *DB) replayWAL(
if !d.opts.ReadOnly {
c := newFlush(d.opts, d.mu.versions.currentVersion(),
1 /* base level */, toFlush, &d.bytesFlushed)
newVE, _, err := d.runCompaction(jobID, c, nilPacer)
if err != nil {
return 0, err
}
ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
for i := range toFlush {
toFlush[i].readerUnref()
newVE, _, cerr := d.runCompaction(jobID, c, nilPacer)
if cerr == nil {
ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
for i := range toFlush {
toFlush[i].readerUnref()
}
} else if err == nil {
err = cerr
}
}
return maxSeqNum, nil
return maxSeqNum, err
}

func checkOptions(opts *Options, path string) error {
Expand Down
57 changes: 57 additions & 0 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
Expand Down Expand Up @@ -506,6 +507,62 @@ func TestOpenWALReplay2(t *testing.T) {
}
}

// TestTwoWALReplayCorrupt tests WAL-replay behavior when the first of the two
// WALs is corrupted with an sstable checksum error. Replay must stop at the
// first WAL because otherwise we may violate point-in-time recovery
// semantics. See #864.
func TestTwoWALReplayCorrupt(t *testing.T) {
// Use the real filesystem so that we can seek and overwrite WAL data
// easily.
dir, err := ioutil.TempDir("", "wal-replay")
require.NoError(t, err)
defer os.RemoveAll(dir)

d, err := Open(dir, &Options{
MemTableStopWritesThreshold: 4,
MemTableSize: 2048,
})
require.NoError(t, err)
d.mu.Lock()
d.mu.compact.flushing = true
d.mu.Unlock()
require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil))
require.NoError(t, d.Set([]byte("2"), nil, nil))
d.mu.Lock()
d.mu.compact.flushing = false
d.mu.Unlock()
require.NoError(t, d.Close())

// We should have two WALs.
var logs []string
ls, err := vfs.Default.List(dir)
require.NoError(t, err)
for _, name := range ls {
if filepath.Ext(name) == ".log" {
logs = append(logs, name)
}
}
sort.Strings(logs)
if len(logs) < 2 {
t.Fatalf("expected at least two log files, found %d", len(logs))
}

// Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end
// of the file.
f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm)
require.NoError(t, err)
off, err := f.Seek(-100, 2)
require.NoError(t, err)
_, err = f.Write([]byte{0, 0, 0, 0})
require.NoError(t, err)
require.NoError(t, f.Close())
t.Logf("zeored four bytes in %s at offset %d\n", logs[len(logs)-2], off)

// Re-opening the database should detect and report the corruption.
d, err = Open(dir, nil)
require.Error(t, err, "pebble: corruption")
}

// TestOpenWALReplayReadOnlySeqNums tests opening a database:
// * in read-only mode
// * with multiple unflushed log files that must replayed
Expand Down

0 comments on commit eb829fd

Please sign in to comment.