From 85a680ea71e26d0f032fd296f685799da85b646e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 21 Mar 2022 22:03:10 -0600 Subject: [PATCH] consuming: do not continually try to create fetch sessions If we fail at creating a fetch session, we should not keep trying. --- pkg/kgo/source.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 8c2ebc7d..00912f33 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -707,11 +707,11 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // advance past them). setOffsets = true - if resp.Version < 7 { + if resp.Version < 7 || resp.SessionID <= 0 { // If the version is less than 7, we cannot use fetch sessions, // so we kill them on the first response. s.session.kill() - } else if resp.SessionID > 0 { + } else { s.session.bumpEpoch(resp.SessionID) }