From 522c9e277eb67172c1cda7388f86734a0ceda827 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 18 May 2021 12:43:05 -0600 Subject: [PATCH] sync response: use protocol from JoinGroupResponse The sync group response's Protocol field is a new field, added for some KIPy reason. Older brokers will not send this field; we need to always use the protocl that was specified in JoinGroupResponse --- pkg/kgo/consumer_group.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index d8543750..e3d72b35 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1177,7 +1177,7 @@ start: return err } - if err = g.handleSyncResp(syncResp); err != nil { + if err = g.handleSyncResp(protocol, syncResp); err != nil { if err == kerr.RebalanceInProgress { g.cl.cfg.logger.Log(LogLevelInfo, "sync failed with RebalanceInProgress, rejoining") goto start @@ -1246,15 +1246,11 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo return } -func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse) error { +func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResponse) error { if err := kerr.ErrorForCode(resp.ErrorCode); err != nil { return err } - var protocol string - if resp.Protocol != nil { - protocol = *resp.Protocol - } b, err := g.findBalancer("sync assignment", protocol) if err != nil { return err