Skip to content

Commit

Permalink
group consuming: add reasons to JoinGroup, LeaveGroup per KIP-800
Browse files Browse the repository at this point in the history
We add no new APIs for this yet, but we now send a reason (which the broker can log) for:
- normal join (beginning to manage)
- normal rejoin
- backoff error rejoin
- force rejoin with reason
as well as:
- normal leave group
  • Loading branch information
twmb committed Mar 1, 2022
1 parent 0bfaf64 commit 10ee8dd
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,14 @@ func (g *groupConsumer) manage() {
g.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle", "group", g.cfg.group)

var consecutiveErrors int
joinWhy := "beginning to manage the group lifecycle"
for {
err := g.joinAndSync()
if joinWhy == "" {
joinWhy = "rejoining from normal rebalance"
}
err := g.joinAndSync(joinWhy)
if err == nil {
if err = g.setupAssignedAndHeartbeat(); err != nil {
if joinWhy, err = g.setupAssignedAndHeartbeat(); err != nil {
if errors.Is(err, kerr.RebalanceInProgress) {
err = nil
}
Expand All @@ -270,6 +274,7 @@ func (g *groupConsumer) manage() {
consecutiveErrors = 0
continue
}
joinWhy = "rejoining after we previously errored and backed off"

hook := func() {
g.cfg.hooks.each(func(h Hook) {
Expand Down Expand Up @@ -387,6 +392,7 @@ func (g *groupConsumer) leave() (wait func()) {
req.MemberID = g.memberID
member := kmsg.NewLeaveGroupRequestMember()
member.MemberID = g.memberID
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
req.Members = append(req.Members, member)
req.RequestWith(g.cl.ctx, g.cl)
}
Expand Down Expand Up @@ -669,8 +675,12 @@ func (s *assignRevokeSession) revoke(g *groupConsumer, leaving bool) <-chan stru
// - which ensures that pre revoking is complete
// - fetching is complete
// - heartbeating is complete
func (g *groupConsumer) setupAssignedAndHeartbeat() error {
hbErrCh := make(chan error, 1)
func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
type hbquit struct {
rejoinWhy string
err error
}
hbErrCh := make(chan hbquit, 1)
fetchErrCh := make(chan error, 1)

s := newAssignRevokeSession()
Expand All @@ -685,7 +695,8 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
go func() {
defer cancel() // potentially kill offset fetching
g.cfg.logger.Log(LogLevelInfo, "beginning heartbeat loop", "group", g.cfg.group)
hbErrCh <- g.heartbeat(fetchErrCh, s)
rejoinWhy, err := g.heartbeat(fetchErrCh, s)
hbErrCh <- hbquit{rejoinWhy, err}
}()

// We immediately begin fetching offsets. We want to wait until the
Expand Down Expand Up @@ -735,7 +746,9 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {

// Finally, we simply return whatever the heartbeat error is. This will
// be the fetch offset error if that function is what killed this.
return <-hbErrCh

done := <-hbErrCh
return done.rejoinWhy, done.err
}

// heartbeat issues heartbeat requests to Kafka for the duration of a group
Expand All @@ -748,7 +761,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
//
// If the offset fetch is successful, then we basically sit in this function
// until a heartbeat errors or we, being the leader, decide to re-join.
func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSession) error {
func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSession) (string, error) {
ticker := time.NewTicker(g.cfg.heartbeatInterval)
defer ticker.Stop()

Expand All @@ -762,6 +775,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio

var metadone, revoked <-chan struct{}
var heartbeat, didMetadone, didRevoke bool
var rejoinWhy string
var lastErr error

ctxCh := g.ctx.Done()
Expand All @@ -777,10 +791,10 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
heartbeat = true
case force = <-g.heartbeatForceCh:
heartbeat = true
case why := <-g.rejoinCh:
case rejoinWhy = <-g.rejoinCh:
// If a metadata update changes our subscription,
// we just pretend we are rebalancing.
g.cfg.logger.Log(LogLevelInfo, "forced rejoin quitting heartbeat loop", "why", why)
g.cfg.logger.Log(LogLevelInfo, "forced rejoin quitting heartbeat loop", "why", rejoinWhy)
err = kerr.RebalanceInProgress
case err = <-fetchErrCh:
fetchErrCh = nil
Expand Down Expand Up @@ -821,7 +835,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// revoke, we wait for it to complete regardless of any future
// error.
if didMetadone && didRevoke {
return lastErr
return rejoinWhy, lastErr
}

if err == nil {
Expand Down Expand Up @@ -849,7 +863,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// to be done so that we avoid calling onLost
// concurrently.
if !errors.Is(err, kerr.RebalanceInProgress) && revoked == nil {
return err
return "", err
}

// Now we call the user provided revoke callback, even
Expand Down Expand Up @@ -909,7 +923,7 @@ func (g *groupConsumer) rejoin(why string) {

// Joins and then syncs, issuing the two slow requests in goroutines to allow
// for group cancelation to return early.
func (g *groupConsumer) joinAndSync() error {
func (g *groupConsumer) joinAndSync(joinWhy string) error {
g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
g.leader.set(false)
g.getAndResetExternalRejoin()
Expand All @@ -936,6 +950,9 @@ start:
joinReq.MemberID = g.memberID
joinReq.InstanceID = g.cfg.instanceID
joinReq.Protocols = g.joinGroupProtocols()
if joinWhy != "" {
joinReq.Reason = kmsg.StringPtr(joinWhy)
}
var (
joinResp *kmsg.JoinGroupResponse
err error
Expand Down

0 comments on commit 10ee8dd

Please sign in to comment.