diff --git a/server/wal/wal_impl.go b/server/wal/wal_impl.go index 65f29d71..b0028a46 100644 --- a/server/wal/wal_impl.go +++ b/server/wal/wal_impl.go @@ -64,8 +64,9 @@ type wal struct { segmentSize uint32 syncData bool - currentSegment ReadWriteSegment - readOnlySegments ReadOnlySegmentsGroup + currentSegment ReadWriteSegment + readOnlySegments ReadOnlySegmentsGroup + commitOffsetProvider CommitOffsetProvider // The last offset appended to the Wal. It might not yet be synced lastAppendedOffset atomic.Int64 @@ -102,11 +103,12 @@ func newWal(namespace string, shard int64, options *FactoryOptions, commitOffset labels := metrics.LabelsForShard(namespace, shard) w := &wal{ - walPath: walPath(options.BaseWalDir, namespace, shard), - namespace: namespace, - shard: shard, - segmentSize: uint32(options.SegmentSize), - syncData: options.SyncData, + walPath: walPath(options.BaseWalDir, namespace, shard), + namespace: namespace, + shard: shard, + segmentSize: uint32(options.SegmentSize), + syncData: options.SyncData, + commitOffsetProvider: commitOffsetProvider, appendLatency: metrics.NewLatencyHistogram("oxia_server_wal_append_latency", "The time it takes to append entries to the WAL", labels), @@ -276,7 +278,8 @@ func (t *wal) AppendAsync(entry *proto.LogEntry) error { return err } - if t.currentSegment, err = newReadWriteSegment(t.walPath, entry.Offset, t.segmentSize, 0); err != nil { + if t.currentSegment, err = newReadWriteSegment(t.walPath, entry.Offset, t.segmentSize, + 0, t.commitOffsetProvider); err != nil { t.writeErrors.Inc() return err } @@ -317,7 +320,8 @@ func (t *wal) rolloverSegment() error { lastCrc := t.currentSegment.LastCrc() t.readOnlySegments.AddedNewSegment(t.currentSegment.BaseOffset()) - if t.currentSegment, err = newReadWriteSegment(t.walPath, t.lastAppendedOffset.Load()+1, t.segmentSize, lastCrc); err != nil { + if t.currentSegment, err = newReadWriteSegment(t.walPath, t.lastAppendedOffset.Load()+1, t.segmentSize, + lastCrc, t.commitOffsetProvider); err != nil { return err } @@ -430,7 +434,8 @@ func (t *wal) Clear() error { return errors.Wrap(err, "failed to clear wal") } - if t.currentSegment, err = newReadWriteSegment(t.walPath, 0, t.segmentSize, 0); err != nil { + if t.currentSegment, err = newReadWriteSegment(t.walPath, 0, t.segmentSize, + 0, t.commitOffsetProvider); err != nil { return err } @@ -500,7 +505,7 @@ func (t *wal) TruncateLog(lastSafeOffset int64) (int64, error) { //nolint:revive return InvalidOffset, err } if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.Get().BaseOffset(), - t.segmentSize, segment.Get().LastCrc()); err != nil { + t.segmentSize, segment.Get().LastCrc(), t.commitOffsetProvider); err != nil { err = multierr.Append(err, segment.Close()) return InvalidOffset, err } @@ -552,7 +557,8 @@ func (t *wal) recoverWal() error { lastCrc = 0 } - if t.currentSegment, err = newReadWriteSegment(t.walPath, lastSegment, t.segmentSize, lastCrc); err != nil { + if t.currentSegment, err = newReadWriteSegment(t.walPath, lastSegment, t.segmentSize, + lastCrc, t.commitOffsetProvider); err != nil { return err } diff --git a/server/wal/wal_ro_segment_test.go b/server/wal/wal_ro_segment_test.go index 283abf18..589e34ee 100644 --- a/server/wal/wal_ro_segment_test.go +++ b/server/wal/wal_ro_segment_test.go @@ -26,7 +26,7 @@ import ( func TestReadOnlySegment(t *testing.T) { path := t.TempDir() - rw, err := newReadWriteSegment(path, 0, 128*1024, 0) + rw, err := newReadWriteSegment(path, 0, 128*1024, 0, nil) assert.NoError(t, err) for i := int64(0); i < 10; i++ { assert.NoError(t, rw.Append(i, []byte(fmt.Sprintf("entry-%d", i)))) diff --git a/server/wal/wal_rw_segment.go b/server/wal/wal_rw_segment.go index 1a26e984..aa6bd0be 100644 --- a/server/wal/wal_rw_segment.go +++ b/server/wal/wal_rw_segment.go @@ -16,6 +16,7 @@ package wal import ( "encoding/binary" + "log/slog" "os" "sync" "time" @@ -62,7 +63,8 @@ type readWriteSegment struct { segmentSize uint32 } -func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, lastCrc uint32) (ReadWriteSegment, error) { +func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, lastCrc uint32, + commitOffsetProvider CommitOffsetProvider) (ReadWriteSegment, error) { var err error if _, err = os.Stat(basePath); os.IsNotExist(err) { if err = os.MkdirAll(basePath, 0755); err != nil { @@ -104,8 +106,7 @@ func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, int(segmentSize), mmap.RDWR, 0, 0); err != nil { return nil, errors.Wrapf(err, "failed to map segment file %s", ms.txnPath) } - - if err = ms.rebuildIdx(); err != nil { + if err = ms.rebuildIdx(commitOffsetProvider); err != nil { return nil, errors.Wrapf(err, "failed to rebuild index for segment file %s", ms.txnPath) } @@ -174,7 +175,7 @@ func (ms *readWriteSegment) Flush() error { return ms.txnMappedFile.Flush() } -func (ms *readWriteSegment) rebuildIdx() error { +func (ms *readWriteSegment) rebuildIdx(commitOffsetProvider CommitOffsetProvider) error { // Scan the mapped file and rebuild the index entryOffset := ms.baseOffset @@ -184,11 +185,19 @@ func (ms *readWriteSegment) rebuildIdx() error { var payloadCrc uint32 var err error if payloadSize, _, payloadCrc, err = ms.codec.ReadHeaderWithValidation(ms.txnMappedFile, ms.currentFileOffset); err != nil { - if errors.Is(err, codec.ErrOffsetOutOfBounds) || errors.Is(err, codec.ErrEmptyPayload) { + if errors.Is(err, codec.ErrEmptyPayload) { + // we might read the end of the segment. break } - if errors.Is(err, codec.ErrDataCorrupted) { - return errors.Wrapf(codec.ErrDataCorrupted, "entryOffset: %d", entryOffset) + // data corruption + if errors.Is(err, codec.ErrOffsetOutOfBounds) || errors.Is(err, codec.ErrDataCorrupted) { + if commitOffsetProvider != nil && entryOffset > commitOffsetProvider.CommitOffset() { + // uncommited data corruption, simply discard it + slog.Warn("discard the corrupted uncommited data.", + slog.Int64("entryId", entryOffset), slog.Any("error", err)) + break + } + return errors.Wrapf(err, "entryOffset: %d", entryOffset) } return err } diff --git a/server/wal/wal_rw_segment_test.go b/server/wal/wal_rw_segment_test.go index bc41fe6a..4d4a3120 100644 --- a/server/wal/wal_rw_segment_test.go +++ b/server/wal/wal_rw_segment_test.go @@ -15,6 +15,8 @@ package wal import ( + "encoding/binary" + "github.com/streamnative/oxia/server/wal/codec" "testing" "github.com/stretchr/testify/assert" @@ -23,7 +25,7 @@ import ( func TestReadWriteSegment(t *testing.T) { path := t.TempDir() - rw, err := newReadWriteSegment(path, 0, 128*1024, 0) + rw, err := newReadWriteSegment(path, 0, 128*1024, 0, nil) assert.NoError(t, err) assert.EqualValues(t, 0, rw.BaseOffset()) @@ -42,7 +44,7 @@ func TestReadWriteSegment(t *testing.T) { assert.NoError(t, rw.Close()) // Re-open and recover the segment - rw, err = newReadWriteSegment(path, 0, 128*1024, 0) + rw, err = newReadWriteSegment(path, 0, 128*1024, 0, nil) assert.NoError(t, err) assert.EqualValues(t, 0, rw.BaseOffset()) assert.EqualValues(t, 1, rw.LastOffset()) @@ -61,7 +63,7 @@ func TestReadWriteSegment(t *testing.T) { func TestReadWriteSegment_NonZero(t *testing.T) { path := t.TempDir() - rw, err := newReadWriteSegment(path, 5, 128*1024, 0) + rw, err := newReadWriteSegment(path, 5, 128*1024, 0, nil) assert.NoError(t, err) assert.EqualValues(t, 5, rw.BaseOffset()) @@ -88,14 +90,14 @@ func TestReadWriteSegment_NonZero(t *testing.T) { assert.NoError(t, rw.Close()) // Re-open and recover the segment - rw, err = newReadWriteSegment(path, 5, 128*1024, 0) + rw, err = newReadWriteSegment(path, 5, 128*1024, 0, nil) assert.NoError(t, err) assert.EqualValues(t, 5, rw.BaseOffset()) assert.EqualValues(t, 6, rw.LastOffset()) } func TestReadWriteSegment_HasSpace(t *testing.T) { - rw, err := newReadWriteSegment(t.TempDir(), 0, 1024, 0) + rw, err := newReadWriteSegment(t.TempDir(), 0, 1024, 0, nil) assert.NoError(t, err) segment := rw.(*readWriteSegment) headerSize := int(segment.codec.GetHeaderSize()) @@ -111,3 +113,185 @@ func TestReadWriteSegment_HasSpace(t *testing.T) { assert.False(t, rw.HasSpace(1020-100)) assert.True(t, rw.HasSpace(1024-100-headerSize*2)) } + +type ConfigurableCommitOffsetProvider struct { + commitOffset int64 +} + +func (c ConfigurableCommitOffsetProvider) CommitOffset() int64 { + return c.commitOffset +} + +func TestReadWriteSegment_BrokenUncommittedData_ErrOffsetOutOfBounds(t *testing.T) { + commitOffsetProvider := ConfigurableCommitOffsetProvider{} + + dir := t.TempDir() + // basic functionality test + rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + payload1 := []byte("entry-0") + assert.NoError(t, rw.Append(0, payload1)) + payload2 := []byte("entry-1") + assert.NoError(t, rw.Append(1, payload2)) + payload3 := []byte("entry-2") + assert.NoError(t, rw.Append(2, payload3)) + actualPayload1, err := rw.Read(0) + assert.NoError(t, err) + assert.EqualValues(t, payload1, actualPayload1) + actualPayload2, err := rw.Read(1) + assert.NoError(t, err) + assert.EqualValues(t, payload2, actualPayload2) + actualPayload3, err := rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + // move commit offset to 1 + commitOffsetProvider.commitOffset = 1 + + // inject payload size failure to trigger ErrOffsetOutOfBounds + rwSegment := rw.(*readWriteSegment) + fso := fileOffset(rwSegment.writingIdx, 0, 2) + binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso:], 9999999) + + // close the segment + rwSegment.Close() + + // recover the rw segment + rw, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + assert.EqualValues(t, 1, rw.LastOffset()) + + // test functionality + assert.NoError(t, rw.Append(2, payload3)) + actualPayload3, err = rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + rw.Close() +} + +func TestReadWriteSegment_BrokenCommittedData_ErrOffsetOutOfBounds(t *testing.T) { + commitOffsetProvider := ConfigurableCommitOffsetProvider{} + + dir := t.TempDir() + // basic functionality test + rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + payload1 := []byte("entry-0") + assert.NoError(t, rw.Append(0, payload1)) + payload2 := []byte("entry-1") + assert.NoError(t, rw.Append(1, payload2)) + payload3 := []byte("entry-2") + assert.NoError(t, rw.Append(2, payload3)) + actualPayload1, err := rw.Read(0) + assert.NoError(t, err) + assert.EqualValues(t, payload1, actualPayload1) + actualPayload2, err := rw.Read(1) + assert.NoError(t, err) + assert.EqualValues(t, payload2, actualPayload2) + actualPayload3, err := rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + // move commit offset to 2 + commitOffsetProvider.commitOffset = 2 + + // inject payload size failure to trigger ErrOffsetOutOfBounds + rwSegment := rw.(*readWriteSegment) + fso := fileOffset(rwSegment.writingIdx, 0, 2) + binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso:], 9999999) + + // close the segment + rwSegment.Close() + + // recover the rw segment + _, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.ErrorIs(t, err, codec.ErrOffsetOutOfBounds) +} + +func TestReadWriteSegment_BrokenUncommittedData_ErrDataCorrupted(t *testing.T) { + commitOffsetProvider := ConfigurableCommitOffsetProvider{} + + dir := t.TempDir() + // basic functionality test + rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + payload1 := []byte("entry-0") + assert.NoError(t, rw.Append(0, payload1)) + payload2 := []byte("entry-1") + assert.NoError(t, rw.Append(1, payload2)) + payload3 := []byte("entry-2") + assert.NoError(t, rw.Append(2, payload3)) + actualPayload1, err := rw.Read(0) + assert.NoError(t, err) + assert.EqualValues(t, payload1, actualPayload1) + actualPayload2, err := rw.Read(1) + assert.NoError(t, err) + assert.EqualValues(t, payload2, actualPayload2) + actualPayload3, err := rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + // move commit offset to 1 + commitOffsetProvider.commitOffset = 1 + + // inject payload size failure to trigger ErrOffsetOutOfBounds + rwSegment := rw.(*readWriteSegment) + fso := fileOffset(rwSegment.writingIdx, 0, 2) + binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso+4:], 9999999) + + // close the segment + rwSegment.Close() + + // recover the rw segment + rw, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + assert.EqualValues(t, 1, rw.LastOffset()) + + // test functionality + assert.NoError(t, rw.Append(2, payload3)) + actualPayload3, err = rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + rw.Close() +} + +func TestReadWriteSegment_BrokenCommittedData_ErrDataCorrupted(t *testing.T) { + commitOffsetProvider := ConfigurableCommitOffsetProvider{} + + dir := t.TempDir() + // basic functionality test + rw, err := newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.NoError(t, err) + payload1 := []byte("entry-0") + assert.NoError(t, rw.Append(0, payload1)) + payload2 := []byte("entry-1") + assert.NoError(t, rw.Append(1, payload2)) + payload3 := []byte("entry-2") + assert.NoError(t, rw.Append(2, payload3)) + actualPayload1, err := rw.Read(0) + assert.NoError(t, err) + assert.EqualValues(t, payload1, actualPayload1) + actualPayload2, err := rw.Read(1) + assert.NoError(t, err) + assert.EqualValues(t, payload2, actualPayload2) + actualPayload3, err := rw.Read(2) + assert.NoError(t, err) + assert.EqualValues(t, payload3, actualPayload3) + + // move commit offset to 2 + commitOffsetProvider.commitOffset = 2 + + // inject payload size failure to trigger ErrOffsetOutOfBounds + rwSegment := rw.(*readWriteSegment) + fso := fileOffset(rwSegment.writingIdx, 0, 2) + binary.BigEndian.PutUint32(rwSegment.txnMappedFile[fso+4:], 9999999) + + // close the segment + rwSegment.Close() + + // recover the rw segment + _, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider) + assert.ErrorIs(t, err, codec.ErrDataCorrupted) +}