From 6045c3a65100edfa2694275d970f5e0e3f5514fd Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sat, 28 Oct 2017 16:03:24 -0500 Subject: [PATCH] broker: test metadata --- broker/broker.go | 12 ++++++++- broker/broker_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/broker/broker.go b/broker/broker.go index 3f47458b..4924d129 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -367,10 +367,20 @@ func (b *Broker) handleMetadata(header *protocol.RequestHeader, req *protocol.Me } var topicMetadata []*protocol.TopicMetadata topicMetadataFn := func(topic string, partitions []*jocko.Partition, err protocol.Error) *protocol.TopicMetadata { + if err != protocol.ErrNone { + return &protocol.TopicMetadata{ + TopicErrorCode: err.Code(), + Topic: topic, + } + } partitionMetadata := make([]*protocol.PartitionMetadata, len(partitions)) for i, p := range partitions { partitionMetadata[i] = &protocol.PartitionMetadata{ - ParititionID: p.ID, + ParititionID: p.ID, + PartitionErrorCode: protocol.ErrNone.Code(), + Leader: p.Leader, + Replicas: p.Replicas, + ISR: p.ISR, } } return &protocol.TopicMetadata{ diff --git a/broker/broker_test.go b/broker/broker_test.go index 4ea57993..b9270903 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -301,6 +301,64 @@ func TestBroker_Run(t *testing.T) { }, }, }, + { + name: "metadata", + fields: newFields(), + args: args{ + requestCh: make(chan jocko.Request, 2), + responseCh: make(chan jocko.Response, 2), + requests: []jocko.Request{ + { + Header: &protocol.RequestHeader{CorrelationID: 1}, + Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + Topic: "the-topic", + NumPartitions: 1, + ReplicationFactor: 1, + }}}, + }, + { + Header: &protocol.RequestHeader{CorrelationID: 2}, + Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ + Topic: "the-topic", + Data: []*protocol.Data{{ + RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}, + }, + { + Header: &protocol.RequestHeader{CorrelationID: 3}, + Request: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}}, + }, + }, + responses: []jocko.Response{ + { + Header: &protocol.RequestHeader{CorrelationID: 1}, + Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, + }}, + }, + { + Header: &protocol.RequestHeader{CorrelationID: 2}, + Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ + Responses: []*protocol.ProduceResponse{ + { + Topic: "the-topic", + PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}}, + }, + }, + }}, + }, + { + Header: &protocol.RequestHeader{CorrelationID: 3}, + Response: &protocol.Response{CorrelationID: 3, Body: &protocol.MetadataResponse{ + Brokers: []*protocol.Broker{{NodeID: 1, Host: "localhost", Port: 9092}}, + TopicMetadata: []*protocol.TopicMetadata{ + {Topic: "the-topic", TopicErrorCode: protocol.ErrNone.Code(), PartitionMetadata: []*protocol.PartitionMetadata{{PartitionErrorCode: protocol.ErrNone.Code(), ParititionID: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}}}, + {Topic: "unknown-topic", TopicErrorCode: protocol.ErrUnknownTopicOrPartition.Code()}, + }, + }}, + }, + }, + }, + }, { name: "produce topic/partition doesn't exist error", fields: newFields(),