diff --git a/broker/broker.go b/broker/broker.go index 124bb13a..b302dc67 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -96,13 +96,13 @@ func (b *Broker) IsController() bool { } // TopicPartitions is used to get the partitions for the given topic. -func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err *jocko.Error) { +func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err protocol.Error) { b.mu.RLock() defer b.mu.RUnlock() if p, ok := b.topics[topic]; !ok { - return nil, &jocko.Error{ErrorCode: protocol.ErrUnknownTopicOrPartition} + return nil, protocol.ErrUnknownTopicOrPartition } else { - return p, nil + return p, protocol.ErrNone } } @@ -112,17 +112,17 @@ func (b *Broker) Topics() map[string][]*jocko.Partition { return b.topics } -func (b *Broker) Partition(topic string, partition int32) (*jocko.Partition, error) { +func (b *Broker) Partition(topic string, partition int32) (*jocko.Partition, protocol.Error) { found, err := b.TopicPartitions(topic) - if err != nil { + if err != protocol.ErrNone { return nil, err } for _, f := range found { if f.ID == partition { - return f, nil + return f, protocol.ErrNone } } - return nil, errors.New("partition not found") + return nil, protocol.ErrUnknownTopicOrPartition } // AddPartition is used to add a partition across the cluster. @@ -187,10 +187,10 @@ func (b *Broker) Join(addrs ...string) (int, error) { } // CreateTopic is used to create the topic across the cluster. -func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor int16) error { +func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error { for t, _ := range b.Topics() { if t == topic { - return ErrTopicExists + return protocol.ErrTopicAlreadyExists } } @@ -217,10 +217,10 @@ func (b *Broker) CreateTopic(topic string, partitions int32, replicationFactor i ISR: replicas, } if err := b.AddPartition(partition); err != nil { - return err + return protocol.ErrUnknown } } - return nil + return protocol.ErrNone } // DeleteTopic is used to delete the topic across the cluster. @@ -231,7 +231,7 @@ func (b *Broker) DeleteTopic(topic string) error { // deleteTopic is used to delete the topic from this broker. func (b *Broker) deleteTopic(tp *jocko.Partition) error { partitions, err := b.TopicPartitions(tp.Topic) - if err != nil { + if err != protocol.ErrNone { return err } for _, p := range partitions { diff --git a/examples/sarama/main.go b/examples/sarama/main.go index 9ad3d774..f70a78b3 100644 --- a/examples/sarama/main.go +++ b/examples/sarama/main.go @@ -144,7 +144,7 @@ func setup() func() { } // creating/deleting topic directly since Sarama doesn't support it - ir err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone { + if err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone { panic(err) } diff --git a/jocko.go b/jocko.go index 7c1e590c..ff1f77b0 100644 --- a/jocko.go +++ b/jocko.go @@ -9,11 +9,6 @@ import ( "github.com/travisjeffery/jocko/protocol" ) -type Error struct { - error - ErrorCode int16 -} - // CommitLog is the interface that wraps the commit log's methods and // is used to manage a partition's data. type CommitLog interface { @@ -175,16 +170,16 @@ type Raft interface { type Broker interface { ID() int32 IsController() bool - CreateTopic(topic string, partitions int32, replicationFactor int16) error + CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error StartReplica(*Partition) error DeleteTopic(topic string) error - Partition(topic string, id int32) (*Partition, error) + Partition(topic string, id int32) (*Partition, protocol.Error) ClusterMember(brokerID int32) *ClusterMember BecomeLeader(topic string, id int32, command *protocol.PartitionState) error BecomeFollower(topic string, id int32, command *protocol.PartitionState) error Join(addr ...string) (int, error) Cluster() []*ClusterMember - TopicPartitions(topic string) ([]*Partition, *Error) + TopicPartitions(topic string) ([]*Partition, protocol.Error) Topics() map[string][]*Partition IsLeaderOfPartition(topic string, id int32, leaderID int32) bool } diff --git a/server/server.go b/server/server.go index 5085375d..a72c21b1 100644 --- a/server/server.go +++ b/server/server.go @@ -12,7 +12,6 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" - "github.com/pkg/errors" "github.com/travisjeffery/jocko" "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/simplelog" @@ -225,7 +224,7 @@ func (s *Server) handleAPIVersions(conn net.Conn, header *protocol.RequestHeader return s.write(conn, header, r) } -func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader, reqs *protocol.CreateTopicRequests) (err error) { +func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader, reqs *protocol.CreateTopicRequests) error { resp := new(protocol.CreateTopicsResponse) resp.TopicErrorCodes = make([]*protocol.TopicErrorCode, len(reqs.Requests)) isController := s.broker.IsController() @@ -234,27 +233,25 @@ func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader if req.ReplicationFactor > int16(len(s.broker.Cluster())) { resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ Topic: req.Topic, - ErrorCode: protocol.ErrInvalidReplicationFactor, + ErrorCode: protocol.ErrInvalidReplicationFactor.Code(), } continue } - err = s.broker.CreateTopic(req.Topic, req.NumPartitions, req.ReplicationFactor) - if err != nil { - s.logger.Info("failed to create topic %s: %v", req.Topic, err) - return - } + err := s.broker.CreateTopic(req.Topic, req.NumPartitions, req.ReplicationFactor) resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ Topic: req.Topic, - ErrorCode: protocol.ErrNone, + ErrorCode: err.Code(), } } } else { // TODO: forward req to controller - s.logger.Info("failed to create topic(s): %v", errors.New("broker is not controller")) + s.logger.Info("failed to create topic(s): %s", protocol.ErrNotController) + // TODO: could have these topic error code structs have protocol.Error + // set as the field instead of the code directly for i, req := range reqs.Requests { resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ Topic: req.Topic, - ErrorCode: protocol.ErrNotController, + ErrorCode: protocol.ErrNotController.Code(), } } } @@ -269,25 +266,23 @@ func (s *Server) handleDeleteTopics(conn net.Conn, header *protocol.RequestHeade resp := new(protocol.DeleteTopicsResponse) resp.TopicErrorCodes = make([]*protocol.TopicErrorCode, len(reqs.Topics)) isController := s.broker.IsController() - if err != nil { - return err - } - if isController { - for i, topic := range reqs.Topics { - err = s.broker.DeleteTopic(topic) - if err != nil { - s.logger.Info("failed to delete topic %s: %v", topic, err) - return - } + for i, topic := range reqs.Topics { + if !isController { resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ Topic: topic, - ErrorCode: protocol.ErrNone, + ErrorCode: protocol.ErrNotController.Code(), } + continue + } + err = s.broker.DeleteTopic(topic) + if err != nil { + s.logger.Info("failed to delete topic %s: %v", topic, err) + return + } + resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ + Topic: topic, + ErrorCode: protocol.ErrNone.Code(), } - } else { - // cID := s.broker.ControllerID() - // send the request to the controller - return } r := &protocol.Response{ CorrelationID: header.CorrelationID, @@ -300,7 +295,7 @@ func (s *Server) handleLeaderAndISR(conn net.Conn, header *protocol.RequestHeade body := &protocol.LeaderAndISRResponse{} for _, p := range req.PartitionStates { partition, err := s.broker.Partition(p.Topic, p.Partition) - if err != nil { + if err != protocol.ErrNone { return err } if partition == nil { @@ -376,7 +371,7 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r }) } var topicMetadata []*protocol.TopicMetadata - topicMetadataFn := func(topic string, partitions []*jocko.Partition, errCode int16) *protocol.TopicMetadata { + topicMetadataFn := func(topic string, partitions []*jocko.Partition, err protocol.Error) *protocol.TopicMetadata { partitionMetadata := make([]*protocol.PartitionMetadata, len(partitions)) for i, p := range partitions { partitionMetadata[i] = &protocol.PartitionMetadata{ @@ -384,7 +379,7 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r } } return &protocol.TopicMetadata{ - TopicErrorCode: errCode, + TopicErrorCode: err.Code(), Topic: topic, PartitionMetadata: partitionMetadata, } @@ -402,11 +397,7 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r topicMetadata = make([]*protocol.TopicMetadata, len(req.Topics)) for i, topic := range req.Topics { partitions, err := s.broker.TopicPartitions(topic) - errCode := protocol.ErrNone - if err != nil { - errCode = err.ErrorCode - } - topicMetadata[i] = topicMetadataFn(topic, partitions, errCode) + topicMetadata[i] = topicMetadataFn(topic, partitions, err) } } resp := &protocol.MetadataResponse{ @@ -442,12 +433,12 @@ func (s *Server) handleOffsets(conn net.Conn, header *protocol.RequestHeader, re pResp.Partition = p.Partition partition, err := s.broker.Partition(t.Topic, p.Partition) - - var offset int64 - if err != nil { - pResp.ErrorCode = protocol.ErrUnknown + if err != protocol.ErrNone { + pResp.ErrorCode = err.Code() continue } + + var offset int64 if p.Timestamp == -2 { offset = partition.LowWatermark() } else { @@ -474,17 +465,17 @@ func (s *Server) handleProduce(conn net.Conn, header *protocol.RequestHeader, re partition := jocko.NewPartition(td.Topic, p.Partition) presp := &protocol.ProducePartitionResponse{} partition, err := s.broker.Partition(td.Topic, p.Partition) - if err != nil { - presp.ErrorCode = protocol.ErrUnknownTopicOrPartition + if err != protocol.ErrNone { + presp.ErrorCode = err.Code() } if !s.broker.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) { - presp.ErrorCode = protocol.ErrNotLeaderForPartition + presp.ErrorCode = protocol.ErrNotLeaderForPartition.Code() // break ? } - offset, err := partition.Append(p.RecordSet) - if err != nil { + offset, appendErr := partition.Append(p.RecordSet) + if appendErr != nil { s.logger.Info("commitlog/append failed: %s", err) - presp.ErrorCode = protocol.ErrUnknown + presp.ErrorCode = protocol.ErrUnknown.Code() } presp.Partition = p.Partition presp.BaseOffset = offset @@ -517,21 +508,27 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p for j, p := range topic.Partitions { partition, err := s.broker.Partition(topic.Topic, p.Partition) - if err != nil { - // TODO set err code - s.logger.Info("failed to find partition: %v (%s/%d)", err, topic.Topic, p.Partition) - break + if err != protocol.ErrNone { + fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{ + Partition: p.Partition, + ErrorCode: err.Code(), + } + continue } if !s.broker.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) { - s.logger.Info("failed to produce: %v", errors.New("broker is not partition leader")) - // TODO set err code - break + fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{ + Partition: p.Partition, + ErrorCode: protocol.ErrNotLeaderForPartition.Code(), + } + continue } - rdr, err := partition.NewReader(p.FetchOffset, p.MaxBytes) - if err != nil { - s.logger.Info("failed to read partition: %v", err) - // TODO set err code - break + rdr, rdrErr := partition.NewReader(p.FetchOffset, p.MaxBytes) + if rdrErr != nil { + fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{ + Partition: p.Partition, + ErrorCode: protocol.ErrUnknown.Code(), + } + continue } b := new(bytes.Buffer) var n int32 @@ -542,8 +539,10 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p // TODO: copy these bytes to outer bytes nn, err := io.Copy(b, rdr) if err != nil && err != io.EOF { - s.logger.Info("failed to fetch messages: %v", err) - // TODO seT error code + fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{ + Partition: p.Partition, + ErrorCode: protocol.ErrUnknown.Code(), + } break } n += int32(nn) @@ -554,7 +553,7 @@ func (s *Server) handleFetch(conn net.Conn, header *protocol.RequestHeader, r *p fr.PartitionResponses[j] = &protocol.FetchPartitionResponse{ Partition: p.Partition, - ErrorCode: protocol.ErrNone, + ErrorCode: protocol.ErrNone.Code(), HighWatermark: partition.HighWatermark(), RecordSet: b.Bytes(), }