Skip to content

Commit

Permalink
sync response: use protocol from JoinGroupResponse
Browse files Browse the repository at this point in the history
The sync group response's Protocol field is a new field, added for some
KIPy reason. Older brokers will not send this field; we need to always
use the protocl that was specified in JoinGroupResponse
  • Loading branch information
twmb committed May 18, 2021
1 parent d23fafa commit 522c9e2
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ start:
return err
}

if err = g.handleSyncResp(syncResp); err != nil {
if err = g.handleSyncResp(protocol, syncResp); err != nil {
if err == kerr.RebalanceInProgress {
g.cl.cfg.logger.Log(LogLevelInfo, "sync failed with RebalanceInProgress, rejoining")
goto start
Expand Down Expand Up @@ -1246,15 +1246,11 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
return
}

func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse) error {
func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResponse) error {
if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
return err
}

var protocol string
if resp.Protocol != nil {
protocol = *resp.Protocol
}
b, err := g.findBalancer("sync assignment", protocol)
if err != nil {
return err
Expand Down

0 comments on commit 522c9e2

Please sign in to comment.