diff --git a/conn.go b/conn.go index 1e79fab52..a2a7499ce 100644 --- a/conn.go +++ b/conn.go @@ -994,14 +994,6 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err } } - makeBrokers := func(ids ...int32) []Broker { - b := make([]Broker, len(ids)) - for i, id := range ids { - b[i] = brokers[id] - } - return b - } - for _, t := range res.Topics { if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) { // We only report errors if they happened for the topic of @@ -1013,8 +1005,8 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err partitions = append(partitions, Partition{ Topic: t.TopicName, Leader: brokers[p.Leader], - Replicas: makeBrokers(p.Replicas...), - Isr: makeBrokers(p.Isr...), + Replicas: makeBrokers(brokers, p.Replicas...), + Isr: makeBrokers(brokers, p.Isr...), ID: int(p.PartitionID), }) } @@ -1025,6 +1017,16 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err return } +func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker { + b := make([]Broker, 0, len(ids)) + for _, id := range ids { + if br, ok := brokers[id]; ok { + b = append(b, br) + } + } + return b +} + // Write writes a message to the kafka broker that this connection was // established to. The method returns the number of bytes written, or an error // if something went wrong. diff --git a/conn_test.go b/conn_test.go index d85eb9414..f3921818a 100644 --- a/conn_test.go +++ b/conn_test.go @@ -1322,3 +1322,37 @@ func TestEmptyToNullableLeavesStringsIntact(t *testing.T) { t.Error("Non empty string is not equal to the original string") } } + +func TestMakeBrokersAllPresent(t *testing.T) { + brokers := make(map[int32]Broker) + brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092} + brokers[2] = Broker{ID: 1, Host: "203.0.113.102", Port: 9092} + brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092} + + b := makeBrokers(brokers, 1, 2, 3) + if len(b) != 3 { + t.Errorf("Expected 3 brokers, got %d", len(b)) + } + for _, i := range []int32{1, 2, 3} { + if b[i-1] != brokers[i] { + t.Errorf("Expected broker %d at index %d, got %d", i, i-1, b[i].ID) + } + } +} + +func TestMakeBrokersOneMissing(t *testing.T) { + brokers := make(map[int32]Broker) + brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092} + brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092} + + b := makeBrokers(brokers, 1, 2, 3) + if len(b) != 2 { + t.Errorf("Expected 2 brokers, got %d", len(b)) + } + if b[0] != brokers[1] { + t.Errorf("Expected broker 1 at index 0, got %d", b[0].ID) + } + if b[1] != brokers[3] { + t.Errorf("Expected broker 3 at index 1, got %d", b[1].ID) + } +}