Skip to content

Commit

Permalink
Merge pull request #384 from twmb/366
Browse files Browse the repository at this point in the history
bugfix kgo: do not default to eager if there is any eager balancer
  • Loading branch information
twmb authored Mar 13, 2023
2 parents f4ed65a + a568b21 commit 14b89d6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
10 changes: 0 additions & 10 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,6 @@ type cfg struct {
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
}

// cooperative is a helper that returns whether all group balancers in the
// config are cooperative.
func (cfg *cfg) cooperative() bool {
cooperative := true
for _, balancer := range cfg.balancers {
cooperative = cooperative && balancer.IsCooperative()
}
return cooperative
}

func (cfg *cfg) validate() error {
if len(cfg.seedBrokers) == 0 {
return errors.New("config erroneously has no seed brokers")
Expand Down
28 changes: 19 additions & 9 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type groupConsumer struct {
cancel func()
manageDone chan struct{} // closed once when the manage goroutine quits

cooperative bool // true if all config balancers are cooperative
cooperative atomicBool // true if the group balancer chosen during Join is cooperative

// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
Expand Down Expand Up @@ -204,7 +204,6 @@ func (c *consumer) initGroup() {
reSeen: make(map[string]bool),

manageDone: make(chan struct{}),
cooperative: c.cl.cfg.cooperative(),
tps: newTopicsPartitions(),
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
Expand Down Expand Up @@ -458,7 +457,7 @@ func (g *groupConsumer) leave() (wait func()) {
// returns the difference of g.nowAssigned and g.lastAssigned.
func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
nowAssigned := g.nowAssigned.clone()
if !g.cooperative {
if !g.cooperative.Load() {
return nowAssigned, nil
}

Expand Down Expand Up @@ -534,7 +533,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
g.c.waitAndAddRebalance()
defer g.c.unaddRebalance()

if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
if !g.cooperative.Load() || leaving { // stage == revokeThisSession if not cooperative
// If we are an eager consumer, we stop fetching all of our
// current partitions as we will be revoking them.
g.c.mu.Lock()
Expand All @@ -545,7 +544,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
}
g.c.mu.Unlock()

if !g.cooperative {
if !g.cooperative.Load() {
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
} else {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read())
Expand Down Expand Up @@ -687,7 +686,7 @@ func newAssignRevokeSession() *assignRevokeSession {
func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} {
go func() {
defer close(s.prerevokeDone)
if g.cooperative && len(lost) > 0 {
if g.cooperative.Load() && len(lost) > 0 {
g.revoke(revokeLastSession, lost, false)
}
}()
Expand Down Expand Up @@ -778,7 +777,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {

// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
if g.cooperative {
if g.cooperative.Load() {
added = g.adjustCooperativeFetchOffsets(added, lost)
}

Expand Down Expand Up @@ -836,7 +835,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// cooperative consumers rejoin the group immediately, and we want to
// detect that in 500ms rather than 3s.
var cooperativeFastCheck <-chan time.Time
if g.cooperative {
if g.cooperative.Load() {
cooperativeFastCheck = time.After(500 * time.Millisecond)
}

Expand Down Expand Up @@ -1141,6 +1140,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
protocol = *resp.Protocol
}

for _, balancer := range g.cfg.balancers {
if protocol == balancer.ProtocolName() {
cooperative := balancer.IsCooperative()
if !cooperative && g.cooperative.Load() {
g.cfg.logger.Log(LogLevelWarn, "downgrading from cooperative group to eager group, this is not supported per KIP-429!")
}
g.cooperative.Store(cooperative)
break
}
}

// KIP-345 has a fundamental limitation that KIP-814 also does not
// solve.
//
Expand Down Expand Up @@ -2608,7 +2618,7 @@ func (g *groupConsumer) commitAcrossRebalance(
// We retry four times, for five tries total: cooperative rebalancing
// uses two back to back rebalances, and the commit could
// pathologically end during both.
if g.cooperative && tries < 5 {
if g.cooperative.Load() && tries < 5 {
origDone := onDone
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
retry := err == nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (
type GroupTransactSession struct {
cl *Client

cooperative bool

failMu sync.Mutex

revoked bool
Expand Down Expand Up @@ -90,8 +88,6 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
return
}

s.cooperative = cfg.cooperative()

userRevoked := cfg.onRevoked
cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) {
s.failMu.Lock()
Expand All @@ -100,7 +96,7 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
return
}

if s.cooperative && len(rev) == 0 && !s.revoked {
if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
Expand Down

0 comments on commit 14b89d6

Please sign in to comment.