Skip to content

Commit

Permalink
broker: test metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 28, 2017
1 parent f929635 commit 6045c3a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
12 changes: 11 additions & 1 deletion broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,20 @@ func (b *Broker) handleMetadata(header *protocol.RequestHeader, req *protocol.Me
}
var topicMetadata []*protocol.TopicMetadata
topicMetadataFn := func(topic string, partitions []*jocko.Partition, err protocol.Error) *protocol.TopicMetadata {
if err != protocol.ErrNone {
return &protocol.TopicMetadata{
TopicErrorCode: err.Code(),
Topic: topic,
}
}
partitionMetadata := make([]*protocol.PartitionMetadata, len(partitions))
for i, p := range partitions {
partitionMetadata[i] = &protocol.PartitionMetadata{
ParititionID: p.ID,
ParititionID: p.ID,
PartitionErrorCode: protocol.ErrNone.Code(),
Leader: p.Leader,
Replicas: p.Replicas,
ISR: p.ISR,
}
}
return &protocol.TopicMetadata{
Expand Down
58 changes: 58 additions & 0 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,64 @@ func TestBroker_Run(t *testing.T) {
},
},
},
{
name: "metadata",
fields: newFields(),
args: args{
requestCh: make(chan jocko.Request, 2),
responseCh: make(chan jocko.Response, 2),
requests: []jocko.Request{
{
Header: &protocol.RequestHeader{CorrelationID: 1},
Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{
Topic: "the-topic",
NumPartitions: 1,
ReplicationFactor: 1,
}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 2},
Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{
Topic: "the-topic",
Data: []*protocol.Data{{
RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 3},
Request: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}},
},
},
responses: []jocko.Response{
{
Header: &protocol.RequestHeader{CorrelationID: 1},
Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{
TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}},
}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 2},
Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{
{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}},
},
},
}},
},
{
Header: &protocol.RequestHeader{CorrelationID: 3},
Response: &protocol.Response{CorrelationID: 3, Body: &protocol.MetadataResponse{
Brokers: []*protocol.Broker{{NodeID: 1, Host: "localhost", Port: 9092}},
TopicMetadata: []*protocol.TopicMetadata{
{Topic: "the-topic", TopicErrorCode: protocol.ErrNone.Code(), PartitionMetadata: []*protocol.PartitionMetadata{{PartitionErrorCode: protocol.ErrNone.Code(), ParititionID: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}}},
{Topic: "unknown-topic", TopicErrorCode: protocol.ErrUnknownTopicOrPartition.Code()},
},
}},
},
},
},
},
{
name: "produce topic/partition doesn't exist error",
fields: newFields(),
Expand Down

0 comments on commit 6045c3a

Please sign in to comment.