Skip to content

Commit

Permalink
Check for broker existence when forming partition replica and ISR lis…
Browse files Browse the repository at this point in the history
…ts (#776)

Data for each partition returned by Conn.ReadPartitions includes the
list of replicas and the ISR for the partition. The broker data is
copied from the list of currently available brokers, retrieved from
Kafka metadata. It can happen that a broker is not present in metadata
(due to being down, for example), but still listed as a replica for a
partition. (For example, broker 2 may be down but the ID 2 can still be
listed as a replica for a partition.)

The logic that copies broker data from the list of available brokers
into partition data now omits any that are not present in the metadata
list. Without this change, partition data receives the copy of a nil
object in its replica or ISR list (ID 0, host nil, default port 9092),
which is useless.
  • Loading branch information
bhavanki authored Nov 2, 2021
1 parent df0521c commit 40933d2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 10 deletions.
22 changes: 12 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,14 +994,6 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
}
}

makeBrokers := func(ids ...int32) []Broker {
b := make([]Broker, len(ids))
for i, id := range ids {
b[i] = brokers[id]
}
return b
}

for _, t := range res.Topics {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
Expand All @@ -1013,8 +1005,8 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(p.Replicas...),
Isr: makeBrokers(p.Isr...),
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
})
}
Expand All @@ -1025,6 +1017,16 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
return
}

func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker {
b := make([]Broker, 0, len(ids))
for _, id := range ids {
if br, ok := brokers[id]; ok {
b = append(b, br)
}
}
return b
}

// Write writes a message to the kafka broker that this connection was
// established to. The method returns the number of bytes written, or an error
// if something went wrong.
Expand Down
34 changes: 34 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,3 +1322,37 @@ func TestEmptyToNullableLeavesStringsIntact(t *testing.T) {
t.Error("Non empty string is not equal to the original string")
}
}

func TestMakeBrokersAllPresent(t *testing.T) {
brokers := make(map[int32]Broker)
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
brokers[2] = Broker{ID: 1, Host: "203.0.113.102", Port: 9092}
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}

b := makeBrokers(brokers, 1, 2, 3)
if len(b) != 3 {
t.Errorf("Expected 3 brokers, got %d", len(b))
}
for _, i := range []int32{1, 2, 3} {
if b[i-1] != brokers[i] {
t.Errorf("Expected broker %d at index %d, got %d", i, i-1, b[i].ID)
}
}
}

func TestMakeBrokersOneMissing(t *testing.T) {
brokers := make(map[int32]Broker)
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}

b := makeBrokers(brokers, 1, 2, 3)
if len(b) != 2 {
t.Errorf("Expected 2 brokers, got %d", len(b))
}
if b[0] != brokers[1] {
t.Errorf("Expected broker 1 at index 0, got %d", b[0].ID)
}
if b[1] != brokers[3] {
t.Errorf("Expected broker 3 at index 1, got %d", b[1].ID)
}
}

0 comments on commit 40933d2

Please sign in to comment.