From 4d12fcdb5fdee495d067b7f3798ab3f372496435 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sat, 9 Sep 2017 04:00:42 -0500 Subject: [PATCH] Fix metadata response (fix #47) - Respond with all topics when no topic specified in request - Respond with specific topics given in request - Respond with unknown topic for unknown topics given in request --- broker/broker.go | 25 +++++++++++++------------ jocko.go | 8 +++++++- server/server.go | 48 ++++++++++++++++++++++++++++++++++-------------- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index e126ed64..bb00fbae 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/travisjeffery/jocko" "github.com/travisjeffery/jocko/commitlog" + "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/simplelog" ) @@ -94,10 +95,19 @@ func (b *Broker) IsController() bool { } // TopicPartitions is used to get the partitions for the given topic. -func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err error) { +func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err *jocko.Error) { + b.mu.RLock() + if p, ok := b.topics[topic]; !ok { + return nil, &jocko.Error{ErrorCode: protocol.ErrUnknownTopicOrPartition} + } else { + return p, nil + } +} + +func (b *Broker) Topics() map[string][]*jocko.Partition { b.mu.RLock() defer b.mu.RUnlock() - return b.topics[topic], nil + return b.topics } func (b *Broker) Partition(topic string, partition int32) (*jocko.Partition, error) { @@ -168,15 +178,6 @@ func (b *Broker) IsLeaderOfPartition(topic string, pid int32, lid int32) bool { return result } -// Topics returns the list of known topic names. -func (b *Broker) Topics() []string { - topics := []string{} - for k := range b.topics { - topics = append(topics, k) - } - return topics -} - // Join is used to have the broker join the gossip ring. // The given address should be another broker listening on the Serf address. func (b *Broker) Join(addrs ...string) (int, error) { @@ -185,7 +186,7 @@ func (b *Broker) Join(addrs ...string) (int, error) { // CreateTopic is used to create the topic across the cluster. func (b *Broker) CreateTopic(topic string, partitions int32) error { - for _, t := range b.Topics() { + for t, _ := range b.Topics() { if t == topic { return ErrTopicExists } diff --git a/jocko.go b/jocko.go index 2dc5b497..7577c76b 100644 --- a/jocko.go +++ b/jocko.go @@ -9,6 +9,11 @@ import ( "github.com/travisjeffery/jocko/protocol" ) +type Error struct { + error + ErrorCode int16 +} + // CommitLog is the interface that wraps the commit log's methods and // is used to manage a partition's data. type CommitLog interface { @@ -179,7 +184,8 @@ type Broker interface { BecomeFollower(topic string, id int32, command *protocol.PartitionState) error Join(addr ...string) (int, error) Cluster() []*ClusterMember - TopicPartitions(topic string) ([]*Partition, error) + TopicPartitions(topic string) ([]*Partition, *Error) + Topics() map[string][]*Partition IsLeaderOfPartition(topic string, id int32, leaderID int32) bool } diff --git a/server/server.go b/server/server.go index bf67cd81..07c2cca8 100644 --- a/server/server.go +++ b/server/server.go @@ -239,10 +239,14 @@ func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader } } } else { - s.logger.Info("failed to create topic %s: %v", errors.New("broker is not controller")) - // cID := s.broker.ControllerID() - // send the request to the controller - return + // TODO: forward req to controller + s.logger.Info("failed to create topic(s): %v", errors.New("broker is not controller")) + for i, req := range reqs.Requests { + resp.TopicErrorCodes[i] = &protocol.TopicErrorCode{ + Topic: req.Topic, + ErrorCode: protocol.ErrNotController, + } + } } r := &protocol.Response{ CorrelationID: header.CorrelationID, @@ -360,7 +364,6 @@ func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) { func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, req *protocol.MetadataRequest) error { brokers := make([]*protocol.Broker, 0, len(s.broker.Cluster())) - topics := make([]*protocol.TopicMetadata, len(req.Topics)) for _, b := range s.broker.Cluster() { brokers = append(brokers, &protocol.Broker{ NodeID: b.ID, @@ -368,26 +371,43 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r Port: int32(b.Port), }) } - for i, t := range req.Topics { - partitions, err := s.broker.TopicPartitions(t) - if err != nil { - return err - } + var topicMetadata []*protocol.TopicMetadata + topicMetadataFn := func(topic string, partitions []*jocko.Partition, errCode int16) *protocol.TopicMetadata { partitionMetadata := make([]*protocol.PartitionMetadata, len(partitions)) for i, p := range partitions { partitionMetadata[i] = &protocol.PartitionMetadata{ ParititionID: p.ID, } } - topics[i] = &protocol.TopicMetadata{ - TopicErrorCode: protocol.ErrNone, - Topic: t, + return &protocol.TopicMetadata{ + TopicErrorCode: errCode, + Topic: topic, PartitionMetadata: partitionMetadata, } } + if len(req.Topics) == 0 { + // Respond with metadata for all topics + topics := s.broker.Topics() + topicMetadata = make([]*protocol.TopicMetadata, len(topics)) + idx := 0 + for topic, partitions := range topics { + topicMetadata[idx] = topicMetadataFn(topic, partitions, protocol.ErrNone) + idx++ + } + } else { + topicMetadata = make([]*protocol.TopicMetadata, len(req.Topics)) + for i, topic := range req.Topics { + partitions, err := s.broker.TopicPartitions(topic) + errCode := protocol.ErrNone + if err != nil { + errCode = err.ErrorCode + } + topicMetadata[i] = topicMetadataFn(topic, partitions, errCode) + } + } resp := &protocol.MetadataResponse{ Brokers: brokers, - TopicMetadata: topics, + TopicMetadata: topicMetadata, } r := &protocol.Response{ CorrelationID: header.CorrelationID,