Skip to content

Commit

Permalink
Merge pull request #425 from twmb/group_resilience
Browse files Browse the repository at this point in the history
group consuming: block LeaveGroup between join&sync
  • Loading branch information
twmb authored Apr 4, 2023
2 parents 87c788c + d833f61 commit c5d9afb
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
32 changes: 20 additions & 12 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,11 @@ type groupConsumer struct {

// LeaveGroup leaves a group if in one. Calling the client's Close function
// also leaves a group, so this is only necessary to call if you plan to leave
// the group and continue using the client.
//
// If you have overridden the default revoke, you must manually commit offsets
// before leaving the group.
// the group and continue using the client. Note that if a rebalance is in
// progress, this function waits for the rebalance to complete before the group
// can be left. This is necessary to allow you to safely issue one final offset
// commit in OnPartitionsRevoked. If you have overridden the default revoke,
// you must manually commit offsets before leaving the group.
//
// If you have configured the group with an InstanceID, this does not leave the
// group. With instance IDs, it is expected that clients will restart and
Expand Down Expand Up @@ -417,15 +418,14 @@ func (g *groupConsumer) leave() (wait func()) {
wasDead := g.dying
g.dying = true
wasManaging := g.managing
g.cancel()
g.mu.Unlock()

done := make(chan struct{})

go func() {
defer close(done)

g.cancel()

if wasManaging {
// We want to wait for the manage goroutine to be done
// so that we call the user's on{Assign,RevokeLost}.
Expand Down Expand Up @@ -1034,15 +1034,23 @@ start:
joined = make(chan struct{})
)

// NOTE: For this function, we have to use the client context, not the
// group context. We want to allow people to issue one final commit in
// OnPartitionsRevoked before leaving a group, so we need to block
// commits during join&sync. If we used the group context, we would be
cancelled immediately when leaving while a join or sync is inflight,
// and then our final commit will receive either REBALANCE_IN_PROGRESS
// or ILLEGAL_GENERATION.

go func() {
defer close(joined)
joinResp, err = joinReq.RequestWith(g.ctx, g.cl)
joinResp, err = joinReq.RequestWith(g.cl.ctx, g.cl)
}()

select {
case <-joined:
case <-g.ctx.Done():
return g.ctx.Err() // group killed
case <-g.cl.ctx.Done():
return g.cl.ctx.Err() // client closed
}
if err != nil {
return err
Expand Down Expand Up @@ -1075,13 +1083,13 @@ start:
g.cfg.logger.Log(LogLevelInfo, "syncing", "group", g.cfg.group, "protocol_type", g.cfg.protocol, "protocol", protocol)
go func() {
defer close(synced)
syncResp, err = syncReq.RequestWith(g.ctx, g.cl)
syncResp, err = syncReq.RequestWith(g.cl.ctx, g.cl)
}()

select {
case <-synced:
case <-g.ctx.Done():
return g.ctx.Err()
case <-g.cl.ctx.Done():
return g.cl.ctx.Err()
}
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,13 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {

// We poll with a short timeout so that we do not hang waiting
// at the end if another consumer hit the limit.
//
// If the host is slow, the context could be canceled immediately
// before we poll.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fetches := cl.PollRecords(ctx, 100)
cancel()
if fetches.Err() == context.DeadlineExceeded || fetches.Err() == ErrClientClosed {
if err := fetches.Err(); err == context.DeadlineExceeded || err == context.Canceled || err == ErrClientClosed {
if consumed := int(c.consumed.Load()); consumed == testRecordLimit {
return
} else if consumed > testRecordLimit {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fetches := txnSess.PollFetches(ctx)
cancel()
if fetches.Err() == context.DeadlineExceeded || fetches.Err() == ErrClientClosed {
if err := fetches.Err(); err == context.DeadlineExceeded || err == context.Canceled || err == ErrClientClosed {
if consumed := int(c.consumed.Load()); consumed == testRecordLimit {
return
} else if consumed > testRecordLimit {
Expand Down

0 comments on commit c5d9afb

Please sign in to comment.