Skip to content

Commit

Permalink
use protocol.error
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Sep 23, 2017
1 parent 1ad1648 commit 5b076d5
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 80 deletions.
24 changes: 12 additions & 12 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (b *Broker) IsController() bool {
}

// TopicPartitions is used to get the partitions for the given topic.
func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err *jocko.Error) {
func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err protocol.Error) {
b.mu.RLock()
defer b.mu.RUnlock()
if p, ok := b.topics[topic]; !ok {
return nil, &jocko.Error{ErrorCode: protocol.ErrUnknownTopicOrPartition}
return nil, protocol.ErrUnknownTopicOrPartition
} else {
return p, nil
return p, protocol.ErrNone
}
}

Expand All @@ -112,17 +112,17 @@ func (b *Broker) Topics() map[string][]*jocko.Partition {
return b.topics
}

func (b *Broker) Partition(topic string, partition int32) (*jocko.Partition, error) {
func (b *Broker) Partition(topic string, partition int32) (*jocko.Partition, protocol.Error) {
found, err := b.TopicPartitions(topic)
if err != nil {
if err != protocol.ErrNone {
return nil, err
}
for _, f := range found {
if f.ID == partition {
return f, nil
return f, protocol.ErrNone
}
}
return nil, errors.New("partition not found")
return nil, protocol.ErrUnknownTopicOrPartition
}

// AddPartition is used to add a partition across the cluster.
Expand Down Expand Up @@ -187,10 +187,10 @@ func (b *Broker) Join(addrs ...string) (int, error) {
}

// CreateTopic is used to create the topic across the cluster.
func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor int16) error {
func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error {
for t, _ := range b.Topics() {
if t == topic {
return ErrTopicExists
return protocol.ErrTopicAlreadyExists
}
}

Expand All @@ -217,10 +217,10 @@ func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor i
ISR: replicas,
}
if err := b.AddPartition(partition); err != nil {
return err
return protocol.ErrUnknown
}
}
return nil
return protocol.ErrNone
}

// DeleteTopic is used to delete the topic across the cluster.
Expand All @@ -231,7 +231,7 @@ func (b *Broker) DeleteTopic(topic string) error {
// deleteTopic is used to delete the topic from this broker.
func (b *Broker) deleteTopic(tp *jocko.Partition) error {
partitions, err := b.TopicPartitions(tp.Topic)
if err != nil {
if err != protocol.ErrNone {
return err
}
for _, p := range partitions {
Expand Down
2 changes: 1 addition & 1 deletion examples/sarama/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func setup() func() {
}

// creating/deleting topic directly since Sarama doesn't support it
ir err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone {
if err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone {
panic(err)
}

Expand Down
11 changes: 3 additions & 8 deletions jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (
"github.com/travisjeffery/jocko/protocol"
)

type Error struct {
error
ErrorCode int16
}

// CommitLog is the interface that wraps the commit log's methods and
// is used to manage a partition's data.
type CommitLog interface {
Expand Down Expand Up @@ -175,16 +170,16 @@ type Raft interface {
type Broker interface {
ID() int32
IsController() bool
CreateTopic(topic string, partitions int32, replicationFactor int16) error
CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error
StartReplica(*Partition) error
DeleteTopic(topic string) error
Partition(topic string, id int32) (*Partition, error)
Partition(topic string, id int32) (*Partition, protocol.Error)
ClusterMember(brokerID int32) *ClusterMember
BecomeLeader(topic string, id int32, command *protocol.PartitionState) error
BecomeFollower(topic string, id int32, command *protocol.PartitionState) error
Join(addr ...string) (int, error)
Cluster() []*ClusterMember
TopicPartitions(topic string) ([]*Partition, *Error)
TopicPartitions(topic string) ([]*Partition, protocol.Error)
Topics() map[string][]*Partition
IsLeaderOfPartition(topic string, id int32, leaderID int32) bool
}
Expand Down
117 changes: 58 additions & 59 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/travisjeffery/jocko"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/simplelog"
Expand Down Expand Up @@ -225,7 +224,7 @@ func (s *Server) handleAPIVersions(conn net.Conn, header *protocol.RequestHeader
return s.write(conn, header, r)
}

func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader, reqs *protocol.CreateTopicRequests) (err error) {
func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader, reqs *protocol.CreateTopicRequests) error {
resp := new(protocol.CreateTopicsResponse)
resp.TopicErrorCodes = make([]*protocol.TopicErrorCode, len(reqs.Requests))
isController := s.broker.IsController()
Expand All @@ -234,27 +233,25 @@ func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader
if req.ReplicationFactor > int16(len(s.broker.Cluster())) {
resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{
Topic: req.Topic,
ErrorCode: protocol.ErrInvalidReplicationFactor,
ErrorCode: protocol.ErrInvalidReplicationFactor.Code(),
}
continue
}
err = s.broker.CreateTopic(req.Topic, req.NumPartitions, req.ReplicationFactor)
if err != nil {
s.logger.Info("failed to create topic %s: %v", req.Topic, err)
return
}
err := s.broker.CreateTopic(req.Topic, req.NumPartitions, req.ReplicationFactor)
resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{
Topic: req.Topic,
ErrorCode: protocol.ErrNone,
ErrorCode: err.Code(),
}
}
} else {
// TODO: forward req to controller
s.logger.Info("failed to create topic(s): %v", errors.New("broker is not controller"))
s.logger.Info("failed to create topic(s): %s", protocol.ErrNotController)
// TODO: could have these topic error code structs have protocol.Error
// set as the field instead of the code directly
for i, req := range reqs.Requests {
resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{
Topic: req.Topic,
ErrorCode: protocol.ErrNotController,
ErrorCode: protocol.ErrNotController.Code(),
}
}
}
Expand All @@ -269,25 +266,23 @@ func (s *Server) handleDeleteTopics(conn net.Conn, header *protocol.RequestHeade
resp := new(protocol.DeleteTopicsResponse)
resp.TopicErrorCodes = make([]*protocol.TopicErrorCode, len(reqs.Topics))
isController := s.broker.IsController()
if err != nil {
return err
}
if isController {
for i, topic := range reqs.Topics {
err = s.broker.DeleteTopic(topic)
if err != nil {
s.logger.Info("failed to delete topic %s: %v", topic, err)
return
}
for i, topic := range reqs.Topics {
if !isController {
resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{
Topic: topic,
ErrorCode: protocol.ErrNone,
ErrorCode: protocol.ErrNotController.Code(),
}
continue
}
err = s.broker.DeleteTopic(topic)
if err != nil {
s.logger.Info("failed to delete topic %s: %v", topic, err)
return
}
resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{
Topic: topic,
ErrorCode: protocol.ErrNone.Code(),
}
} else {
// cID := s.broker.ControllerID()
// send the request to the controller
return
}
r := &protocol.Response{
CorrelationID: header.CorrelationID,
Expand All @@ -300,7 +295,7 @@ func (s *Server) handleLeaderAndISR(conn net.Conn, header *protocol.RequestHeade
body := &protocol.LeaderAndISRResponse{}
for _, p := range req.PartitionStates {
partition, err := s.broker.Partition(p.Topic, p.Partition)
if err != nil {
if err != protocol.ErrNone {
return err
}
if partition == nil {
Expand Down Expand Up @@ -376,15 +371,15 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r
})
}
var topicMetadata []*protocol.TopicMetadata
topicMetadataFn := func(topic string, partitions []*jocko.Partition, errCode int16) *protocol.TopicMetadata {
topicMetadataFn := func(topic string, partitions []*jocko.Partition, err protocol.Error) *protocol.TopicMetadata {
partitionMetadata := make([]*protocol.PartitionMetadata, len(partitions))
for i, p := range partitions {
partitionMetadata[i] = &protocol.PartitionMetadata{
ParititionID: p.ID,
}
}
return &protocol.TopicMetadata{
TopicErrorCode: errCode,
TopicErrorCode: err.Code(),
Topic: topic,
PartitionMetadata: partitionMetadata,
}
Expand All @@ -402,11 +397,7 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r
topicMetadata = make([]*protocol.TopicMetadata, len(req.Topics))
for i, topic := range req.Topics {
partitions, err := s.broker.TopicPartitions(topic)
errCode := protocol.ErrNone
if err != nil {
errCode = err.ErrorCode
}
topicMetadata[i] = topicMetadataFn(topic, partitions, errCode)
topicMetadata[i] = topicMetadataFn(topic, partitions, err)
}
}
resp := &protocol.MetadataResponse{
Expand Down Expand Up @@ -442,12 +433,12 @@ func (s *Server) handleOffsets(conn net.Conn, header *protocol.RequestHeader, re
pResp.Partition = p.Partition

partition, err := s.broker.Partition(t.Topic, p.Partition)

var offset int64
if err != nil {
pResp.ErrorCode = protocol.ErrUnknown
if err != protocol.ErrNone {
pResp.ErrorCode = err.Code()
continue
}

var offset int64
if p.Timestamp == -2 {
offset = partition.LowWatermark()
} else {
Expand All @@ -474,17 +465,17 @@ func (s *Server) handleProduce(conn net.Conn, header *protocol.RequestHeader, re
partition := jocko.NewPartition(td.Topic, p.Partition)
presp := &protocol.ProducePartitionResponse{}
partition, err := s.broker.Partition(td.Topic, p.Partition)
if err != nil {
presp.ErrorCode = protocol.ErrUnknownTopicOrPartition
if err != protocol.ErrNone {
presp.ErrorCode = err.Code()
}
if !s.broker.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) {
presp.ErrorCode = protocol.ErrNotLeaderForPartition
presp.ErrorCode = protocol.ErrNotLeaderForPartition.Code()
// break ?
}
offset, err := partition.Append(p.RecordSet)
if err != nil {
offset, appendErr := partition.Append(p.RecordSet)
if appendErr != nil {
s.logger.Info("commitlog/append failed: %s", err)
presp.ErrorCode = protocol.ErrUnknown
presp.ErrorCode = protocol.ErrUnknown.Code()
}
presp.Partition = p.Partition
presp.BaseOffset = offset
Expand Down Expand Up @@ -517,21 +508,27 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p

for j, p := range topic.Partitions {
partition, err := s.broker.Partition(topic.Topic, p.Partition)
if err != nil {
// TODO set err code
s.logger.Info("failed to find partition: %v (%s/%d)", err, topic.Topic, p.Partition)
break
if err != protocol.ErrNone {
fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{
Partition: p.Partition,
ErrorCode: err.Code(),
}
continue
}
if !s.broker.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) {
s.logger.Info("failed to produce: %v", errors.New("broker is not partition leader"))
// TODO set err code
break
fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{
Partition: p.Partition,
ErrorCode: protocol.ErrNotLeaderForPartition.Code(),
}
continue
}
rdr, err := partition.NewReader(p.FetchOffset, p.MaxBytes)
if err != nil {
s.logger.Info("failed to read partition: %v", err)
// TODO set err code
break
rdr, rdrErr := partition.NewReader(p.FetchOffset, p.MaxBytes)
if rdrErr != nil {
fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{
Partition: p.Partition,
ErrorCode: protocol.ErrUnknown.Code(),
}
continue
}
b := new(bytes.Buffer)
var n int32
Expand All @@ -542,8 +539,10 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p
// TODO: copy these bytes to outer bytes
nn, err := io.Copy(b, rdr)
if err != nil && err != io.EOF {
s.logger.Info("failed to fetch messages: %v", err)
// TODO seT error code
fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{
Partition: p.Partition,
ErrorCode: protocol.ErrUnknown.Code(),
}
break
}
n += int32(nn)
Expand All @@ -554,7 +553,7 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p

fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{
Partition: p.Partition,
ErrorCode: protocol.ErrNone,
ErrorCode: protocol.ErrNone.Code(),
HighWatermark: partition.HighWatermark(),
RecordSet: b.Bytes(),
}
Expand Down

0 comments on commit 5b076d5

Please sign in to comment.