Skip to content

Commit

Permalink
privatize most the broker apis, follow up to the refactor
Browse files Browse the repository at this point in the history
travisjeffery committed Oct 17, 2017

Verified

This commit was signed with the committer’s verified signature.
yuzawa-san James Yuzawa
1 parent 02a1063 commit 36f488f
Showing 8 changed files with 128 additions and 134 deletions.
178 changes: 79 additions & 99 deletions broker/broker.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions broker/fsm.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ func (s *Broker) apply(c jocko.RaftCommand) {
// TODO: should panic?
return
}
if err := s.StartReplica(p); err != protocol.ErrNone {
if err := s.startReplica(p); err != protocol.ErrNone {
panic(err)
}
case deleteTopic:
@@ -68,7 +68,7 @@ func (s *Broker) apply(c jocko.RaftCommand) {
// TODO: should panic?
return
}
if err := s.deleteTopic(p); err != nil {
if err := s.deletePartitions(p); err != nil {
panic(errors.Wrap(err, "topic delete failed"))
}
}
1 change: 0 additions & 1 deletion cmd/jocko/main.go
Original file line number Diff line number Diff line change
@@ -121,7 +121,6 @@ func CmdTopic(logger *simplelog.Logger) int {
}

client := server.NewClient(conn)

resp, err := client.CreateTopic("cmd/createtopic", &protocol.CreateTopicRequest{
Topic: *topicTopic,
NumPartitions: *topicPartitions,
4 changes: 2 additions & 2 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
@@ -164,14 +164,14 @@ func (l *CommitLog) Close() error {
return nil
}

func (l *CommitLog) DeleteAll() error {
func (l *CommitLog) Delete() error {
if err := l.Close(); err != nil {
return err
}
return os.RemoveAll(l.Path)
}

func (l *CommitLog) TruncateTo(offset int64) error {
func (l *CommitLog) Truncate(offset int64) error {
l.mu.Lock()
defer l.mu.Unlock()
var segments []*Segment
4 changes: 2 additions & 2 deletions commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ func BenchmarkCommitLog(b *testing.B) {
}
}

func TestTruncateTo(t *testing.T) {
func TestTruncate(t *testing.T) {
var err error
l := setup(t)
defer cleanup(t)
@@ -82,7 +82,7 @@ func TestTruncateTo(t *testing.T) {
assert.Equal(t, int64(2), l.NewestOffset())
assert.Equal(t, 2, len(l.Segments()))

err = l.TruncateTo(int64(1))
err = l.Truncate(int64(1))
assert.NoError(t, err)
assert.Equal(t, 1, len(l.Segments()))

41 changes: 34 additions & 7 deletions examples/sarama/main.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"time"

@@ -132,22 +133,48 @@ func setup() func() {
broker.Raft(raft),
)
if err != nil {
fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err)
fmt.Fprintf(os.Stderr, "failed opening raft store: %v\n", err)
os.Exit(1)
}
server := server.New(brokerAddr, store, httpAddr, logger)
if err := server.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
srv := server.New(brokerAddr, store, httpAddr, logger)
if err := srv.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "failed starting server: %v\n", err)
os.Exit(1)
}

if _, err := store.WaitForLeader(10 * time.Second); err != nil {
panic(err)
}

// creating/deleting topic directly since Sarama doesn't support it
if err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone {
panic(err)
addr, err := net.ResolveTCPAddr("tcp", brokerAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to resolve addr: %v\n", err)
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to connect to broker: %v\n", err)
os.Exit(1)
}

client := server.NewClient(conn)
resp, err := client.CreateTopic("cmd/createtopic", &protocol.CreateTopicRequest{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: 1,
ReplicaAssignment: nil,
Configs: nil,
})
if err != nil {
fmt.Fprintf(os.Stderr, "failed with request to broker: %v\n", err)
os.Exit(1)
}
for _, topicErrCode := range resp.TopicErrorCodes {
if topicErrCode.ErrorCode != protocol.ErrNone.Code() {
err := protocol.Errs[topicErrCode.ErrorCode]
fmt.Fprintf(os.Stderr, "error code: %v\n", err)
os.Exit(1)
}
}

return func() {
26 changes: 7 additions & 19 deletions jocko.go
Original file line number Diff line number Diff line change
@@ -13,9 +13,9 @@ import (
// CommitLog is the interface that wraps the commit log's methods and
// is used to manage a partition's data.
type CommitLog interface {
DeleteAll() error
Delete() error
NewReader(offset int64, maxBytes int32) (io.Reader, error)
TruncateTo(int64) error
Truncate(int64) error
NewestOffset() int64
OldestOffset() int64
Append([]byte) (int64, error)
@@ -53,7 +53,7 @@ func NewPartition(topic string, id int32) *Partition {

// Delete is used to delete the partition's data/commitlog.
func (p *Partition) Delete() error {
return p.CommitLog.DeleteAll()
return p.CommitLog.Delete()
}

// NewReader is used to create a reader at the given offset and will
@@ -100,9 +100,9 @@ func (p *Partition) LowWatermark() int64 {
return p.CommitLog.OldestOffset()
}

// TruncateTo is used to truncate the partition's logs before the given offset.
func (p *Partition) TruncateTo(offset int64) error {
return p.CommitLog.TruncateTo(offset)
// Truncate is used to truncate the partition's logs before the given offset.
func (p *Partition) Truncate(offset int64) error {
return p.CommitLog.Truncate(offset)
}

// Write is used to directly write the given bytes to the partition's leader.
@@ -182,20 +182,8 @@ type Response struct {
// Broker is the interface that wraps the Broker's methods.
type Broker interface {
Run(context.Context, <-chan Request, chan<- Response)
ID() int32
IsController() bool
CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error
StartReplica(*Partition) protocol.Error
DeleteTopic(topic string) protocol.Error
Partition(topic string, id int32) (*Partition, protocol.Error)
ClusterMember(brokerID int32) *ClusterMember
BecomeLeader(topic string, id int32, command *protocol.PartitionState) protocol.Error
BecomeFollower(topic string, id int32, command *protocol.PartitionState) protocol.Error
Join(addr ...string) protocol.Error
Cluster() []*ClusterMember
TopicPartitions(topic string) ([]*Partition, protocol.Error)
Topics() map[string][]*Partition
IsLeaderOfPartition(topic string, id int32, leaderID int32) bool
Shutdown() error
}

// ClusterMember is used as a wrapper around a broker's info and a
4 changes: 2 additions & 2 deletions testutil/mocks/commitlog.go
Original file line number Diff line number Diff line change
@@ -29,15 +29,15 @@ func (c *CommitLog) Append(b []byte) (int64, error) {
return 0, nil
}

func (c *CommitLog) DeleteAll() error {
func (c *CommitLog) Delete() error {
return nil
}

func (c *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
return nil, nil
}

func (c *CommitLog) TruncateTo(int64) error {
func (c *CommitLog) Truncate(int64) error {
return nil
}

0 comments on commit 36f488f

Please sign in to comment.