Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IncrementOffset API for z.buffers #206

Merged
merged 4 commits into from
Oct 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions z/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"os"
"sort"
"sync/atomic"

"github.com/pkg/errors"
)
Expand All @@ -40,8 +41,8 @@ const padding = 8
//
// MaxSize can be set to limit the memory usage.
type Buffer struct {
offset uint64
buf []byte
offset int
curSz int
maxSz int
fd *os.File
Expand Down Expand Up @@ -94,7 +95,7 @@ func (b *Buffer) doMmap() error {
return errors.Wrapf(err, "while mmapping %s with size: %d", fd.Name(), b.maxSz)
}
if len(curBuf) > 0 {
assert(b.offset == copy(buf, curBuf[:b.offset]))
assert(int(b.offset) == copy(buf, curBuf[:b.offset]))
Free(curBuf)
}
b.buf = buf
Expand Down Expand Up @@ -137,24 +138,25 @@ func NewBufferWith(sz, maxSz int, bufType BufferType) (*Buffer, error) {
}

func (b *Buffer) IsEmpty() bool {
return b.offset == b.StartOffset()
return int(b.offset) == b.StartOffset()
}

// LenWithPadding would return the number of bytes written to the buffer so far
// plus the padding at the start of the buffer.
func (b *Buffer) LenWithPadding() int {
return b.offset
return int(atomic.LoadUint64(&b.offset))
}

// LenNoPadding would return the number of bytes written to the buffer so far
// (without the padding).
func (b *Buffer) LenNoPadding() int {
return b.offset - padding
return int(atomic.LoadUint64(&b.offset) - padding)
}

// Bytes would return all the written bytes as a slice.
func (b *Buffer) Bytes() []byte {
return b.buf[padding:b.offset]
off := atomic.LoadUint64(&b.offset)
return b.buf[padding:off]
}

func (b *Buffer) AutoMmapAfter(size int) {
Expand All @@ -170,11 +172,11 @@ func (b *Buffer) Grow(n int) {
if b.buf == nil {
panic("z.Buffer needs to be initialized before using")
}
if b.maxSz-b.offset < n {
if b.maxSz-int(b.offset) < n {
panic(fmt.Sprintf("Buffer max size exceeded: %d."+
" Offset: %d. Grow: %d", b.maxSz, b.offset, n))
}
if b.curSz-b.offset > n {
if b.curSz-int(b.offset) > n {
return
}

Expand Down Expand Up @@ -214,16 +216,25 @@ func (b *Buffer) Grow(n int) {
func (b *Buffer) Allocate(n int) []byte {
b.Grow(n)
off := b.offset
b.offset += n
return b.buf[off:b.offset]
b.offset += uint64(n)
return b.buf[off:int(b.offset)]
}

// AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
// the offset of the allocation.
func (b *Buffer) AllocateOffset(n int) int {
b.Grow(n)
b.offset += n
return b.offset - n
b.offset += uint64(n)
return int(b.offset) - n
}

// IncrementOffset returns the incremented offset. This operation is thread-safe.
// Note: Only this API is thread-safe, the other APIs should not be used concurrently.
func (b *Buffer) IncrementOffset(n int) int {
if int(atomic.LoadUint64(&b.offset))+n > b.curSz {
panic("Buffer size limit hit")
}
return int(atomic.AddUint64(&b.offset, uint64(n)))
}

func (b *Buffer) writeLen(sz int) {
Expand Down Expand Up @@ -368,7 +379,7 @@ func (s *sortHelper) sort(lo, hi int) []byte {

// SortSlice is like SortSliceBetween but sorting over the entire buffer.
func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
b.SortSliceBetween(b.StartOffset(), b.offset, less)
b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
}
func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
if start >= end {
Expand Down Expand Up @@ -417,15 +428,15 @@ func rawSlice(buf []byte) []byte {

// Slice would return the slice written at offset.
func (b *Buffer) Slice(offset int) ([]byte, int) {
if offset >= b.offset {
if offset >= int(b.offset) {
return nil, 0
}

sz := binary.BigEndian.Uint32(b.buf[offset:])
start := offset + 4
next := start + int(sz)
res := b.buf[start:next]
if next >= b.offset {
if next >= int(b.offset) {
next = 0
}
return res, next
Expand Down Expand Up @@ -453,13 +464,13 @@ func (b *Buffer) Data(offset int) []byte {
func (b *Buffer) Write(p []byte) (n int, err error) {
b.Grow(len(p))
n = copy(b.buf[b.offset:], p)
b.offset += n
b.offset += uint64(n)
return n, nil
}

// Reset would reset the buffer to be reused.
func (b *Buffer) Reset() {
b.offset = b.StartOffset()
b.offset = uint64(b.StartOffset())
}

// Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
Expand Down