Skip to content

Commit

Permalink
reduce commitlog interface
Browse files Browse the repository at this point in the history
  • Loading branch information
shwet authored and travisjeffery committed Mar 9, 2017
1 parent f80578b commit 55bd7bb
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 40 deletions.
6 changes: 0 additions & 6 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,6 @@ func (b *Broker) StartReplica(partition *jocko.Partition) error {
if err != nil {
return err
}
if err = commitLog.Init(); err != nil {
return err
}
if err = commitLog.Open(); err != nil {
return err
}
partition.CommitLog = commitLog
partition.Conn = b.serf.Member(partition.LeaderID())
}
Expand Down
2 changes: 1 addition & 1 deletion broker/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/travisjeffery/jocko"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/jocko/server"
"github.com/travisjeffery/jocko/testutil"
)

func TestBroker_Replicate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion broker/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
jockoraft "github.com/travisjeffery/jocko/raft"
"github.com/travisjeffery/jocko/serf"
"github.com/travisjeffery/jocko/testutil"
"github.com/travisjeffery/simplelog"
)

Expand Down
13 changes: 11 additions & 2 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,26 @@ func New(opts Options) (*CommitLog, error) {
cleaner: NewDeleteCleaner(opts.MaxLogBytes),
}

if err := l.init(); err != nil {
return nil, err
}

if err := l.open(); err != nil {
return nil, err
}

return l, nil
}

func (l *CommitLog) Init() error {
func (l *CommitLog) init() error {
err := os.MkdirAll(l.Path, 0755)
if err != nil {
return errors.Wrap(err, "mkdir failed")
}
return nil
}

func (l *CommitLog) Open() error {
func (l *CommitLog) open() error {
files, err := ioutil.ReadDir(l.Path)
if err != nil {
return errors.Wrap(err, "read dir failed")
Expand Down Expand Up @@ -130,6 +138,7 @@ func (l *CommitLog) Read(p []byte) (n int, err error) {
defer l.mu.Unlock()
return l.activeSegment().Read(p)
}

func (l *CommitLog) NewestOffset() int64 {
return l.activeSegment().NextOffset
}
Expand Down
2 changes: 0 additions & 2 deletions jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
// CommitLog is the interface that wraps the commit log's methods and
// is used to manage a partition's data.
type CommitLog interface {
Init() error
Open() error
DeleteAll() error
NewReader(offset int64, maxBytes int32) (io.Reader, error)
TruncateTo(int64) error
Expand Down
32 changes: 15 additions & 17 deletions commitlog/commitlog_test.go → tests/commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package commitlog
package tests

import (
"bytes"
Expand All @@ -9,18 +9,19 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/travisjeffery/jocko/commitlog"
)

var (
msgs = []Message{
NewMessage([]byte("one")),
NewMessage([]byte("two")),
NewMessage([]byte("three")),
NewMessage([]byte("four")),
msgs = []commitlog.Message{
commitlog.NewMessage([]byte("one")),
commitlog.NewMessage([]byte("two")),
commitlog.NewMessage([]byte("three")),
commitlog.NewMessage([]byte("four")),
}
msgSets = []MessageSet{
NewMessageSet(0, msgs...),
NewMessageSet(1, msgs...),
msgSets = []commitlog.MessageSet{
commitlog.NewMessageSet(0, msgs...),
commitlog.NewMessageSet(1, msgs...),
}
maxBytes = msgSets[0].Size()
path = filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63()))
Expand All @@ -44,7 +45,7 @@ func TestNewCommitLog(t *testing.T) {
_, err = r.Read(p)
assert.NoError(t, err)

ms := MessageSet(p)
ms := commitlog.MessageSet(p)
assert.Equal(t, int64(i), ms.Offset())

payload := ms.Payload()
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestTruncateTo(t *testing.T) {
_, err = r.Read(p)
assert.NoError(t, err)

ms := MessageSet(p)
ms := commitlog.MessageSet(p)
assert.Equal(t, int64(i+1), ms.Offset())

payload := ms.Payload()
Expand Down Expand Up @@ -137,18 +138,15 @@ func check(t assert.TestingT, got, want []byte) {
}
}

func setup(t assert.TestingT) *CommitLog {
opts := Options{
func setup(t assert.TestingT) *commitlog.CommitLog {
opts := commitlog.Options{
Path: path,
MaxSegmentBytes: 6,
MaxLogBytes: 30,
}
l, err := New(opts)
l, err := commitlog.New(opts)
assert.NoError(t, err)

assert.NoError(t, l.Init())
assert.NoError(t, l.Open())

return l
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package commitlog
package tests

import (
"testing"

"github.com/bmizerany/assert"
"github.com/travisjeffery/jocko/commitlog"
)

func TestMessageSet(t *testing.T) {
msg0 := NewMessage([]byte("hello"))
msg1 := NewMessage([]byte("world"))
msgs := []Message{
msg0 := commitlog.NewMessage([]byte("hello"))
msg1 := commitlog.NewMessage([]byte("world"))
msgs := []commitlog.Message{
msg0,
msg1,
}
ms := NewMessageSet(3, msgs...)
ms := commitlog.NewMessageSet(3, msgs...)
assert.Equal(t, int64(3), ms.Offset())

payload := ms.Payload()
Expand Down
9 changes: 3 additions & 6 deletions server/server_test.go → tests/server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server_test
package tests

import (
"bytes"
Expand Down Expand Up @@ -27,6 +27,7 @@ const (

func TestBroker(t *testing.T) {
_, teardown := setup(t)
defer teardown()

t.Run("Sarama", func(t *testing.T) {
config := sarama.NewConfig()
Expand Down Expand Up @@ -66,12 +67,11 @@ func TestBroker(t *testing.T) {
assert.NoError(t, err)
}
})

teardown()
}

func BenchmarkBroker(b *testing.B) {
_, teardown := setup(b)
defer teardown()

config := sarama.NewConfig()
config.Version = sarama.V0_10_0_0
Expand Down Expand Up @@ -119,8 +119,6 @@ func BenchmarkBroker(b *testing.B) {
}
}
})

teardown()
}

func setup(t assert.TestingT) (*net.TCPConn, func()) {
Expand Down Expand Up @@ -159,7 +157,6 @@ func setup(t assert.TestingT) (*net.TCPConn, func()) {
assert.NoError(t, err)

proxy := server.NewProxy(conn)

_, err = proxy.CreateTopic("testclient", &protocol.CreateTopicRequest{
Topic: topic,
NumPartitions: int32(1),
Expand Down

0 comments on commit 55bd7bb

Please sign in to comment.