feat(wal): support crc for index file #549

Merged 10 commits on Oct 11, 2024
4 changes: 2 additions & 2 deletions .golangci.yaml
@@ -110,7 +110,7 @@ linters-settings:
severity: warning
disabled: false
arguments:
- 4
- 5
- name: flag-parameter
severity: warning
disabled: false
@@ -169,4 +169,4 @@ issues:
- path: _grpc\.go
linters:
- revive
text: "blank-imports"
text: "blank-imports"
60 changes: 57 additions & 3 deletions server/wal/codec/codec.go
@@ -17,6 +17,7 @@ package codec
import (
"encoding/binary"
"os"
"sync"

"github.com/pkg/errors"
)
@@ -28,9 +29,10 @@
)

type Metadata struct {
TxnExtension string
IdxExtension string
HeaderSize uint32
TxnExtension string
IdxExtension string
HeaderSize uint32
IdxHeaderSize uint32
}

type Codec interface {
@@ -90,6 +92,41 @@ type Codec interface {
// - recordSize: The total size of the written record, including the header.
// - payloadCrc: The CRC value of the written payload.
WriteRecord(buf []byte, startFileOffset uint32, previousCrc uint32, payload []byte) (recordSize uint32, payloadCrc uint32)

// GetIndexHeaderSize returns the size of the index header in bytes.
// The header size is typically a fixed value representing the metadata at the
// beginning of an index file.
GetIndexHeaderSize() uint32

// WriteIndex writes the provided index data to the specified file path.
// Parameters:
// - path: the location where the index file will be written.
// - index: the byte slice containing the index data.
// Returns an error if the file cannot be written or if any I/O issues occur.
WriteIndex(path string, index []byte) error

// ReadIndex reads the index data from the specified file path.
// Parameters:
// - path: the location of the index file to be read.
// Returns the index data as a byte slice and an error if any I/O issues occur.
ReadIndex(path string) ([]byte, error)

// RecoverIndex attempts to recover the index from a txn byte buffer.
//
// Parameters:
// - buf: the byte slice containing the raw data.
// - startFileOffset: the starting file offset from which recovery begins.
// - baseEntryOffset: the base offset for the index entries, used to adjust entry offsets.
// - commitOffset: a pointer to the commit offset, used to auto-discard uncommitted corrupted data
//
// Returns:
// - index: the recovered index data as a byte slice.
// - lastCrc: the CRC of the last valid entry in the index, used to detect data corruption.
// - newFileOffset: the new file offset after recovery, indicating where the next data should be written.
// - lastEntryOffset: the offset of the last valid entry in the recovered index.
// - err: an error if the recovery process encounters issues, such as data corruption or invalid entries.
RecoverIndex(buf []byte, startFileOffset uint32, baseEntryOffset int64, commitOffset *int64) (index []byte,
lastCrc uint32, newFileOffset uint32, lastEntryOffset int64, err error)
}

// The latest codec.
@@ -127,3 +164,20 @@ func GetOrCreate(basePath string) (_codec Codec, exist bool, err error) {
func ReadInt(b []byte, offset uint32) uint32 {
return binary.BigEndian.Uint32(b[offset : offset+4])
}

// Pool of reusable index buffers.
var bufferPool = sync.Pool{}

const initialIndexBufferCapacity = 16 * 1024

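// BorrowEmptyIndexBuf returns a zero-length buffer for building an index, reusing a pooled backing array when one is available.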
func BorrowEmptyIndexBuf() []byte {
if pooledBuffer, ok := bufferPool.Get().(*[]byte); ok {
return (*pooledBuffer)[:0]
}
// No pooled buffer is available: start with an empty slice, but give it some initial capacity
return make([]byte, 0, initialIndexBufferCapacity)
}

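// ReturnIndexBuf hands buf back to the pool so its backing array can be reused by BorrowEmptyIndexBuf.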
func ReturnIndexBuf(buf *[]byte) {
bufferPool.Put(buf)
}
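For illustration only, and not part of this PR's diff, here is a minimal sketch of the intended borrow/append/return pattern for the pooled index buffer; the helper name, its parameters, and the assumption that it lives in the codec package are all hypothetical.

func appendAndPersistIndex(c Codec, path string, offsets []uint32) error {
	// Borrow a zero-length buffer whose backing array may come from the pool.
	buf := BorrowEmptyIndexBuf()
	for _, off := range offsets {
		// Each index entry is a 4-byte big-endian file offset.
		buf = binary.BigEndian.AppendUint32(buf, off)
	}
	// Persist the index, then hand the buffer back so later callers can reuse it.
	err := c.WriteIndex(path, buf)
	ReturnIndexBuf(&buf) // the pool stores *[]byte, so pass the slice's address
	return err
}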
99 changes: 83 additions & 16 deletions server/wal/codec/v1.go
@@ -16,46 +16,56 @@ package codec

import (
"encoding/binary"

"github.com/pkg/errors"
"go.uber.org/multierr"
"io"
"os"
)

// Txn File:
// +--------------+--------------+
// | Size(4Bytes) | Payload(...) |
// | Size(4 Bytes) | Payload(...) |
// +--------------+--------------+
// Size: Length of the payload data
// Payload: Byte stream as long as specified by the payload size.
var _ Codec = V1{}

// Idx File:
// +----------------+----------------+----------------+
// | Index(4 Bytes) | Index(4 Bytes) | ... |
// +----------------+----------------+----------------+
// Index: the file offset of each record in the txn file
var _ Codec = &V1{}

const v1PayloadSizeLen uint32 = 4
const v1TxnExtension = ".txn"
const v1IdxExtension = ".idx"

var v1 = &V1{
Metadata{
TxnExtension: v1TxnExtension,
IdxExtension: v1IdxExtension,
HeaderSize: v1PayloadSizeLen,
TxnExtension: v1TxnExtension,
IdxExtension: v1IdxExtension,
HeaderSize: v1PayloadSizeLen,
IdxHeaderSize: 0,
},
}

type V1 struct {
Metadata
}

func (v V1) GetIdxExtension() string {
func (v *V1) GetIdxExtension() string {
return v.IdxExtension
}

func (v V1) GetTxnExtension() string {
func (v *V1) GetTxnExtension() string {
return v.TxnExtension
}

func (v V1) GetHeaderSize() uint32 {
func (v *V1) GetHeaderSize() uint32 {
return v.HeaderSize
}

func (v V1) ReadRecordWithValidation(buf []byte, startFileOffset uint32) (payload []byte, err error) {
func (v *V1) ReadRecordWithValidation(buf []byte, startFileOffset uint32) (payload []byte, err error) {
var payloadSize uint32
if payloadSize, _, _, err = v.ReadHeaderWithValidation(buf, startFileOffset); err != nil {
return nil, err
@@ -66,38 +76,36 @@ func (v V1) ReadRecordWithValidation(buf []byte, startFileOffset uint32) (payloa
return payload, nil
}

func (v V1) GetRecordSize(buf []byte, startFileOffset uint32) (payloadSize uint32, err error) {
func (v *V1) GetRecordSize(buf []byte, startFileOffset uint32) (payloadSize uint32, err error) {
if payloadSize, _, _, err = v.ReadHeaderWithValidation(buf, startFileOffset); err != nil {
return 0, err
}
return v.HeaderSize + payloadSize, nil
}

func (v V1) ReadHeaderWithValidation(buf []byte, startFileOffset uint32) (payloadSize uint32, previousCrc uint32, payloadCrc uint32, err error) {
func (v *V1) ReadHeaderWithValidation(buf []byte, startFileOffset uint32) (payloadSize uint32, previousCrc uint32, payloadCrc uint32, err error) {
bufSize := uint32(len(buf))
if startFileOffset >= bufSize {
return payloadSize, previousCrc, payloadCrc, errors.Wrapf(ErrOffsetOutOfBounds,
"expected payload size: %d. actual buf size: %d ", startFileOffset+v1PayloadSizeLen, bufSize)
}

var headerOffset uint32
payloadSize = ReadInt(buf, startFileOffset)
headerOffset += v1PayloadSizeLen
// This shouldn't happen during a normal read
if payloadSize == 0 {
return payloadSize, previousCrc, payloadCrc, errors.Wrapf(ErrEmptyPayload, "unexpected empty payload")
}
expectSize := payloadSize + v.HeaderSize
// overflow checking
actualBufSize := bufSize - (startFileOffset + headerOffset)
actualBufSize := bufSize - startFileOffset
if expectSize > actualBufSize {
return payloadSize, previousCrc, payloadCrc,
errors.Wrapf(ErrOffsetOutOfBounds, "expected payload size: %d. actual buf size: %d ", expectSize, bufSize)
}
return payloadSize, previousCrc, payloadCrc, nil
}

func (V1) WriteRecord(buf []byte, startOffset uint32, _ uint32, payload []byte) (recordSize uint32, payloadCrc uint32) {
func (*V1) WriteRecord(buf []byte, startOffset uint32, _ uint32, payload []byte) (recordSize uint32, payloadCrc uint32) {
payloadSize := uint32(len(payload))

var headerOffset uint32
@@ -107,3 +115,62 @@ func (V1) WriteRecord(buf []byte, startOffset uint32, _ uint32, payload []byte)
copy(buf[startOffset+headerOffset:], payload)
return headerOffset + payloadSize, payloadCrc
}

func (*V1) WriteIndex(path string, index []byte) error {
idxFile, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return errors.Wrapf(err, "failed to open index file %s", path)
}

if _, err = idxFile.Write(index); err != nil {
return multierr.Combine(
errors.Wrapf(err, "failed to write index file %s", path),
idxFile.Close())
}
return idxFile.Close()
}

func (*V1) ReadIndex(path string) ([]byte, error) {
var idFile *os.File
var err error
if idFile, err = os.OpenFile(path, os.O_RDONLY, 0); err != nil {
return nil, errors.Wrapf(err, "failed to open segment index file %s", path)
}
var indexBuf []byte
if indexBuf, err = io.ReadAll(idFile); err != nil {
return nil, multierr.Combine(
errors.Wrapf(err, "failed to read segment index file %s", path),
idFile.Close())
}
if err = idFile.Close(); err != nil {
return nil, errors.Wrapf(err, "failed to close segment index file %s", path)
}
return indexBuf, nil
}

func (v *V1) GetIndexHeaderSize() uint32 {
return v.IdxHeaderSize
}

func (v *V1) RecoverIndex(buf []byte, startFileOffset uint32, baseEntryOffset int64,
_ *int64) (index []byte, lastCrc uint32, newFileOffset uint32, lastEntryOffset int64, err error) {
maxSize := uint32(len(buf))
newFileOffset = startFileOffset
index = BorrowEmptyIndexBuf()
currentEntryOffset := baseEntryOffset

for newFileOffset < maxSize {
var payloadSize uint32
var err error
if payloadSize, _, _, err = v.ReadHeaderWithValidation(buf, newFileOffset); err != nil {
if errors.Is(err, ErrEmptyPayload) || errors.Is(err, ErrOffsetOutOfBounds) {
// we might have reached the end of the segment.
break
}
return nil, 0, 0, 0, err
}
index = binary.BigEndian.AppendUint32(index, newFileOffset)
newFileOffset += v.GetHeaderSize() + payloadSize
currentEntryOffset++
}

return index, lastCrc, newFileOffset, currentEntryOffset - 1, nil
}
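For illustration only, and not part of this PR's diff, the following sketch shows how V1's recovery path could rebuild an index from a txn buffer after a crash and then persist it; the surrounding function, its parameters, and the path construction are assumptions.

func rebuildIndex(v *V1, txnBuf []byte, basePath string) (uint32, error) {
	// V1 ignores commitOffset (the blank *int64 parameter); per the Codec docs,
	// later codecs may use it to auto-discard uncommitted corrupted data.
	index, _, nextFileOffset, _, err := v.RecoverIndex(txnBuf, 0, 0, nil)
	if err != nil {
		return 0, err
	}
	// RecoverIndex builds the index in a pooled buffer, so hand it back once persisted.
	defer ReturnIndexBuf(&index)
	if err := v.WriteIndex(basePath+v.GetIdxExtension(), index); err != nil {
		return 0, err
	}
	// nextFileOffset is where the next txn record would be appended.
	return nextFileOffset, nil
}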
52 changes: 52 additions & 0 deletions server/wal/codec/v1_test.go
@@ -15,6 +15,10 @@
package codec

import (
"encoding/binary"
"github.com/google/uuid"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
@@ -37,3 +41,51 @@ func TestV1_Codec(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, payload, getPayload)
}

func TestV1_WriteReadIndex(t *testing.T) {
dir := os.TempDir()
fileName := "0"
elementsNum := 5
indexBuf := make([]byte, uint32(elementsNum*4)+v1.GetIndexHeaderSize())
for i := 0; i < elementsNum; i++ {
binary.BigEndian.PutUint32(indexBuf[i*4:], uint32(i))
}
p := path.Join(dir, fileName+v1.GetIdxExtension())
err := v1.WriteIndex(p, indexBuf)
assert.NoError(t, err)
index, err := v1.ReadIndex(p)
assert.NoError(t, err)
for i := 0; i < elementsNum; i++ {
idx := ReadInt(index, uint32(i*4))
assert.EqualValues(t, idx, i)
}
}

func TestV1_RecoverIndex(t *testing.T) {
elementsNum := 5

buf := make([]byte, 100)
var payloads [][]byte
for i := 0; i < elementsNum; i++ {
payload, err := uuid.New().MarshalBinary()
assert.NoError(t, err)
payloads = append(payloads, payload)
}

fOffset := uint32(0)
for i := 0; i < elementsNum; i++ {
recordSize, _ := v1.WriteRecord(buf, fOffset, 0, payloads[i])
fOffset += recordSize
}

index, _, newFileOffset, lastEntryOffset, err := v1.RecoverIndex(buf, 0, 0, nil)
assert.NoError(t, err)
assert.EqualValues(t, lastEntryOffset, 4)
assert.EqualValues(t, fOffset, newFileOffset)
for i := 0; i < elementsNum; i++ {
fOffset := ReadInt(index, uint32(i*4))
payload, err := v1.ReadRecordWithValidation(buf, fOffset)
assert.NoError(t, err)
assert.EqualValues(t, payloads[i], payload)
}
}
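As a worked check on TestV1_RecoverIndex (not part of the diff): each UUID payload marshals to 16 bytes and the V1 header is 4 bytes, so every record occupies 20 bytes; the five records fill the 100-byte buffer exactly, the recovered index holds the offsets 0, 20, 40, 60 and 80, newFileOffset ends at 100, and lastEntryOffset is 4.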