committing: internally retry on some errors when cooperative
See #137. Cooperative consumers can continue consuming during a rebalance. If
they commit at the start of a rebalance that ends after the commit is issued,
the commit fails with ILLEGAL_GENERATION (or sometimes, REBALANCE_IN_PROGRESS).

For cooperative consumers specifically, if the commit fails but the consumer
still owns all partitions being committed, we now retry the commit internally.
This should help alleviate the commit errors that well-written consumers are
currently running into.

We retry up to twice because cooperative rebalancing results in two
back-to-back rebalances; a third failure is more unexpected.
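
For context, a minimal usage sketch of the kind of consumer this change helps.
This is a sketch only, assuming franz-go's public kgo API (options such as
Balancers, CooperativeStickyBalancer, and DisableAutoCommit, and the
CommitUncommittedOffsets call); brokers, group, and topic names are
placeholders.

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Hypothetical setup: a cooperative consumer that commits manually.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			// ... process r ...
		})
		// A commit issued here can race with a cooperative rebalance; with
		// this change, the client retries internally if the group member
		// still owns every partition in the commit.
		if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}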
twmb committed Feb 28, 2022
1 parent 2d54cb9 commit bf0e5d7
Showing 2 changed files with 165 additions and 39 deletions.
155 changes: 121 additions & 34 deletions pkg/kgo/consumer_group.go
@@ -50,8 +50,11 @@ type groupConsumer struct {
heartbeatForceCh chan func(error)

// The following two are only updated in the manager / join&sync loop
lastAssigned map[string][]int32 // only updated in join&sync loop
nowAssigned map[string][]int32 // only updated in join&sync loop
// The nowAssigned map is read when commits fail: if the commit fails
// with ILLEGAL_GENERATION and it contains only partitions that are in
// nowAssigned, we re-issue.
lastAssigned map[string][]int32
nowAssigned amtps

// Fetching ensures we continue fetching offsets across cooperative
// rebalance if an offset fetch returns early due to an immediate
@@ -298,12 +301,12 @@ func (g *groupConsumer) manage() {
// onRevoked, but since we are handling this case for
// the cooperative consumer we may as well just also
// include the eager consumer.
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
} else {
// Any other error is perceived as a fatal error,
// and we go into OnLost as appropriate.
if g.cfg.onLost != nil {
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned)
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read())
}
hook()
}
@@ -317,7 +320,7 @@ func (g *groupConsumer) manage() {
g.uncommitted = nil
g.mu.Unlock()

g.nowAssigned = nil
g.nowAssigned.store(nil)
g.lastAssigned = nil
g.fetching = nil

@@ -403,19 +406,20 @@ func (g *groupConsumer) leave() (wait func()) {

// returns the difference of g.nowAssigned and g.lastAssigned.
func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
nowAssigned := g.nowAssigned.clone()
if g.lastAssigned == nil {
return g.nowAssigned, nil
return nowAssigned, nil
}

added = make(map[string][]int32, len(g.nowAssigned))
lost = make(map[string][]int32, len(g.nowAssigned))
added = make(map[string][]int32, len(nowAssigned))
lost = make(map[string][]int32, len(nowAssigned))

// First, we diff lasts: any topic in last but not now is lost,
// otherwise, (1) new partitions are added, (2) common partitions are
// ignored, and (3) partitions no longer in now are lost.
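// Worked example (hypothetical values, not part of this diff): with
// lastAssigned = {"t": [0 1 2]} and nowAssigned = {"t": [1 2 3]},
// partition 3 is newly added, partitions 1 and 2 are common and ignored,
// and partition 0 is no longer owned, so added = {"t": [3]} and
// lost = {"t": [0]}.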
lasts := make(map[int32]struct{}, 100)
for topic, lastPartitions := range g.lastAssigned {
nowPartitions, exists := g.nowAssigned[topic]
nowPartitions, exists := nowAssigned[topic]
if !exists {
lost[topic] = lastPartitions
continue
@@ -444,7 +448,7 @@ func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
}

// Finally, any new topics in now assigned are strictly added.
for topic, nowPartitions := range g.nowAssigned {
for topic, nowPartitions := range nowAssigned {
if _, exists := g.lastAssigned[topic]; !exists {
added[topic] = nowPartitions
}
@@ -488,14 +492,14 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
g.c.mu.Unlock()

if !g.cooperative {
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned)
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
} else {
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned)
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read())
}
if g.cfg.onRevoked != nil {
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
}
g.nowAssigned = nil
g.nowAssigned.store(nil)

// After nilling uncommitted here, nothing should recreate
// uncommitted until a future fetch after the group is
@@ -521,17 +525,19 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
// which causes a new metadata request -- in short, this could
// be concurrent with a metadata findNewAssignments, so we
// lock.
g.mu.Lock()
for topic, partitions := range g.nowAssigned {
if _, exists := g.using[topic]; !exists {
if lost == nil {
lost = make(map[string][]int32)
g.nowAssigned.write(func(nowAssigned map[string][]int32) {
g.mu.Lock()
for topic, partitions := range nowAssigned {
if _, exists := g.using[topic]; !exists {
if lost == nil {
lost = make(map[string][]int32)
}
lost[topic] = partitions
delete(nowAssigned, topic)
}
lost[topic] = partitions
delete(g.nowAssigned, topic)
}
}
g.mu.Unlock()
g.mu.Unlock()
})
}

if len(lost) > 0 {
@@ -685,7 +691,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {

s := newAssignRevokeSession()
added, lost := g.diffAssigned()
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", tpsFmt(added), "lost", tpsFmt(lost))
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", mtps(added), "lost", mtps(lost))
s.prerevoke(g, lost) // for cooperative consumers

// Since we have joined the group, we immediately begin heartbeating.
@@ -1192,14 +1198,14 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
return err
}

g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", tpsFmt(assigned))
g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", mtps(assigned))

// Past this point, we will fall into the setupAssigned prerevoke code,
// meaning for cooperative, we will revoke what we need to.
if g.cooperative {
g.lastAssigned = g.nowAssigned
g.lastAssigned = g.nowAssigned.clone()
}
g.nowAssigned = assigned
g.nowAssigned.store(assigned)
return nil
}

@@ -1210,10 +1216,7 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
for topic := range g.using {
topics = append(topics, topic)
}
nowDup := make(map[string][]int32) // deep copy to allow modifications
for topic, partitions := range g.nowAssigned {
nowDup[topic] = append([]int32(nil), partitions...)
}
nowDup := g.nowAssigned.clone() // deep copy to allow modifications
gen := g.generation

g.mu.Unlock()
@@ -2333,13 +2336,30 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
}
}

// commit is the logic for Commit; see Commit's documentation
//
// This is called under the groupConsumer's lock.
// commit is the first step of actually committing; see doc below.
func (g *groupConsumer) commit(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
) {
g.commitAcrossRebalance(ctx, uncommitted, onDone, 1)
}

// commitAcrossRebalance, called under the group mu, actually issues a commit.
// This function retries committing up to twice. In the standard mode of
// consuming, if a cooperative rebalance happens, a user may commit records
// while the client is rebalancing. This can cause ILLEGAL_GENERATION or
// REBALANCE_IN_PROGRESS errors. If we see either of those errors (a bounded
// number of times, to prevent spin looping), we re-issue the commit. See #137
// for an example.
//
// We only try this logic for a cooperative group. Non-cooperative groups give
// up all their partitions on rebalance and do not continue to consume during
// the rebalancing.
func (g *groupConsumer) commitAcrossRebalance(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
tries int8,
) {
if onDone == nil { // note we must always call onDone
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
@@ -2351,6 +2371,73 @@ func (g *groupConsumer) commit(
return
}

// We retry up to twice (three tries total): cooperative rebalancing
// uses two back-to-back rebalances, and a commit could
// pathologically fail during both. A third failure is unexpected.
if g.cooperative && tries < 3 {
origDone := onDone
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
retry := err == nil
var retryErr error

// Per package docs: if all partitions indicate rebalancing
// or illegal generation, we re-issue the commit.
if retry {
checkErr:
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
retryErr = kerr.ErrorForCode(p.ErrorCode)
retry = retry && (retryErr == kerr.RebalanceInProgress || retryErr == kerr.IllegalGeneration)
if !retry {
break checkErr
}
}
}
}

if retry {
// All errors are generation or rebalance. We now check
// if we are still assigned everything in the commit.
nowAssigned := g.nowAssigned.read()
mps := make(map[int32]struct{})
checkAssign:
for i := range resp.Topics {
t := &resp.Topics[i]
ps, exists := nowAssigned[t.Topic]
if retry = exists; !exists {
break checkAssign // no longer assigned this topic
}
for p := range mps {
delete(mps, p)
}
for _, p := range ps {
mps[p] = struct{}{}
}
for j := range t.Partitions {
p := &t.Partitions[j]
_, exists := mps[p.Partition]
if retry = exists; !exists {
break checkAssign // no longer assigned this partition
}
}
}
}

if retry {
go func() {
g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr)
g.mu.Lock()
defer g.mu.Unlock()
g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1)
}()
} else {
origDone(cl, req, resp, err)
}
}
}
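// Retry arithmetic, for illustration (not part of this diff): commit calls
// commitAcrossRebalance with tries == 1, and the wrapper above is installed
// while tries < 3, i.e. on the first and second attempts. A failed commit is
// therefore re-issued at most twice (three attempts total), matching the two
// back-to-back rebalances of a cooperative rebalance.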

priorCancel := g.commitCancel
priorDone := g.commitDone

49 changes: 44 additions & 5 deletions pkg/kgo/topics_and_partitions.go
@@ -2,6 +2,7 @@ package kgo

import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -17,15 +18,53 @@ func dupmsi32(m map[string]int32) map[string]int32 {
return d
}

type tpsFmt map[string][]int32
// "Atomic map of topic partitions", for lack of a better name at this point.
type amtps struct {
v atomic.Value
}

func (a *amtps) read() map[string][]int32 {
v := a.v.Load()
if v == nil {
return nil
}
return v.(map[string][]int32)
}

func (a *amtps) write(fn func(map[string][]int32)) {
dup := a.clone()
fn(dup)
a.store(dup)
}

func (f tpsFmt) String() string {
func (a *amtps) clone() map[string][]int32 {
orig := a.read()
dup := make(map[string][]int32, len(orig))
for t, ps := range orig {
dup[t] = append(dup[t], ps...)
}
return dup
}

func (a *amtps) store(m map[string][]int32) { a.v.Store(m) }
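
// exampleAmtpsUsage is an illustrative sketch, not part of this diff: it
// shows the copy-on-write pattern amtps enables. write mutates a deep copy
// and atomically swaps it in via store, while read returns the current
// snapshot without holding any lock.
func exampleAmtpsUsage() {
	var a amtps
	a.write(func(m map[string][]int32) {
		m["events"] = append(m["events"], 0, 1, 2) // mutate the private copy
	})
	for topic, partitions := range a.read() { // lock-free snapshot read
		fmt.Println(topic, partitions)
	}
}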

type mtps map[string][]int32

func (m mtps) String() string {
var sb strings.Builder
var topicsWritten int
for topic, partitions := range f {
ts := make([]string, 0, len(m))
var ps []int32
for t := range m {
ts = append(ts, t)
}
sort.Strings(ts)
for _, t := range ts {
ps = append(ps[:0], m[t]...)
sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
topicsWritten++
fmt.Fprintf(&sb, "%s%v", topic, partitions)
if topicsWritten < len(f) {
fmt.Fprintf(&sb, "%s%v", t, ps)
if topicsWritten < len(m) {
sb.WriteString(", ")
}
}
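
For illustration, a brief sketch of what the new mtps formatter renders
(hypothetical values; this assumes the truncated tail of String returns the
built string): topic names are sorted, and each topic's partitions are sorted
and printed with %v.

m := mtps{"foo": {1, 0}, "bar": {2}}
fmt.Println(m) // prints: bar[2], foo[0 1]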
