Skip to content

Commit

Permalink
kgo source: kill the session if FetchResponse is version < 7
Browse files Browse the repository at this point in the history
If we did not kill the session, we would still use sessions even though
kafka does not support it. If kafka replied with a partition error, we
would stop requesting the partition and be unable to make progress.
  • Loading branch information
twmb committed Feb 12, 2021
1 parent 27fb37d commit c7dbafb
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,11 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
return
}

if resp.SessionID > 0 {
if resp.Version < 7 {
// If the version is less than 7, we cannot use fetch sessions,
// so we kill them on the first response.
s.session.kill()
} else if resp.SessionID > 0 {
s.session.bumpEpoch(resp.SessionID)
}

Expand Down

0 comments on commit c7dbafb

Please sign in to comment.