Skip to content

Commit

Permalink
add replicator options
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 21, 2016
1 parent 5c4a38b commit 6c88901
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 33 deletions.
28 changes: 28 additions & 0 deletions broker/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,31 @@ type optionLogger struct {
func (o optionLogger) modifyBroker(b *Broker) {
b.logger = o.logger
}

type ReplicatorOption interface {
modifyReplicator(*PartitionReplicator)
}

type ReplicatorOptionReplicaID int32

func (o ReplicatorOptionReplicaID) modifyReplicator(r *PartitionReplicator) {
r.replicaID = int32(o)
}

type ReplicatorOptionFetchSize int32

func (o ReplicatorOptionFetchSize) modifyReplicator(r *PartitionReplicator) {
r.fetchSize = int32(o)
}

type ReplicatorOptionMinBytes int32

func (o ReplicatorOptionMinBytes) modifyReplicator(r *PartitionReplicator) {
r.minBytes = int32(o)
}

type ReplicatorOptionMaxWaitTime int32

func (o ReplicatorOptionMaxWaitTime) modifyReplicator(r *PartitionReplicator) {
r.maxWaitTime = int32(o)
}
60 changes: 32 additions & 28 deletions broker/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,46 @@ package broker

import (
"bytes"
"fmt"
"io"
"math/rand"

"github.com/travisjeffery/jocko/jocko"
"github.com/travisjeffery/jocko/protocol"
)

type ReplicatorOptions struct {
Partition *jocko.Partition
ClientID string
ReplicaID int32 // broker id of the follower
FetchSize int32
MinBytes int32
MaxWaitTime int32
}

type PartitionReplicator struct {
*ReplicatorOptions
highwaterMarkOffset int64
replicaID int32
partition *jocko.Partition
clientID string
minBytes int32
fetchSize int32
maxWaitTime int32
highwaterMarkOffset int64
offset int64
msgs chan []byte
done chan struct{}
}

func NewPartitionReplicator(options *ReplicatorOptions) (*PartitionReplicator, error) {
return &PartitionReplicator{
ReplicatorOptions: options,
done: make(chan struct{}, 2),
msgs: make(chan []byte, 2),
}, nil
func NewPartitionReplicator(partition *jocko.Partition, replicaID int32, opts ...ReplicatorOption) (*PartitionReplicator, error) {
r := &PartitionReplicator{
partition: partition,
replicaID: replicaID,
clientID: fmt.Sprintf("Replicator-%d", replicaID),
done: make(chan struct{}, 2),
msgs: make(chan []byte, 2),
}

for _, o := range opts {
o.modifyReplicator(r)
}

return r, nil
}

func (r *PartitionReplicator) Replicate() error {
hw := r.Partition.HighWatermark()
err := r.Partition.TruncateTo(hw)
hw := r.partition.HighWatermark()
err := r.partition.TruncateTo(hw)
if err != nil {
return err
}
Expand All @@ -57,13 +61,13 @@ func (r *PartitionReplicator) fetchMessages() {
return
default:
fetchBody := &protocol.FetchRequest{
ReplicaID: r.ReplicaID,
MaxWaitTime: r.MaxWaitTime,
MinBytes: r.MinBytes,
ReplicaID: r.replicaID,
MaxWaitTime: r.maxWaitTime,
MinBytes: r.minBytes,
Topics: []*protocol.FetchTopic{{
Topic: r.Partition.Topic,
Topic: r.partition.Topic,
Partitions: []*protocol.FetchPartition{{
Partition: r.Partition.ID,
Partition: r.partition.ID,
FetchOffset: r.offset,
}},
}},
Expand All @@ -77,21 +81,21 @@ func (r *PartitionReplicator) fetchMessages() {
if err != nil {
panic(err)
}
_, err = r.Partition.Write(b)
_, err = r.partition.Write(b)
if err != nil {
panic(err)
}
var header protocol.Response
br := bytes.NewBuffer(make([]byte, 0, 8))
if _, err = io.CopyN(br, r.Partition, 8); err != nil {
if _, err = io.CopyN(br, r.partition, 8); err != nil {
panic(err)
}
if err = protocol.Decode(br.Bytes(), &header); err != nil {
panic(err)
}
c := make([]byte, 0, header.Size-4)
cr := bytes.NewBuffer(c)
_, err = io.CopyN(cr, r.Partition, int64(header.Size-4))
_, err = io.CopyN(cr, r.partition, int64(header.Size-4))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (r *PartitionReplicator) writeMessages() {
case <-r.done:
return
case msg := <-r.msgs:
_, err := r.Partition.Append(msg)
_, err := r.partition.Append(msg)
if err != nil {
panic(err)
}
Expand Down
8 changes: 3 additions & 5 deletions broker/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ func TestFetchMessages(t *testing.T) {
assert.NoError(t, err)
defer server.Close()

replicator, err := NewPartitionReplicator(&ReplicatorOptions{
MinBytes: 5,
MaxWaitTime: int32(time.Millisecond * 250),
Partition: tp,
})
replicator, err := NewPartitionReplicator(tp, 0,
ReplicatorOptionMinBytes(5),
ReplicatorOptionMaxWaitTime(int32(time.Millisecond*250)))
assert.NoError(t, err)
defer replicator.Close()

Expand Down

0 comments on commit 6c88901

Please sign in to comment.