Skip to content

Commit

Permalink
client: avoid sharding to partitions with no leader
Browse files Browse the repository at this point in the history
ListOffsets, DeleteRecords, and OffsetForLeaderEpoch all need to go to
partition leaders. If a metadata response indicates -1 for the leader,
we should fail the partition, rather than attempting to issue to an
unknown broker.
  • Loading branch information
twmb committed Oct 6, 2021
1 parent 3baa522 commit 126778a
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,13 @@ func missingOrCodeP(t string, p int32, exists bool, code int16) error {
return kerr.ErrorForCode(code)
}

func noLeader(t string, p int32, l int32) error {
if l < 0 {
return fmt.Errorf("topic %s partition %d has no leader according to the metadata lookup", t, p, l)
}
return nil
}

// This is a helper for the sharded requests below; if mapping metadata fails
// to load topics or partitions, we group the failures by error.
//
Expand Down Expand Up @@ -1760,6 +1767,10 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]i
unknowns.err(err, t, partition)
continue
}
if err := noLeader(t, p.Partition, p.Leader); err != nil {
unknowns.err(err, t, partition)
continue
}

brokerReq := brokerReqs[p.Leader]
if brokerReq == nil {
Expand Down Expand Up @@ -2178,6 +2189,10 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([
unknowns.err(err, t, partition)
continue
}
if err := noLeader(t, p.Partition, p.Leader); err != nil {
unknowns.err(err, t, partition)
continue
}

brokerReq := brokerReqs[p.Leader]
if brokerReq == nil {
Expand Down Expand Up @@ -2272,6 +2287,10 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ
unknowns.err(err, t, partition)
continue
}
if err := noLeader(t, p.Partition, p.Leader); err != nil {
unknowns.err(err, t, partition)
continue
}

brokerReq := brokerReqs[p.Leader]
if brokerReq == nil {
Expand Down

0 comments on commit 126778a

Please sign in to comment.