Skip to content

Commit

Permalink
expose replicator, rewrite replicator test w/ mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
shwet authored and travisjeffery committed Mar 9, 2017
1 parent 55bd7bb commit 0ceb188
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 123 deletions.
12 changes: 6 additions & 6 deletions broker/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,34 @@ func Raft(raft jocko.Raft) BrokerFn {
}
}

type ReplicatorFn func(r *replicator)
type ReplicatorFn func(r *Replicator)

func ReplicatorReplicaID(id int32) ReplicatorFn {
return func(r *replicator) {
return func(r *Replicator) {
r.replicaID = id
}
}

func ReplicatorFetchSize(size int32) ReplicatorFn {
return func(r *replicator) {
return func(r *Replicator) {
r.fetchSize = size
}
}

func ReplicatorMinBytes(size int32) ReplicatorFn {
return func(r *replicator) {
return func(r *Replicator) {
r.minBytes = size
}
}

func ReplicatorMaxWaitTime(time int32) ReplicatorFn {
return func(r *replicator) {
return func(r *Replicator) {
r.maxWaitTime = time
}
}

func ReplicatorProxy(proxy jocko.Proxy) ReplicatorFn {
return func(r *replicator) {
return func(r *Replicator) {
r.proxy = proxy
}
}
11 changes: 5 additions & 6 deletions broker/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (

type replicationManager struct {
jocko.Broker
replicators map[*jocko.Partition]*replicator
replicators map[*jocko.Partition]*Replicator
}

func newReplicationManager() *replicationManager {
return &replicationManager{
replicators: make(map[*jocko.Partition]*replicator),
replicators: make(map[*jocko.Partition]*Replicator),
}
}

Expand All @@ -24,7 +24,7 @@ func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *p
}
// stop replicator to current leader
if r, ok := rm.replicators[p]; ok {
if err := r.close(); err != nil {
if err := r.Close(); err != nil {
return err
}
}
Expand All @@ -35,9 +35,8 @@ func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *p
}
p.Leader = command.Leader
p.Conn = rm.ClusterMember(p.LeaderID())
r := newReplicator(p, rm.ID(),
r := NewReplicator(p, rm.ID(),
ReplicatorProxy(server.NewProxy(p.Conn)))
r.replicate()
rm.replicators[p] = r
return nil
}
Expand All @@ -48,7 +47,7 @@ func (rm *replicationManager) BecomeLeader(topic string, pid int32, command *pro
return err
}
if r, ok := rm.replicators[p]; ok {
if err := r.close(); err != nil {
if err := r.Close(); err != nil {
return err
}
}
Expand Down
24 changes: 11 additions & 13 deletions broker/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/travisjeffery/jocko/protocol"
)

// replicator fetches from the partition's leader and produces to a follower
// Replicator fetches from the partition's leader and produces to a follower
// thereby replicating the partition
type replicator struct {
type Replicator struct {
replicaID int32
partition *jocko.Partition
clientID string
Expand All @@ -23,9 +23,9 @@ type replicator struct {
proxy jocko.Proxy
}

// newReplicator returns a new replicator object
func newReplicator(partition *jocko.Partition, replicaID int32, opts ...ReplicatorFn) *replicator {
r := &replicator{
// NewReplicator returns a new replicator object
func NewReplicator(partition *jocko.Partition, replicaID int32, opts ...ReplicatorFn) *Replicator {
r := &Replicator{
partition: partition,
replicaID: replicaID,
clientID: fmt.Sprintf("Replicator-%d", replicaID),
Expand All @@ -35,16 +35,14 @@ func newReplicator(partition *jocko.Partition, replicaID int32, opts ...Replicat
for _, o := range opts {
o(r)
}
return r
}

// replicate starts replication process of fetching and writing messages
func (r *replicator) replicate() {
go r.fetchMessages()
go r.writeMessages()

return r
}

func (r *replicator) fetchMessages() {
func (r *Replicator) fetchMessages() {
for {
select {
case <-r.done:
Expand Down Expand Up @@ -80,7 +78,7 @@ func (r *replicator) fetchMessages() {
}
}

func (r *replicator) writeMessages() {
func (r *Replicator) writeMessages() {
for {
select {
case <-r.done:
Expand All @@ -94,8 +92,8 @@ func (r *replicator) writeMessages() {
}
}

// close the replicator object when we are no longer following
func (r *replicator) close() error {
// Close the replicator object when we are no longer following
func (r *Replicator) Close() error {
close(r.done)
return nil
}
98 changes: 0 additions & 98 deletions broker/replicator_test.go

This file was deleted.

44 changes: 44 additions & 0 deletions tests/broker/replicator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tests

import (
"testing"
"time"

"github.com/bmizerany/assert"
"github.com/hashicorp/nomad/testutil"
"github.com/travisjeffery/jocko"
"github.com/travisjeffery/jocko/broker"
"github.com/travisjeffery/jocko/testutil/mocks"
)

func TestBroker_Replicate(t *testing.T) {
mockCommitLog := mocks.NewCommitLog()
mockProxy := mocks.NewProxyForFetchMessages(4)

p := &jocko.Partition{
Topic: "test",
ID: 0,
Leader: 0,
PreferredLeader: 0,
Replicas: []int32{0},
CommitLog: mockCommitLog,
}

replicator := broker.NewReplicator(p, 0,
broker.ReplicatorMinBytes(5),
broker.ReplicatorMaxWaitTime(int32(250*time.Millisecond)),
broker.ReplicatorProxy(mockProxy))

defer replicator.Close()

testutil.WaitForResult(func() (bool, error) {
commitLog := mockCommitLog.Log()
if len(commitLog) < 4 {
return false, nil
}
assert.Equal(t, commitLog, mockProxy.MockedMessages(), "unmatched replicated messages")
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
50 changes: 50 additions & 0 deletions testutil/mocks/mock_commitlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mocks

import (
"io"
"sync"
)

type MockCommitLog struct {
mu sync.RWMutex
log [][]byte
}

func NewCommitLog() *MockCommitLog {
return &MockCommitLog{}
}

func (c *MockCommitLog) Log() [][]byte {
log := [][]byte{}
c.mu.RLock()
log = append(log, c.log...)
c.mu.RUnlock()
return log
}

func (c *MockCommitLog) Append(b []byte) (int64, error) {
c.mu.Lock()
c.log = append(c.log, b)
c.mu.Unlock()
return 0, nil
}

func (c *MockCommitLog) DeleteAll() error {
return nil
}

func (c *MockCommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
return nil, nil
}

func (c *MockCommitLog) TruncateTo(int64) error {
return nil
}

func (c *MockCommitLog) NewestOffset() int64 {
return 0
}

func (c *MockCommitLog) OldestOffset() int64 {
return 0
}
Loading

0 comments on commit 0ceb188

Please sign in to comment.