diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 8c2ebc7d..00912f33 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -707,11 +707,11 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // advance past them). setOffsets = true - if resp.Version < 7 { + if resp.Version < 7 || resp.SessionID <= 0 { // If the version is less than 7, we cannot use fetch sessions, // so we kill them on the first response. s.session.kill() - } else if resp.SessionID > 0 { + } else { s.session.bumpEpoch(resp.SessionID) }