Skip to content

Commit

Permalink
feat(wal): support discard corrupted uncommitted data (#547)
Browse files Browse the repository at this point in the history
### Motivation

Support discarding corrupted uncommitted data to fix Oxia server crash
and changing host causes `mmap` didn't persist to the disk properly.


### Modification

- Adding `commitOffsetProvider` field for WAL.
- Handling corrupted when re-build the index.
  • Loading branch information
mattisonchao authored Oct 8, 2024
1 parent 7b7f322 commit 59dfd84
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 25 deletions.
30 changes: 18 additions & 12 deletions server/wal/wal_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type wal struct {
segmentSize uint32
syncData bool

currentSegment ReadWriteSegment
readOnlySegments ReadOnlySegmentsGroup
currentSegment ReadWriteSegment
readOnlySegments ReadOnlySegmentsGroup
commitOffsetProvider CommitOffsetProvider

// The last offset appended to the Wal. It might not yet be synced
lastAppendedOffset atomic.Int64
Expand Down Expand Up @@ -102,11 +103,12 @@ func newWal(namespace string, shard int64, options *FactoryOptions, commitOffset

labels := metrics.LabelsForShard(namespace, shard)
w := &wal{
walPath: walPath(options.BaseWalDir, namespace, shard),
namespace: namespace,
shard: shard,
segmentSize: uint32(options.SegmentSize),
syncData: options.SyncData,
walPath: walPath(options.BaseWalDir, namespace, shard),
namespace: namespace,
shard: shard,
segmentSize: uint32(options.SegmentSize),
syncData: options.SyncData,
commitOffsetProvider: commitOffsetProvider,

appendLatency: metrics.NewLatencyHistogram("oxia_server_wal_append_latency",
"The time it takes to append entries to the WAL", labels),
Expand Down Expand Up @@ -276,7 +278,8 @@ func (t *wal) AppendAsync(entry *proto.LogEntry) error {
return err
}

if t.currentSegment, err = newReadWriteSegment(t.walPath, entry.Offset, t.segmentSize, 0); err != nil {
if t.currentSegment, err = newReadWriteSegment(t.walPath, entry.Offset, t.segmentSize,
0, t.commitOffsetProvider); err != nil {
t.writeErrors.Inc()
return err
}
Expand Down Expand Up @@ -317,7 +320,8 @@ func (t *wal) rolloverSegment() error {
lastCrc := t.currentSegment.LastCrc()
t.readOnlySegments.AddedNewSegment(t.currentSegment.BaseOffset())

if t.currentSegment, err = newReadWriteSegment(t.walPath, t.lastAppendedOffset.Load()+1, t.segmentSize, lastCrc); err != nil {
if t.currentSegment, err = newReadWriteSegment(t.walPath, t.lastAppendedOffset.Load()+1, t.segmentSize,
lastCrc, t.commitOffsetProvider); err != nil {
return err
}

Expand Down Expand Up @@ -430,7 +434,8 @@ func (t *wal) Clear() error {
return errors.Wrap(err, "failed to clear wal")
}

if t.currentSegment, err = newReadWriteSegment(t.walPath, 0, t.segmentSize, 0); err != nil {
if t.currentSegment, err = newReadWriteSegment(t.walPath, 0, t.segmentSize,
0, t.commitOffsetProvider); err != nil {
return err
}

Expand Down Expand Up @@ -500,7 +505,7 @@ func (t *wal) TruncateLog(lastSafeOffset int64) (int64, error) { //nolint:revive
return InvalidOffset, err
}
if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.Get().BaseOffset(),
t.segmentSize, segment.Get().LastCrc()); err != nil {
t.segmentSize, segment.Get().LastCrc(), t.commitOffsetProvider); err != nil {
err = multierr.Append(err, segment.Close())
return InvalidOffset, err
}
Expand Down Expand Up @@ -552,7 +557,8 @@ func (t *wal) recoverWal() error {
lastCrc = 0
}

if t.currentSegment, err = newReadWriteSegment(t.walPath, lastSegment, t.segmentSize, lastCrc); err != nil {
if t.currentSegment, err = newReadWriteSegment(t.walPath, lastSegment, t.segmentSize,
lastCrc, t.commitOffsetProvider); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion server/wal/wal_ro_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func TestReadOnlySegment(t *testing.T) {
path := t.TempDir()

rw, err := newReadWriteSegment(path, 0, 128*1024, 0)
rw, err := newReadWriteSegment(path, 0, 128*1024, 0, nil)
assert.NoError(t, err)
for i := int64(0); i < 10; i++ {
assert.NoError(t, rw.Append(i, []byte(fmt.Sprintf("entry-%d", i))))
Expand Down
23 changes: 16 additions & 7 deletions server/wal/wal_rw_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package wal

import (
"encoding/binary"
"log/slog"
"os"
"sync"
"time"
Expand Down Expand Up @@ -62,7 +63,8 @@ type readWriteSegment struct {
segmentSize uint32
}

func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, lastCrc uint32) (ReadWriteSegment, error) {
func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, lastCrc uint32,
commitOffsetProvider CommitOffsetProvider) (ReadWriteSegment, error) {
var err error
if _, err = os.Stat(basePath); os.IsNotExist(err) {
if err = os.MkdirAll(basePath, 0755); err != nil {
Expand Down Expand Up @@ -104,8 +106,7 @@ func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32,
if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, int(segmentSize), mmap.RDWR, 0, 0); err != nil {
return nil, errors.Wrapf(err, "failed to map segment file %s", ms.txnPath)
}

if err = ms.rebuildIdx(); err != nil {
if err = ms.rebuildIdx(commitOffsetProvider); err != nil {
return nil, errors.Wrapf(err, "failed to rebuild index for segment file %s", ms.txnPath)
}

Expand Down Expand Up @@ -174,7 +175,7 @@ func (ms *readWriteSegment) Flush() error {
return ms.txnMappedFile.Flush()
}

func (ms *readWriteSegment) rebuildIdx() error {
func (ms *readWriteSegment) rebuildIdx(commitOffsetProvider CommitOffsetProvider) error {
// Scan the mapped file and rebuild the index

entryOffset := ms.baseOffset
Expand All @@ -184,11 +185,19 @@ func (ms *readWriteSegment) rebuildIdx() error {
var payloadCrc uint32
var err error
if payloadSize, _, payloadCrc, err = ms.codec.ReadHeaderWithValidation(ms.txnMappedFile, ms.currentFileOffset); err != nil {
if errors.Is(err, codec.ErrOffsetOutOfBounds) || errors.Is(err, codec.ErrEmptyPayload) {
if errors.Is(err, codec.ErrEmptyPayload) {
// we might read the end of the segment.
break
}
if errors.Is(err, codec.ErrDataCorrupted) {
return errors.Wrapf(codec.ErrDataCorrupted, "entryOffset: %d", entryOffset)
// data corruption
if errors.Is(err, codec.ErrOffsetOutOfBounds) || errors.Is(err, codec.ErrDataCorrupted) {
if commitOffsetProvider != nil && entryOffset > commitOffsetProvider.CommitOffset() {
// uncommited data corruption, simply discard it
slog.Warn("discard the corrupted uncommited data.",
slog.Int64("entryId", entryOffset), slog.Any("error", err))
break
}
return errors.Wrapf(err, "entryOffset: %d", entryOffset)
}
return err
}
Expand Down
194 changes: 189 additions & 5 deletions server/wal/wal_rw_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package wal

import (
"encoding/binary"
"github.com/streamnative/oxia/server/wal/codec"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -23,7 +25,7 @@ import (
func TestReadWriteSegment(t *testing.T) {
path := t.TempDir()

rw, err := newReadWriteSegment(path, 0, 128*1024, 0)
rw, err := newReadWriteSegment(path, 0, 128*1024, 0, nil)
assert.NoError(t, err)

assert.EqualValues(t, 0, rw.BaseOffset())
Expand All @@ -42,7 +44,7 @@ func TestReadWriteSegment(t *testing.T) {
assert.NoError(t, rw.Close())

// Re-open and recover the segment
rw, err = newReadWriteSegment(path, 0, 128*1024, 0)
rw, err = newReadWriteSegment(path, 0, 128*1024, 0, nil)
assert.NoError(t, err)
assert.EqualValues(t, 0, rw.BaseOffset())
assert.EqualValues(t, 1, rw.LastOffset())
Expand All @@ -61,7 +63,7 @@ func TestReadWriteSegment(t *testing.T) {
func TestReadWriteSegment_NonZero(t *testing.T) {
path := t.TempDir()

rw, err := newReadWriteSegment(path, 5, 128*1024, 0)
rw, err := newReadWriteSegment(path, 5, 128*1024, 0, nil)
assert.NoError(t, err)

assert.EqualValues(t, 5, rw.BaseOffset())
Expand All @@ -88,14 +90,14 @@ func TestReadWriteSegment_NonZero(t *testing.T) {
assert.NoError(t, rw.Close())

// Re-open and recover the segment
rw, err = newReadWriteSegment(path, 5, 128*1024, 0)
rw, err = newReadWriteSegment(path, 5, 128*1024, 0, nil)
assert.NoError(t, err)
assert.EqualValues(t, 5, rw.BaseOffset())
assert.EqualValues(t, 6, rw.LastOffset())
}

func TestReadWriteSegment_HasSpace(t *testing.T) {
rw, err := newReadWriteSegment(t.TempDir(), 0, 1024, 0)
rw, err := newReadWriteSegment(t.TempDir(), 0, 1024, 0, nil)
assert.NoError(t, err)
segment := rw.(*readWriteSegment)
headerSize := int(segment.codec.GetHeaderSize())
Expand All @@ -111,3 +113,185 @@ func TestReadWriteSegment_HasSpace(t *testing.T) {
assert.False(t, rw.HasSpace(1020-100))
assert.True(t, rw.HasSpace(1024-100-headerSize*2))
}

type ConfigurableCommitOffsetProvider struct {
commitOffset int64
}

func (c ConfigurableCommitOffsetProvider) CommitOffset() int64 {
return c.commitOffset
}

func TestReadWriteSegment_BrokenUncommittedData_ErrOffsetOutOfBounds(t *testing.T) {
commitOffsetProvider := ConfigurableCommitOffsetProvider{}

dir := t.TempDir()
// basic functionality test
rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
payload1 := []byte("entry-0")
assert.NoError(t, rw.Append(0, payload1))
payload2 := []byte("entry-1")
assert.NoError(t, rw.Append(1, payload2))
payload3 := []byte("entry-2")
assert.NoError(t, rw.Append(2, payload3))
actualPayload1, err := rw.Read(0)
assert.NoError(t, err)
assert.EqualValues(t, payload1, actualPayload1)
actualPayload2, err := rw.Read(1)
assert.NoError(t, err)
assert.EqualValues(t, payload2, actualPayload2)
actualPayload3, err := rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

// move commit offset to 1
commitOffsetProvider.commitOffset = 1

// inject payload size failure to trigger ErrOffsetOutOfBounds
rwSegment := rw.(*readWriteSegment)
fso := fileOffset(rwSegment.writingIdx, 0, 2)
binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso:], 9999999)

// close the segment
rwSegment.Close()

// recover the rw segment
rw, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
assert.EqualValues(t, 1, rw.LastOffset())

// test functionality
assert.NoError(t, rw.Append(2, payload3))
actualPayload3, err = rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

rw.Close()
}

func TestReadWriteSegment_BrokenCommittedData_ErrOffsetOutOfBounds(t *testing.T) {
commitOffsetProvider := ConfigurableCommitOffsetProvider{}

dir := t.TempDir()
// basic functionality test
rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
payload1 := []byte("entry-0")
assert.NoError(t, rw.Append(0, payload1))
payload2 := []byte("entry-1")
assert.NoError(t, rw.Append(1, payload2))
payload3 := []byte("entry-2")
assert.NoError(t, rw.Append(2, payload3))
actualPayload1, err := rw.Read(0)
assert.NoError(t, err)
assert.EqualValues(t, payload1, actualPayload1)
actualPayload2, err := rw.Read(1)
assert.NoError(t, err)
assert.EqualValues(t, payload2, actualPayload2)
actualPayload3, err := rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

// move commit offset to 2
commitOffsetProvider.commitOffset = 2

// inject payload size failure to trigger ErrOffsetOutOfBounds
rwSegment := rw.(*readWriteSegment)
fso := fileOffset(rwSegment.writingIdx, 0, 2)
binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso:], 9999999)

// close the segment
rwSegment.Close()

// recover the rw segment
_, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.ErrorIs(t, err, codec.ErrOffsetOutOfBounds)
}

func TestReadWriteSegment_BrokenUncommittedData_ErrDataCorrupted(t *testing.T) {
commitOffsetProvider := ConfigurableCommitOffsetProvider{}

dir := t.TempDir()
// basic functionality test
rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
payload1 := []byte("entry-0")
assert.NoError(t, rw.Append(0, payload1))
payload2 := []byte("entry-1")
assert.NoError(t, rw.Append(1, payload2))
payload3 := []byte("entry-2")
assert.NoError(t, rw.Append(2, payload3))
actualPayload1, err := rw.Read(0)
assert.NoError(t, err)
assert.EqualValues(t, payload1, actualPayload1)
actualPayload2, err := rw.Read(1)
assert.NoError(t, err)
assert.EqualValues(t, payload2, actualPayload2)
actualPayload3, err := rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

// move commit offset to 1
commitOffsetProvider.commitOffset = 1

// inject payload size failure to trigger ErrOffsetOutOfBounds
rwSegment := rw.(*readWriteSegment)
fso := fileOffset(rwSegment.writingIdx, 0, 2)
binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso+4:], 9999999)

// close the segment
rwSegment.Close()

// recover the rw segment
rw, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
assert.EqualValues(t, 1, rw.LastOffset())

// test functionality
assert.NoError(t, rw.Append(2, payload3))
actualPayload3, err = rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

rw.Close()
}

func TestReadWriteSegment_BrokenCommittedData_ErrDataCorrupted(t *testing.T) {
commitOffsetProvider := ConfigurableCommitOffsetProvider{}

dir := t.TempDir()
// basic functionality test
rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.NoError(t, err)
payload1 := []byte("entry-0")
assert.NoError(t, rw.Append(0, payload1))
payload2 := []byte("entry-1")
assert.NoError(t, rw.Append(1, payload2))
payload3 := []byte("entry-2")
assert.NoError(t, rw.Append(2, payload3))
actualPayload1, err := rw.Read(0)
assert.NoError(t, err)
assert.EqualValues(t, payload1, actualPayload1)
actualPayload2, err := rw.Read(1)
assert.NoError(t, err)
assert.EqualValues(t, payload2, actualPayload2)
actualPayload3, err := rw.Read(2)
assert.NoError(t, err)
assert.EqualValues(t, payload3, actualPayload3)

// move commit offset to 2
commitOffsetProvider.commitOffset = 2

// inject payload size failure to trigger ErrOffsetOutOfBounds
rwSegment := rw.(*readWriteSegment)
fso := fileOffset(rwSegment.writingIdx, 0, 2)
binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso+4:], 9999999)

// close the segment
rwSegment.Close()

// recover the rw segment
_, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.ErrorIs(t, err, codec.ErrDataCorrupted)
}

0 comments on commit 59dfd84

Please sign in to comment.