diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index f4ee5a40..5d43a45c 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -183,16 +183,6 @@ type cfg struct { commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) } -// cooperative is a helper that returns whether all group balancers in the -// config are cooperative. -func (cfg *cfg) cooperative() bool { - cooperative := true - for _, balancer := range cfg.balancers { - cooperative = cooperative && balancer.IsCooperative() - } - return cooperative -} - func (cfg *cfg) validate() error { if len(cfg.seedBrokers) == 0 { return errors.New("config erroneously has no seed brokers") diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 8c2cbcc6..1de0ce0b 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -24,7 +24,7 @@ type groupConsumer struct { cancel func() manageDone chan struct{} // closed once when the manage goroutine quits - cooperative bool // true if all config balancers are cooperative + cooperative atomicBool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. Metadata updates the // atomic.Value in each pointer atomically. If we are consuming via @@ -204,7 +204,6 @@ func (c *consumer) initGroup() { reSeen: make(map[string]bool), manageDone: make(chan struct{}), - cooperative: c.cl.cfg.cooperative(), tps: newTopicsPartitions(), rejoinCh: make(chan string, 1), heartbeatForceCh: make(chan func(error)), @@ -458,7 +457,7 @@ func (g *groupConsumer) leave() (wait func()) { // returns the difference of g.nowAssigned and g.lastAssigned. func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { nowAssigned := g.nowAssigned.clone() - if !g.cooperative { + if !g.cooperative.Load() { return nowAssigned, nil } @@ -534,7 +533,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi g.c.waitAndAddRebalance() defer g.c.unaddRebalance() - if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative + if !g.cooperative.Load() || leaving { // stage == revokeThisSession if not cooperative // If we are an eager consumer, we stop fetching all of our // current partitions as we will be revoking them. g.c.mu.Lock() @@ -545,7 +544,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi } g.c.mu.Unlock() - if !g.cooperative { + if !g.cooperative.Load() { g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read()) } else { g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read()) @@ -687,7 +686,7 @@ func newAssignRevokeSession() *assignRevokeSession { func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} { go func() { defer close(s.prerevokeDone) - if g.cooperative && len(lost) > 0 { + if g.cooperative.Load() && len(lost) > 0 { g.revoke(revokeLastSession, lost, false) } }() @@ -778,7 +777,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // If cooperative consuming, we may have to resume fetches. See the // comment on adjustCooperativeFetchOffsets. - if g.cooperative { + if g.cooperative.Load() { added = g.adjustCooperativeFetchOffsets(added, lost) } @@ -836,7 +835,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio // cooperative consumers rejoin the group immediately, and we want to // detect that in 500ms rather than 3s. var cooperativeFastCheck <-chan time.Time - if g.cooperative { + if g.cooperative.Load() { cooperativeFastCheck = time.After(500 * time.Millisecond) } @@ -1141,6 +1140,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo protocol = *resp.Protocol } + for _, balancer := range g.cfg.balancers { + if protocol == balancer.ProtocolName() { + cooperative := balancer.IsCooperative() + if !cooperative && g.cooperative.Load() { + g.cfg.logger.Log(LogLevelWarn, "downgrading from cooperative group to eager group, this is not supported per KIP-429!") + } + g.cooperative.Store(cooperative) + break + } + } + // KIP-345 has a fundamental limitation that KIP-814 also does not // solve. // @@ -2608,7 +2618,7 @@ func (g *groupConsumer) commitAcrossRebalance( // We retry four times, for five tries total: cooperative rebalancing // uses two back to back rebalances, and the commit could // pathologically end during both. - if g.cooperative && tries < 5 { + if g.cooperative.Load() && tries < 5 { origDone := onDone onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { retry := err == nil diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 2fefff7f..f786c4a8 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -33,8 +33,6 @@ const ( type GroupTransactSession struct { cl *Client - cooperative bool - failMu sync.Mutex revoked bool @@ -90,8 +88,6 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { return } - s.cooperative = cfg.cooperative() - userRevoked := cfg.onRevoked cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) { s.failMu.Lock() @@ -100,7 +96,7 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { return } - if s.cooperative && len(rev) == 0 && !s.revoked { + if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit") } else { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")