From a568b216c69cb585882085852983c7f50359894e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 10 Mar 2023 20:32:25 -0700 Subject: [PATCH] bugfix kgo: do not default to eager if there is any eager balancer The previous cooperative check was wrong: if you used multiple group balancers and any were eager, the client would think you were an eager consumer. However this bug did not affect all areas of group balancing, which actually made the bug worse. If everything in the code went eager, then the bug would just be "you saw more stop the world than was intended". Instead, if the chosen balancer was cooperative, then cooperative balancing would occur for eager consumers. On rebalance, partitions would be lost, and then a cooperative rebalance would not occur, meaning the partitions would be stuck until another rebalance. We fix this by saving whether we are cooperative based on the balancer that is actually chosen. This is an upgrade path that should happen once -- once to cooperative -- but we can downgrade if the user adds a random eager balancer. This is not supported per KIP-429 so we just log a warning and continue as best we can (likely with duplicate consumption). Closes #366. --- pkg/kgo/config.go | 10 ---------- pkg/kgo/consumer_group.go | 28 +++++++++++++++++++--------- pkg/kgo/txn.go | 6 +----- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 3dca610b..3ccf2c4e 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -182,16 +182,6 @@ type cfg struct { commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) } -// cooperative is a helper that returns whether all group balancers in the -// config are cooperative. 
-func (cfg *cfg) cooperative() bool { - cooperative := true - for _, balancer := range cfg.balancers { - cooperative = cooperative && balancer.IsCooperative() - } - return cooperative -} - func (cfg *cfg) validate() error { if len(cfg.seedBrokers) == 0 { return errors.New("config erroneously has no seed brokers") diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 8c2cbcc6..1de0ce0b 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -24,7 +24,7 @@ type groupConsumer struct { cancel func() manageDone chan struct{} // closed once when the manage goroutine quits - cooperative bool // true if all config balancers are cooperative + cooperative atomicBool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. Metadata updates the // atomic.Value in each pointer atomically. If we are consuming via @@ -204,7 +204,6 @@ func (c *consumer) initGroup() { reSeen: make(map[string]bool), manageDone: make(chan struct{}), - cooperative: c.cl.cfg.cooperative(), tps: newTopicsPartitions(), rejoinCh: make(chan string, 1), heartbeatForceCh: make(chan func(error)), @@ -458,7 +457,7 @@ func (g *groupConsumer) leave() (wait func()) { // returns the difference of g.nowAssigned and g.lastAssigned. func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { nowAssigned := g.nowAssigned.clone() - if !g.cooperative { + if !g.cooperative.Load() { return nowAssigned, nil } @@ -534,7 +533,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi g.c.waitAndAddRebalance() defer g.c.unaddRebalance() - if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative + if !g.cooperative.Load() || leaving { // stage == revokeThisSession if not cooperative // If we are an eager consumer, we stop fetching all of our // current partitions as we will be revoking them. 
g.c.mu.Lock() @@ -545,7 +544,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi } g.c.mu.Unlock() - if !g.cooperative { + if !g.cooperative.Load() { g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read()) } else { g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read()) @@ -687,7 +686,7 @@ func newAssignRevokeSession() *assignRevokeSession { func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} { go func() { defer close(s.prerevokeDone) - if g.cooperative && len(lost) > 0 { + if g.cooperative.Load() && len(lost) > 0 { g.revoke(revokeLastSession, lost, false) } }() @@ -778,7 +777,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // If cooperative consuming, we may have to resume fetches. See the // comment on adjustCooperativeFetchOffsets. - if g.cooperative { + if g.cooperative.Load() { added = g.adjustCooperativeFetchOffsets(added, lost) } @@ -836,7 +835,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio // cooperative consumers rejoin the group immediately, and we want to // detect that in 500ms rather than 3s. 
var cooperativeFastCheck <-chan time.Time - if g.cooperative { + if g.cooperative.Load() { cooperativeFastCheck = time.After(500 * time.Millisecond) } @@ -1141,6 +1140,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo protocol = *resp.Protocol } + for _, balancer := range g.cfg.balancers { + if protocol == balancer.ProtocolName() { + cooperative := balancer.IsCooperative() + if !cooperative && g.cooperative.Load() { + g.cfg.logger.Log(LogLevelWarn, "downgrading from cooperative group to eager group, this is not supported per KIP-429!") + } + g.cooperative.Store(cooperative) + break + } + } + // KIP-345 has a fundamental limitation that KIP-814 also does not // solve. // @@ -2608,7 +2618,7 @@ func (g *groupConsumer) commitAcrossRebalance( // We retry four times, for five tries total: cooperative rebalancing // uses two back to back rebalances, and the commit could // pathologically end during both. - if g.cooperative && tries < 5 { + if g.cooperative.Load() && tries < 5 { origDone := onDone onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { retry := err == nil diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 2fefff7f..f786c4a8 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -33,8 +33,6 @@ const ( type GroupTransactSession struct { cl *Client - cooperative bool - failMu sync.Mutex revoked bool @@ -90,8 +88,6 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { return } - s.cooperative = cfg.cooperative() - userRevoked := cfg.onRevoked cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) { s.failMu.Lock() @@ -100,7 +96,7 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { return } - if s.cooperative && len(rev) == 0 && !s.revoked { + if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next 
commit") } else { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")