Skip to content

Commit

Permalink
source: avoid backoff / session reset when there is no consumed data
Browse files Browse the repository at this point in the history
We use fetch sessions by default. 2b4d7ea in 1.9.0 fixed a potential
spin-loop, but introduced a different minor bug: if broker returned
nothing to consume for our fetch session, the broker would return no
partitions and we would (a) backoff, (b) trigger a metadata update, and
(c) reset our fetch session.

The fix changed the source to back off if all partitions were stripped
due to retriable errors, but our check was against the number of
partitions in the response. We now check against the number of
partitions in the request (numOffsets).
  • Loading branch information
twmb committed Dec 1, 2022
1 parent 6ce8bdf commit 6c0abd1
Showing 1 changed file with 3 additions and 8 deletions.
11 changes: 3 additions & 8 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,12 +775,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
) {
f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))}
var (
updateWhy multiUpdateWhy

numParts int
updateWhy multiUpdateWhy
numErrsStripped int

kip320 = s.cl.supportsOffsetForLeaderEpoch()
kip320 = s.cl.supportsOffsetForLeaderEpoch()
)

for _, rt := range resp.Topics {
Expand Down Expand Up @@ -821,8 +818,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
continue
}

numParts++

// If we are fetching from the replica already, Kafka replies with a -1
// preferred read replica. If Kafka replies with a preferred replica,
// it sends no records.
Expand Down Expand Up @@ -953,7 +948,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
}
}

return f, reloadOffsets, preferreds, numParts == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
}

// processRespPartition processes all records in all potentially compressed
Expand Down

0 comments on commit 6c0abd1

Please sign in to comment.