diff --git a/broker/broker.go b/broker/broker.go index a598a26b..9dc612c5 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -155,12 +155,6 @@ func (b *Broker) StartReplica(partition *jocko.Partition) error { if err != nil { return err } - if err = commitLog.Init(); err != nil { - return err - } - if err = commitLog.Open(); err != nil { - return err - } partition.CommitLog = commitLog partition.Conn = b.serf.Member(partition.LeaderID()) } diff --git a/broker/replicator_test.go b/broker/replicator_test.go index c75af7b8..bb1fd62f 100644 --- a/broker/replicator_test.go +++ b/broker/replicator_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/travisjeffery/jocko" "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/jocko/server" + "github.com/travisjeffery/jocko/testutil" ) func TestBroker_Replicate(t *testing.T) { diff --git a/broker/serf_test.go b/broker/serf_test.go index 5771c5df..9b91ae3b 100644 --- a/broker/serf_test.go +++ b/broker/serf_test.go @@ -9,11 +9,11 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" "github.com/stretchr/testify/assert" jockoraft "github.com/travisjeffery/jocko/raft" "github.com/travisjeffery/jocko/serf" + "github.com/travisjeffery/jocko/testutil" "github.com/travisjeffery/simplelog" ) diff --git a/commitlog/commitlog.go b/commitlog/commitlog.go index 14e1b838..8648014e 100644 --- a/commitlog/commitlog.go +++ b/commitlog/commitlog.go @@ -54,10 +54,18 @@ func New(opts Options) (*CommitLog, error) { cleaner: NewDeleteCleaner(opts.MaxLogBytes), } + if err := l.init(); err != nil { + return nil, err + } + + if err := l.open(); err != nil { + return nil, err + } + return l, nil } -func (l *CommitLog) Init() error { +func (l *CommitLog) init() error { err := os.MkdirAll(l.Path, 0755) if err != nil { return errors.Wrap(err, "mkdir failed") @@ -65,7 +73,7 @@ func (l *CommitLog) Init() error { return nil } -func (l *CommitLog) Open() error { +func (l *CommitLog) open() error { files, err := ioutil.ReadDir(l.Path) if err != nil { return errors.Wrap(err, "read dir failed") @@ -130,6 +138,7 @@ func (l *CommitLog) Read(p []byte) (n int, err error) { defer l.mu.Unlock() return l.activeSegment().Read(p) } + func (l *CommitLog) NewestOffset() int64 { return l.activeSegment().NextOffset } diff --git a/jocko.go b/jocko.go index 6a20af09..b746b5c9 100644 --- a/jocko.go +++ b/jocko.go @@ -12,8 +12,6 @@ import ( // CommitLog is the interface that wraps the commit log's methods and // is used to manage a partition's data. type CommitLog interface { - Init() error - Open() error DeleteAll() error NewReader(offset int64, maxBytes int32) (io.Reader, error) TruncateTo(int64) error diff --git a/commitlog/commitlog_test.go b/tests/commitlog/commitlog_test.go similarity index 83% rename from commitlog/commitlog_test.go rename to tests/commitlog/commitlog_test.go index 1d10f965..7947f821 100644 --- a/commitlog/commitlog_test.go +++ b/tests/commitlog/commitlog_test.go @@ -1,4 +1,4 @@ -package commitlog +package tests import ( "bytes" @@ -9,18 +9,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/travisjeffery/jocko/commitlog" ) var ( - msgs = []Message{ - NewMessage([]byte("one")), - NewMessage([]byte("two")), - NewMessage([]byte("three")), - NewMessage([]byte("four")), + msgs = []commitlog.Message{ + commitlog.NewMessage([]byte("one")), + commitlog.NewMessage([]byte("two")), + commitlog.NewMessage([]byte("three")), + commitlog.NewMessage([]byte("four")), } - msgSets = []MessageSet{ - NewMessageSet(0, msgs...), - NewMessageSet(1, msgs...), + msgSets = []commitlog.MessageSet{ + commitlog.NewMessageSet(0, msgs...), + commitlog.NewMessageSet(1, msgs...), } maxBytes = msgSets[0].Size() path = filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63())) @@ -44,7 +45,7 @@ func TestNewCommitLog(t *testing.T) { _, err = r.Read(p) assert.NoError(t, err) - ms := MessageSet(p) + ms := commitlog.MessageSet(p) assert.Equal(t, int64(i), ms.Offset()) payload := ms.Payload() @@ -97,7 +98,7 @@ func TestTruncateTo(t *testing.T) { _, err = r.Read(p) assert.NoError(t, err) - ms := MessageSet(p) + ms := commitlog.MessageSet(p) assert.Equal(t, int64(i+1), ms.Offset()) payload := ms.Payload() @@ -137,18 +138,15 @@ func check(t assert.TestingT, got, want []byte) { } } -func setup(t assert.TestingT) *CommitLog { - opts := Options{ +func setup(t assert.TestingT) *commitlog.CommitLog { + opts := commitlog.Options{ Path: path, MaxSegmentBytes: 6, MaxLogBytes: 30, } - l, err := New(opts) + l, err := commitlog.New(opts) assert.NoError(t, err) - assert.NoError(t, l.Init()) - assert.NoError(t, l.Open()) - return l } diff --git a/commitlog/message_set_test.go b/tests/commitlog/message_set_test.go similarity index 58% rename from commitlog/message_set_test.go rename to tests/commitlog/message_set_test.go index 10aba1ee..7762ca19 100644 --- a/commitlog/message_set_test.go +++ b/tests/commitlog/message_set_test.go @@ -1,19 +1,20 @@ -package commitlog +package tests import ( "testing" "github.com/bmizerany/assert" + "github.com/travisjeffery/jocko/commitlog" ) func TestMessageSet(t *testing.T) { - msg0 := NewMessage([]byte("hello")) - msg1 := NewMessage([]byte("world")) - msgs := []Message{ + msg0 := commitlog.NewMessage([]byte("hello")) + msg1 := commitlog.NewMessage([]byte("world")) + msgs := []commitlog.Message{ msg0, msg1, } - ms := NewMessageSet(3, msgs...) + ms := commitlog.NewMessageSet(3, msgs...) assert.Equal(t, int64(3), ms.Offset()) payload := ms.Payload() diff --git a/server/server_test.go b/tests/server/server_test.go similarity index 98% rename from server/server_test.go rename to tests/server/server_test.go index f33cc20d..956877c4 100644 --- a/server/server_test.go +++ b/tests/server/server_test.go @@ -1,4 +1,4 @@ -package server_test +package tests import ( "bytes" @@ -27,6 +27,7 @@ const ( func TestBroker(t *testing.T) { _, teardown := setup(t) + defer teardown() t.Run("Sarama", func(t *testing.T) { config := sarama.NewConfig() @@ -66,12 +67,11 @@ func TestBroker(t *testing.T) { assert.NoError(t, err) } }) - - teardown() } func BenchmarkBroker(b *testing.B) { _, teardown := setup(b) + defer teardown() config := sarama.NewConfig() config.Version = sarama.V0_10_0_0 @@ -119,8 +119,6 @@ func BenchmarkBroker(b *testing.B) { } } }) - - teardown() } func setup(t assert.TestingT) (*net.TCPConn, func()) { @@ -159,7 +157,6 @@ func setup(t assert.TestingT) (*net.TCPConn, func()) { assert.NoError(t, err) proxy := server.NewProxy(conn) - _, err = proxy.CreateTopic("testclient", &protocol.CreateTopicRequest{ Topic: topic, NumPartitions: int32(1),