Skip to content

Commit

Permalink
commitlog: add .TruncateTo(offset) (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 6, 2016
1 parent 128fab4 commit 56a588b
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 18 deletions.
27 changes: 27 additions & 0 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/pkg/errors"
)

var (
ErrSegmentNotFound = errors.New("segment not found")
)

type CommitLog struct {
Options
name string
Expand Down Expand Up @@ -138,6 +142,29 @@ func (l *CommitLog) DeleteAll() error {
return os.RemoveAll(l.Path)
}

func (l *CommitLog) TruncateTo(offset int64) error {
l.mu.Lock()
defer l.mu.Unlock()
var segments []*Segment
for _, segment := range l.segments {
if segment.BaseOffset < offset {
if err := segment.Delete(); err != nil {
return err
}
} else {
segments = append(segments, segment)
}
}
l.segments = segments
return nil
}

func (l *CommitLog) Segments() []*Segment {
l.mu.Lock()
defer l.mu.Unlock()
return l.segments
}

func (l *CommitLog) checkSplit() bool {
return l.activeSegment().IsFull()
}
Expand Down
46 changes: 46 additions & 0 deletions commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,52 @@ func TestNewCommitLogExisting(t *testing.T) {
}
}

func TestTruncateTo(t *testing.T) {
var err error
l0 := setup(t)
defer cleanup(t)

for _, msgSet := range msgSets {
_, err = l0.Append(msgSet)
assert.NoError(t, err)
}
assert.Equal(t, int64(2), l0.NewestOffset())
assert.Equal(t, 2, len(l0.Segments()))

err = l0.TruncateTo(int64(1))
assert.NoError(t, err)
assert.Equal(t, 1, len(l0.Segments()))

maxBytes := msgSets[0].Size()
_, err = l0.NewReader(ReaderOptions{
Offset: 0,
MaxBytes: maxBytes,
})
assert.Error(t, err)

r, err := l0.NewReader(ReaderOptions{
Offset: 1,
MaxBytes: maxBytes,
})
assert.NoError(t, err)

for i, _ := range msgSets[1:] {
p := make([]byte, maxBytes)
_, err = r.Read(p)
assert.NoError(t, err)

ms := MessageSet(p)
assert.Equal(t, int64(i+1), ms.Offset())

payload := ms.Payload()
var offset int
for _, msg := range msgs {
assert.Equal(t, []byte(msg), payload[offset:offset+len(msg)])
offset += len(msg)
}
}
}

func check(t *testing.T, got, want []byte) {
if !bytes.Equal(got, want) {
t.Errorf("got = %s, want %s", string(got), string(want))
Expand Down
6 changes: 5 additions & 1 deletion commitlog/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (idx *index) ReadEntry(e *Entry, offset int64) error {
rel := &relEntry{}
err := binary.Read(b, binary.BigEndian, rel)
if err != nil {
return errors.Wrap(err, "binar read failed")
return errors.Wrap(err, "binary read failed")
}
idx.mu.RLock()
rel.fill(e, idx.baseOffset)
Expand Down Expand Up @@ -139,3 +139,7 @@ func (idx *index) Sync() error {
func (idx *index) Close() (err error) {
return idx.file.Close()
}

func (idx *index) Name() string {
return idx.file.Name()
}
32 changes: 15 additions & 17 deletions commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@ package commitlog
import (
"io"
"sync"

"github.com/pkg/errors"
)

type Reader struct {
ReaderOptions
segment *Segment
segments []*Segment
idx int
mu sync.Mutex
position int64
commitlog *CommitLog
idx int
mu sync.Mutex
position int64
}

func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

segments := r.commitlog.Segments()
segment := segments[r.idx]

var readSize int
for {
readSize, err = r.segment.ReadAt(p[n:], r.position)
readSize, err = segment.ReadAt(p[n:], r.position)
n += readSize
r.position += int64(readSize)
if readSize != 0 && err == nil {
Expand All @@ -31,12 +31,12 @@ func (r *Reader) Read(p []byte) (n int, err error) {
if n == len(p) || err != io.EOF {
break
}
if len(r.segments) <= r.idx+1 {
if len(segments) <= r.idx+1 {
err = io.EOF
break
}
r.idx++
r.segment = r.segments[r.idx]
segment = segments[r.idx]
r.position = 0
}

Expand All @@ -50,18 +50,16 @@ type ReaderOptions struct {
}

func (l *CommitLog) NewReader(options ReaderOptions) (r *Reader, err error) {
segment, idx := findSegment(l.segments, options.Offset)
entry, _ := segment.findEntry(options.Offset)
position := entry.Position

segment, idx := findSegment(l.Segments(), options.Offset)
if segment == nil {
return nil, errors.Wrap(err, "segment not found")
return nil, ErrSegmentNotFound
}
entry, _ := segment.findEntry(options.Offset)
position := entry.Position

return &Reader{
ReaderOptions: options,
segment: segment,
segments: l.segments,
commitlog: l,
idx: idx,
position: position,
}, nil
Expand Down
15 changes: 15 additions & 0 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,18 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) {
}
return e, nil
}

func (s *Segment) Delete() error {
if err := s.Close(); err != nil {
return err
}
s.Lock()
defer s.Unlock()
if err := os.Remove(s.log.Name()); err != nil {
return err
}
if err := os.Remove(s.Index.Name()); err != nil {
return err
}
return nil
}

0 comments on commit 56a588b

Please sign in to comment.