Commit: add WriteBatch (#26)

akiozihao authored Sep 14, 2023
1 parent b89ea3e commit aa0af20
Showing 4 changed files with 297 additions and 86 deletions.
21 changes: 19 additions & 2 deletions benchmark/bench_test.go
@@ -1,12 +1,13 @@
package benchmark

import (
"github.com/rosedblabs/wal"
"github.com/stretchr/testify/assert"
"math/rand"
"os"
"strings"
"testing"

"github.com/rosedblabs/wal"
"github.com/stretchr/testify/assert"
)

var walFile *wal.WAL
@@ -45,6 +46,22 @@ func BenchmarkWAL_Write(b *testing.B) {
}
}

func BenchmarkWAL_WriteBatch(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for j := 0; j < 31; j++ {
err := walFile.PendingWrites([]byte(strings.Repeat("X", wal.MB)))
assert.Nil(b, err)
}
err := walFile.PendingWrites([]byte(strings.Repeat("X", wal.MB)))
assert.Equal(b, wal.ErrPendingSizeTooLarge, err)
pos, err := walFile.WriteAll()
assert.Nil(b, err)
assert.Equal(b, 0, len(pos))
}
}

func BenchmarkWAL_Read(b *testing.B) {
var positions []*wal.ChunkPosition
for i := 0; i < 1000000; i++ {
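The new benchmark doubles as a sketch of the batch API this commit adds: records are staged with PendingWrites and flushed to the active segment in a single write with WriteAll, which returns one ChunkPosition per staged record; staging more than the allowed batch size fails with ErrPendingSizeTooLarge. Below is a minimal usage sketch; wal.Open, wal.DefaultOptions, Read, and Close are assumed from the library's public API rather than shown in this diff.

package main

import (
    "fmt"

    "github.com/rosedblabs/wal"
)

func main() {
    // wal.Open and wal.DefaultOptions are assumed from the library's
    // public API; they are not part of this diff.
    w, err := wal.Open(wal.DefaultOptions)
    if err != nil {
        panic(err)
    }
    defer w.Close()

    // Stage a few records; nothing is written to the segment file yet.
    for i := 0; i < 3; i++ {
        if err := w.PendingWrites([]byte(fmt.Sprintf("record-%d", i))); err != nil {
            panic(err) // e.g. wal.ErrPendingSizeTooLarge if the batch grows too big
        }
    }

    // Flush the whole batch with one segment write; one position per record.
    positions, err := w.WriteAll()
    if err != nil {
        panic(err)
    }
    for _, pos := range positions {
        data, _ := w.Read(pos)
        fmt.Printf("%s at block %d offset %d\n", data, pos.BlockNumber, pos.ChunkOffset)
    }
}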
185 changes: 119 additions & 66 deletions segment.go
@@ -166,100 +166,162 @@ func (seg *segment) Size() int64 {
return size + int64(seg.currentBlockSize)
}

// writeToBuffer calculates the chunk position for the data, writes the data
// to the bytebufferpool, and updates the segment status.
// The data will be written in chunks, and a chunk has one of four types:
// ChunkTypeFull, ChunkTypeFirst, ChunkTypeMiddle, ChunkTypeLast.
//
// Each chunk has a header, and the header contains the length, type and checksum.
// And the payload of the chunk is the real data you want to write.
func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteBuffer) (*ChunkPosition, error) {
startLen := chunkBuffer.Len()
padding := uint32(0)

if seg.closed {
return nil, ErrClosed
}

// The left block space is not enough for a chunk header
if seg.currentBlockSize+chunkHeaderSize >= blockSize {
// padding if necessary
if seg.currentBlockSize < blockSize {
chunkBuffer.Write(make([]byte, blockSize-seg.currentBlockSize))
padding += blockSize - seg.currentBlockSize

// a new block, clear the current block size
seg.currentBlockNumber += 1
seg.currentBlockSize = 0
}
}

// the start position(for read operation)
position := &ChunkPosition{
SegmentId: seg.id,
BlockNumber: seg.currentBlockNumber,
ChunkOffset: int64(seg.currentBlockSize),
}

dataSize := uint32(len(data))
// The entire chunk can fit into the block.
if seg.currentBlockSize+dataSize+chunkHeaderSize <= blockSize {
seg.appendChunkBuffer(chunkBuffer, data, ChunkTypeFull)
position.ChunkSize = dataSize + chunkHeaderSize
} else {
// If the size of the data exceeds the size of the block,
// the data should be written to the block in batches.
var (
leftSize = dataSize
blockCount uint32 = 0
currBlockSize = seg.currentBlockSize
)

for leftSize > 0 {
chunkSize := blockSize - currBlockSize - chunkHeaderSize
if chunkSize > leftSize {
chunkSize = leftSize
}

var end = dataSize - leftSize + chunkSize
if end > dataSize {
end = dataSize
}

// append the chunks to the buffer
var chunkType ChunkType
switch leftSize {
case dataSize: // First chunk
chunkType = ChunkTypeFirst
case chunkSize: // Last chunk
chunkType = ChunkTypeLast
default: // Middle chunk
chunkType = ChunkTypeMiddle
}
seg.appendChunkBuffer(chunkBuffer, data[dataSize-leftSize:end], chunkType)

leftSize -= chunkSize
blockCount += 1
currBlockSize = (currBlockSize + chunkSize + chunkHeaderSize) % blockSize
}

position.ChunkSize = blockCount*chunkHeaderSize + dataSize
}
endLen := chunkBuffer.Len()
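// sanity check: the bytes appended for this chunk must equal ChunkSize plus any padding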
if position.ChunkSize+padding != uint32(endLen-startLen) {
panic(fmt.Sprintf("chunk size %d, len %d", position.ChunkSize, endLen-startLen-int(padding)))
}

// update segment status
seg.currentBlockSize += position.ChunkSize
if seg.currentBlockSize >= blockSize {
seg.currentBlockNumber += seg.currentBlockSize / blockSize
seg.currentBlockSize = seg.currentBlockSize % blockSize
}

return position, nil
}
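The chunk header referenced in the doc comment above is assembled by appendChunkBuffer, which this view has collapsed. As a point of reference, here is a self-contained sketch of the LevelDB-style layout such WALs typically use: a 7-byte header carrying a 4-byte CRC32 checksum, a 2-byte little-endian payload length, and a 1-byte chunk type. The exact field order and checksum coverage are assumptions, since the real encoding is not visible in this diff.

package sketch

import (
    "encoding/binary"
    "hash/crc32"
)

// encodeChunk is a hypothetical stand-in for appendChunkBuffer:
// header = checksum(4) | length(2) | type(1), followed by the payload.
func encodeChunk(data []byte, chunkType byte) []byte {
    buf := make([]byte, 7+len(data))
    binary.LittleEndian.PutUint16(buf[4:6], uint16(len(data))) // payload length
    buf[6] = chunkType                                         // Full / First / Middle / Last
    copy(buf[7:], data)
    // the checksum covers the length, the type, and the payload
    binary.LittleEndian.PutUint32(buf[0:4], crc32.ChecksumIEEE(buf[4:]))
    return buf
}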

// writeAll writes batch data to the segment file.
func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err error) {
if seg.closed {
return nil, ErrClosed
}

originBlockNumber := seg.currentBlockNumber
originBlockSize := seg.currentBlockSize

// init chunk buffer
chunkBuffer := bytebufferpool.Get()
chunkBuffer.Reset()
defer func() {
if err != nil {
seg.currentBlockNumber = originBlockNumber
seg.currentBlockSize = originBlockSize
}
chunkBuffer.Reset()
bytebufferpool.Put(chunkBuffer)
}()

var pos *ChunkPosition
positions = make([]*ChunkPosition, len(data))
for i := 0; i < len(positions); i++ {
pos, err = seg.writeToBuffer(data[i], chunkBuffer)
if err != nil {
return
}
positions[i] = pos
}
if err = seg.writeChunkBuffer(chunkBuffer); err != nil {
return
}
return
}

// Write writes the data to the segment file.
func (seg *segment) Write(data []byte) (pos *ChunkPosition, err error) {
if seg.closed {
return nil, ErrClosed
}

originBlockNumber := seg.currentBlockNumber
originBlockSize := seg.currentBlockSize

// init chunk buffer
chunkBuffer := bytebufferpool.Get()
chunkBuffer.Reset()
defer func() {
if err != nil {
seg.currentBlockNumber = originBlockNumber
seg.currentBlockSize = originBlockSize
}
chunkBuffer.Reset()
bytebufferpool.Put(chunkBuffer)
}()

pos, err = seg.writeToBuffer(data, chunkBuffer)
if err != nil {
return
}
if err = seg.writeChunkBuffer(chunkBuffer); err != nil {
return
}
return
}
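To make the size accounting concrete: a record that cannot fit in the current block is split into First/Middle/Last chunks, and each chunk pays the header cost, which is exactly the blockCount*chunkHeaderSize + dataSize formula in writeToBuffer. The helper below mirrors that split loop purely for illustration; the blockSize (32 KiB) and chunkHeaderSize (7 bytes) values are assumptions taken from the wider repository, not from this diff.

package main

import "fmt"

const (
    blockSize       = 32 * 1024 // assumed; not shown in this diff
    chunkHeaderSize = 7         // assumed; not shown in this diff
)

// chunkCost mirrors the split loop in writeToBuffer: it reports how many
// chunks a record of dataSize bytes needs when the current block already
// holds currBlockSize bytes, and the resulting ChunkSize (headers + payload).
// It assumes currBlockSize leaves room for at least one header, since
// writeToBuffer pads the block to a boundary otherwise.
func chunkCost(dataSize, currBlockSize uint32) (blockCount, chunkSize uint32) {
    if currBlockSize+dataSize+chunkHeaderSize <= blockSize {
        return 1, dataSize + chunkHeaderSize // one ChunkTypeFull chunk
    }
    leftSize := dataSize
    for leftSize > 0 {
        c := blockSize - currBlockSize - chunkHeaderSize
        if c > leftSize {
            c = leftSize
        }
        leftSize -= c
        blockCount++
        currBlockSize = (currBlockSize + c + chunkHeaderSize) % blockSize
    }
    return blockCount, blockCount*chunkHeaderSize + dataSize
}

func main() {
    // A 100 KiB record starting at a fresh block spans four chunks
    // (First, Middle, Middle, Last), costing four 7-byte headers.
    n, size := chunkCost(100*1024, 0)
    fmt.Println(n, size) // 4 102428
}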

func (seg *segment) appendChunkBuffer(buf *bytebufferpool.ByteBuffer, data []byte, chunkType ChunkType) {
@@ -287,15 +349,6 @@ func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error {
return err
}

return nil
}

@@ -380,7 +433,7 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
if chunkType == ChunkTypeFull || chunkType == ChunkTypeLast {
nextChunk.BlockNumber = blockNumber
nextChunk.ChunkOffset = checksumEnd
// If this is the last chunk in the block, and the left block
// space are paddings, the next chunk should be in the next block.
if checksumEnd+chunkHeaderSize >= blockSize {
nextChunk.BlockNumber += 1
@@ -398,7 +451,7 @@ func (seg *segment) getCacheKey(blockNumber uint32) uint64 {
return uint64(seg.id)<<32 | uint64(blockNumber)
}

// Next returns the next chunk data.
// You can call it repeatedly until io.EOF is returned.
func (segReader *segmentReader) Next() ([]byte, *ChunkPosition, error) {
// The segment file is closed