diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index c3cf6648..87b00773 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -775,12 +775,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe ) { f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))} var ( - updateWhy multiUpdateWhy - - numParts int + updateWhy multiUpdateWhy numErrsStripped int - - kip320 = s.cl.supportsOffsetForLeaderEpoch() + kip320 = s.cl.supportsOffsetForLeaderEpoch() ) for _, rt := range resp.Topics { @@ -821,8 +818,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - numParts++ - // If we are fetching from the replica already, Kafka replies with a -1 // preferred read replica. If Kafka replies with a preferred replica, // it sends no records. @@ -953,7 +948,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe } } - return f, reloadOffsets, preferreds, numParts == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors") + return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors") } // processRespPartition processes all records in all potentially compressed