Skip to content

Commit

Permalink
bugfix kgo: do not default to eager if there is any eager balancer
Browse files Browse the repository at this point in the history
The previous cooperative check was wrong: if you used multiple group
balancers and any one of them was eager, the client would assume you
were consuming eagerly.

However this bug did not affect all areas of group balancing, which
actually made the bug worse. If everything in the code went eager, then
the bug would just be "you saw more stop the world than was intended".
Instead, if the chosen balancer was cooperative, then cooperative
balancing would occur for eager consumers. On rebalance, partitions
would be lost, and then a cooperative rebalance would not occur, meaning
the partitions would be stuck until another rebalance.

We fix this by saving if we are cooperative based on the rebalancer that
is actually chosen.

This is an upgrade path that should happen once -- once to cooperative
-- but we can downgrade if the user adds a random eager balancer. This
is not supported per KIP-429 so we just warn log and continue as best we
can (likely with duplicate consumption).

Closes #366.
  • Loading branch information
twmb committed Mar 11, 2023
1 parent 29a94a1 commit a568b21
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
10 changes: 0 additions & 10 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,6 @@ type cfg struct {
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
}

// cooperative reports whether every group balancer in the config is
// cooperative; a single eager balancer makes the result false.
func (cfg *cfg) cooperative() bool {
	for _, b := range cfg.balancers {
		if !b.IsCooperative() {
			return false
		}
	}
	return true
}

func (cfg *cfg) validate() error {
if len(cfg.seedBrokers) == 0 {
return errors.New("config erroneously has no seed brokers")
Expand Down
28 changes: 19 additions & 9 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type groupConsumer struct {
cancel func()
manageDone chan struct{} // closed once when the manage goroutine quits

cooperative bool // true if all config balancers are cooperative
cooperative atomicBool // true if the group balancer chosen during Join is cooperative

// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
Expand Down Expand Up @@ -204,7 +204,6 @@ func (c *consumer) initGroup() {
reSeen: make(map[string]bool),

manageDone: make(chan struct{}),
cooperative: c.cl.cfg.cooperative(),
tps: newTopicsPartitions(),
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
Expand Down Expand Up @@ -458,7 +457,7 @@ func (g *groupConsumer) leave() (wait func()) {
// returns the difference of g.nowAssigned and g.lastAssigned.
func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
nowAssigned := g.nowAssigned.clone()
if !g.cooperative {
if !g.cooperative.Load() {
return nowAssigned, nil
}

Expand Down Expand Up @@ -534,7 +533,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
g.c.waitAndAddRebalance()
defer g.c.unaddRebalance()

if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
if !g.cooperative.Load() || leaving { // stage == revokeThisSession if not cooperative
// If we are an eager consumer, we stop fetching all of our
// current partitions as we will be revoking them.
g.c.mu.Lock()
Expand All @@ -545,7 +544,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
}
g.c.mu.Unlock()

if !g.cooperative {
if !g.cooperative.Load() {
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
} else {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read())
Expand Down Expand Up @@ -687,7 +686,7 @@ func newAssignRevokeSession() *assignRevokeSession {
func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} {
go func() {
defer close(s.prerevokeDone)
if g.cooperative && len(lost) > 0 {
if g.cooperative.Load() && len(lost) > 0 {
g.revoke(revokeLastSession, lost, false)
}
}()
Expand Down Expand Up @@ -778,7 +777,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {

// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
if g.cooperative {
if g.cooperative.Load() {
added = g.adjustCooperativeFetchOffsets(added, lost)
}

Expand Down Expand Up @@ -836,7 +835,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// cooperative consumers rejoin the group immediately, and we want to
// detect that in 500ms rather than 3s.
var cooperativeFastCheck <-chan time.Time
if g.cooperative {
if g.cooperative.Load() {
cooperativeFastCheck = time.After(500 * time.Millisecond)
}

Expand Down Expand Up @@ -1141,6 +1140,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
protocol = *resp.Protocol
}

for _, balancer := range g.cfg.balancers {
if protocol == balancer.ProtocolName() {
cooperative := balancer.IsCooperative()
if !cooperative && g.cooperative.Load() {
g.cfg.logger.Log(LogLevelWarn, "downgrading from cooperative group to eager group, this is not supported per KIP-429!")
}
g.cooperative.Store(cooperative)
break
}
}

// KIP-345 has a fundamental limitation that KIP-814 also does not
// solve.
//
Expand Down Expand Up @@ -2608,7 +2618,7 @@ func (g *groupConsumer) commitAcrossRebalance(
// We retry four times, for five tries total: cooperative rebalancing
// uses two back to back rebalances, and the commit could
// pathologically end during both.
if g.cooperative && tries < 5 {
if g.cooperative.Load() && tries < 5 {
origDone := onDone
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
retry := err == nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (
type GroupTransactSession struct {
cl *Client

cooperative bool

failMu sync.Mutex

revoked bool
Expand Down Expand Up @@ -90,8 +88,6 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
return
}

s.cooperative = cfg.cooperative()

userRevoked := cfg.onRevoked
cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) {
s.failMu.Lock()
Expand All @@ -100,7 +96,7 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
return
}

if s.cooperative && len(rev) == 0 && !s.revoked {
if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
Expand Down

0 comments on commit a568b21

Please sign in to comment.