Skip to content

Commit

Permalink
Merge pull request #410 from twmb/409
Browse files Browse the repository at this point in the history
kgo groups: block join&sync while a commit is inflight
  • Loading branch information
twmb authored Apr 1, 2023
2 parents c8bb63c + ee70930 commit 858f9a0
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 96 deletions.
13 changes: 12 additions & 1 deletion pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,13 @@ func RequireStableFetchOffsets() GroupOpt {
//
// This function can largely replace any commit logic you may want to do in
// OnPartitionsRevoked.
//
// Lastly, note that this actually blocks any rebalance from calling
// OnPartitions{Assigned,Revoked,Lost}. If you are using a cooperative
// rebalancer such as CooperativeSticky, a rebalance can begin right before you
// poll, and you will still receive records because no partitions are lost yet.
// The in-progress rebalance only blocks if you are assigned new partitions or
// if any of your partitions are revoked.
func BlockRebalanceOnPoll() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
}
Expand Down Expand Up @@ -1674,5 +1681,9 @@ func GroupProtocol(protocol string) GroupOpt {
// AutoCommitCallback sets the callback to use if autocommitting is enabled.
// This overrides the default callback that logs errors and continues.
func AutoCommitCallback(fn func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.commitCallback, cfg.setCommitCallback = fn, true }}
return groupOpt{func(cfg *cfg) {
if fn != nil {
cfg.commitCallback, cfg.setCommitCallback = fn, true
}
}}
}
178 changes: 84 additions & 94 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ type groupConsumer struct {
// We store this as a pointer for address comparisons.
external atomic.Value // *groupExternal

// See the big comment on `commit`. If we allow committing between
// join&sync, we occasionally see RebalanceInProgress or
// IllegalGeneration errors while cooperative consuming.
noCommitDuringJoinAndSync sync.RWMutex

//////////////
// mu block //
//////////////
Expand Down Expand Up @@ -991,6 +996,9 @@ func (g *groupConsumer) rejoin(why string) {
// Joins and then syncs, issuing the two slow requests in goroutines to allow
// for group cancelation to return early.
func (g *groupConsumer) joinAndSync(joinWhy string) error {
g.noCommitDuringJoinAndSync.Lock()
defer g.noCommitDuringJoinAndSync.Unlock()

g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
g.leader.Store(false)
g.getAndResetExternalRejoin()
Expand Down Expand Up @@ -2037,10 +2045,16 @@ func (g *groupConsumer) loopCommit() {
// We always commit only the head. If we are autocommitting
// dirty, then updateUncommitted updates the head to dirty
// offsets.
g.noCommitDuringJoinAndSync.RLock()
g.mu.Lock()
if !g.blockAuto {
g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
g.commit(g.ctx, g.getUncommittedLocked(true, false), g.cfg.commitCallback)
g.commit(g.ctx, g.getUncommittedLocked(true, false), func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
g.cfg.commitCallback(cl, req, resp, err)
})
} else {
g.noCommitDuringJoinAndSync.RUnlock()
}
g.mu.Unlock()
}
Expand Down Expand Up @@ -2426,6 +2440,42 @@ func (cl *Client) CommitOffsetsSync(
g.commitOffsetsSync(ctx, uncommitted, onDone)
}

// waitJoinSyncMu is a rather insane way to try to grab a lock, but also return
// early if we have to wait and the context is canceled.
func (g *groupConsumer) waitJoinSyncMu(ctx context.Context) error {
if g.noCommitDuringJoinAndSync.TryRLock() {
return nil
}

var (
blockJoinSyncCh = make(chan struct{})
mu sync.Mutex
returned bool
maybeRUnlock = func() {
mu.Lock()
defer mu.Unlock()
if returned {
g.noCommitDuringJoinAndSync.RUnlock()
}
returned = true
}
)

go func() {
g.noCommitDuringJoinAndSync.RLock()
close(blockJoinSyncCh)
maybeRUnlock()
}()

select {
case <-blockJoinSyncCh:
return nil
case <-ctx.Done():
maybeRUnlock()
return ctx.Err()
}
}

func (g *groupConsumer) commitOffsetsSync(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
Expand All @@ -2443,7 +2493,12 @@ func (g *groupConsumer) commitOffsetsSync(

g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
return
}
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
defer close(done)
defer g.syncCommitMu.Unlock()
onDone(cl, req, resp, err)
Expand Down Expand Up @@ -2519,18 +2574,26 @@ func (cl *Client) CommitOffsets(
}

g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us

unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
return
}
unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
unblockSyncCommit(cl, req, resp, err)
}

g.mu.Lock()
defer g.mu.Unlock()

g.blockAuto = true
unblockAuto := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
unblockSyncCommit(cl, req, resp, err)
unblockJoinSync(cl, req, resp, err)
g.mu.Lock()
defer g.mu.Unlock()
g.blockAuto = false
Expand All @@ -2554,7 +2617,24 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
}
}

// commit is the first step of actually committing; see doc below.
// The actual logic to commit. This is called under two locks:
// - g.noCommitDuringJoinAndSync.RLock()
// - g.mu.Lock()
//
// By blocking the JoinGroup from being issued, or blocking the commit on join
// & sync finishing, we avoid RebalanceInProgress and IllegalGeneration. The
// former error happens if a commit arrives to the broker between the two, the
// latter error happens when a commit arrives to the broker with the old
// generation (it was in flight before sync finished).
//
// Practically, what this means is that a user's commits will be blocked if
// they try to commit between join and sync.
//
// For eager consuming, the user should not have any partitions to commit
// anyway. For cooperative consuming, a rebalance can happen after at any
// moment. We block only revokation aspects of rebalances with
// BlockRebalanceOnPoll; we want to allow the cooperative part of rebalancing
// to occur.
func (g *groupConsumer) commit(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
Expand Down Expand Up @@ -2587,103 +2667,13 @@ func (g *groupConsumer) commit(
uncommitted = dup
}

g.commitAcrossRebalance(ctx, uncommitted, onDone, 1)
}

// commitAcrossRebalances, called under the group mu, actually issues a commit.
// This function retries committing up to *once*. In standard mode of
// consuming, if a cooperative rebalance happens, a user may commit records
// while the client is rebalancing. This can cause ILLEGAL_GENERATION or
// REBALANCE_IN_PROGRESS errors. If we see either of those errors (once, to
// prevent spin looping), we re-issue the commit. See #137 for an example.
//
// We only try this logic for a cooperative group. Non-cooperative groups give
// up all their partitions on rebalance and do not continue to consume during
// the rebalancing.
func (g *groupConsumer) commitAcrossRebalance(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
tries int8,
) {
if onDone == nil { // note we must always call onDone
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
}
if len(uncommitted) == 0 { // only empty if called thru autocommit / default revoke
// We have to do this concurrently because the expectation is
// that commit itself does not block.
go onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), nil)
return
}

// We retry four times, for five tries total: cooperative rebalancing
// uses two back to back rebalances, and the commit could
// pathologically end during both.
if g.cooperative.Load() && tries < 5 {
origDone := onDone
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
retry := err == nil
var retryErr error

// Per package docs: if all partitions indicate rebalancing
// or illegal generation, we re-issue the commit.
if retry {
checkErr:
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
retryErr = kerr.ErrorForCode(p.ErrorCode)
retry = retry && (retryErr == kerr.RebalanceInProgress || retryErr == kerr.IllegalGeneration)
if !retry {
break checkErr
}
}
}
}

if retry {
// All errors are generation or rebalance. We now check
// if we are still assigned everything in the commit.
nowAssigned := g.nowAssigned.read()
mps := make(map[int32]struct{})
checkAssign:
for i := range resp.Topics {
t := &resp.Topics[i]
ps, exists := nowAssigned[t.Topic]
if retry = exists; !exists {
break checkAssign // no longer assigned this topic
}
for p := range mps {
delete(mps, p)
}
for _, p := range ps {
mps[p] = struct{}{}
}
for j := range t.Partitions {
p := &t.Partitions[j]
_, exists := mps[p.Partition]
if retry = exists; !exists {
break checkAssign // no longer assigned this partition
}
}
}
}

if retry {
go func() {
g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr)
time.Sleep(10 * time.Millisecond)
g.mu.Lock()
defer g.mu.Unlock()
g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1)
}()
} else {
origDone(cl, req, resp, err)
}
}
}

priorCancel := g.commitCancel
priorDone := g.commitDone

Expand Down
10 changes: 9 additions & 1 deletion pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,10 +1077,18 @@ func (cl *Client) commitTransactionOffsets(

unlockTxn()

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), err)
return nil
}
unblockJoinSync := func(req *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
onDone(req, resp, err)
}
g.mu.Lock()
defer g.mu.Unlock()

g.commitTxn(ctx, uncommitted, onDone)
g.commitTxn(ctx, uncommitted, unblockJoinSync)
return g
}

Expand Down

0 comments on commit 858f9a0

Please sign in to comment.