Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/record: write an EOF trailer on LogWriter Close #871

Merged
merged 1 commit into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ func (w *LogWriter) queueBlock() {
func (w *LogWriter) Close() error {
f := &w.flusher

// Emit an EOF trailer signifying the end of this log. This helps readers
// differentiate between a corrupted entry in the middle of a log from
// garbage at the tail from a recycled log file.
w.emitEOFTrailer()

// Signal the flush loop to close.
f.Lock()
f.close = true
Expand Down Expand Up @@ -613,6 +618,18 @@ func (w *LogWriter) Size() int64 {
return w.blockNum*blockSize + int64(w.block.written)
}

func (w *LogWriter) emitEOFTrailer() {
// Write a recyclable chunk header with a different log number. Readers
// will treat the header as EOF when the log number does not match.
b := w.block
i := b.written
binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
b.buf[i+6] = recyclableFullChunkType
binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
atomic.StoreInt32(&b.written, i+int32(recyclableHeaderSize))
}

func (w *LogWriter) emitFragment(n int, p []byte) []byte {
b := w.block
i := b.written
Expand Down
38 changes: 36 additions & 2 deletions internal/record/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -948,22 +949,55 @@ func TestTruncatedLog(t *testing.T) {
}

func TestRecycleLogWithPartialBlock(t *testing.T) {
backing := make([]byte, 16)
backing := make([]byte, 27)
w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1))
// Will write a chunk with 11 byte header + 5 byte payload.
_, err := w.WriteRecord([]byte("aaaaa"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2))
// Will write a chunk with 11 byte header + 1 byte payload.
_, err = w.WriteRecord([]byte("a"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

r := NewReader(bytes.NewReader(backing), base.FileNum(2))
_, err = r.Next()
require.NoError(t, err)
// 4 bytes left, which are not enough for even the legacy header.
if _, err = r.Next(); err != ErrInvalidChunk && err != io.EOF {
if _, err = r.Next(); err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}
}

func TestRecycleLogNumberOverflow(t *testing.T) {
// We truncate log numbers to 32-bits when writing to the WAL. Test log
// recycling at the wraparound point, ensuring that EOF chunks are
// interpreted correctly.

backing := make([]byte, 27)
w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32))
// Will write a chunk with 11 byte header + 5 byte payload.
_, err := w.WriteRecord([]byte("aaaaa"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1))
// Will write a chunk with 11 byte header + 1 byte payload.
_, err = w.WriteRecord([]byte("a"))
require.NoError(t, err)
// Close will write a 11-byte EOF chunk.
require.NoError(t, w.Close())

r := NewReader(bytes.NewReader(backing), base.FileNum(math.MaxUint32+1))
_, err = r.Next()
require.NoError(t, err)
// 4 bytes left, which are not enough for even the legacy header.
if _, err = r.Next(); err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}
}
Expand Down
18 changes: 11 additions & 7 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,9 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
})

var ve versionEdit
for _, lf := range logFiles {
maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num)
for i, lf := range logFiles {
lastWAL := i == len(logFiles)-1
maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS, opts.FS.PathJoin(d.walDirname, lf.name), lf.num, lastWAL)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -427,7 +428,7 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayWAL(
jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum,
jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, lastWAL bool,
) (maxSeqNum uint64, err error) {
file, err := fs.Open(filename)
if err != nil {
Expand Down Expand Up @@ -492,10 +493,13 @@ func (d *DB) replayWAL(
}
if err != nil {
// It is common to encounter a zeroed or invalid chunk due to WAL
// preallocation and WAL recycling. We need to distinguish these errors
// from EOF in order to recognize that the record was truncated, but want
// preallocation and WAL recycling. We need to distinguish these
// errors from EOF in order to recognize that the record was
// truncated and to avoid replaying subsequent WALs, but want
// to otherwise treat them like EOF.
if err == io.EOF || record.IsInvalidRecord(err) {
if err == io.EOF {
break
} else if record.IsInvalidRecord(err) && lastWAL {
break
}
return 0, errors.Wrap(err, "pebble: error when replaying WAL")
Expand Down Expand Up @@ -565,7 +569,7 @@ func (d *DB) replayWAL(
toFlush[i].readerUnref()
}
}
return maxSeqNum, nil
return maxSeqNum, err
}

func checkOptions(opts *Options, path string) error {
Expand Down
57 changes: 57 additions & 0 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
Expand Down Expand Up @@ -506,6 +507,62 @@ func TestOpenWALReplay2(t *testing.T) {
}
}

// TestTwoWALReplayCorrupt tests WAL-replay behavior when the first of the two
// WALs is corrupted with an sstable checksum error. Replay must stop at the
// first WAL because otherwise we may violate point-in-time recovery
// semantics. See #864.
func TestTwoWALReplayCorrupt(t *testing.T) {
// Use the real filesystem so that we can seek and overwrite WAL data
// easily.
dir, err := ioutil.TempDir("", "wal-replay")
require.NoError(t, err)
defer os.RemoveAll(dir)

d, err := Open(dir, &Options{
MemTableStopWritesThreshold: 4,
MemTableSize: 2048,
})
require.NoError(t, err)
d.mu.Lock()
d.mu.compact.flushing = true
d.mu.Unlock()
require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil))
require.NoError(t, d.Set([]byte("2"), nil, nil))
d.mu.Lock()
d.mu.compact.flushing = false
d.mu.Unlock()
require.NoError(t, d.Close())

// We should have two WALs.
var logs []string
ls, err := vfs.Default.List(dir)
require.NoError(t, err)
for _, name := range ls {
if filepath.Ext(name) == ".log" {
logs = append(logs, name)
}
}
sort.Strings(logs)
if len(logs) < 2 {
t.Fatalf("expected at least two log files, found %d", len(logs))
}

// Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end
// of the file.
f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm)
require.NoError(t, err)
off, err := f.Seek(-100, 2)
require.NoError(t, err)
_, err = f.Write([]byte{0, 0, 0, 0})
require.NoError(t, err)
require.NoError(t, f.Close())
t.Logf("zeored four bytes in %s at offset %d\n", logs[len(logs)-2], off)

// Re-opening the database should detect and report the corruption.
d, err = Open(dir, nil)
require.Error(t, err, "pebble: corruption")
}

// TestOpenWALReplayReadOnlySeqNums tests opening a database:
// * in read-only mode
// * with multiple unflushed log files that must replayed
Expand Down