Skip to content

Commit

Permalink
consuming: avoid setting cursors to nil on LeaveGroup
Browse files Browse the repository at this point in the history
The client previously supported reassigning topics to consume / groups
at runtime with dedicated AssignPartitions and an AssignGroup method.
These have long since been removed.

The code used to "unset" the prior consumer before allowing a new
consumer. We now only unset when leaving a group.

Unsetting a code previously nil'd out the cursors in all sources, so
that new assignments would not add duplicate cursors. The logic was
greatly simplified when transitioning to assigning only in NewClient,
and now cursors are only ever added from the original assignments.
However, some logic around this was likely lost, and we were left with a
bug.

The bug involves three things, in order:
1) LeaveGroup
2) the cluster moves a partition from one broker to another
3) a metadata refresh occurs

This metadata refresh will attempt to migrate the cursor from one source
to another, but the source cursors have been nil'd out. This causes a
panic with an out of range index of -1, which is the length of the
cursors, minus 1.

The easy fix for this is to remove the old code responsible for
preventing duplicate cursors around reassignment. We no longer need to
worry about this, so removing things fixes the bug and simplifies the
code.
  • Loading branch information
twmb committed Oct 24, 2021
1 parent b415080 commit 864526a
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 51 deletions.
16 changes: 7 additions & 9 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ type Client struct {
coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]*coordinatorLoad

updateMetadataCh chan struct{}
updateMetadataNowCh chan struct{} // like above, but with high priority
blockingMetadataFnCh chan func()
metawait metawait
metadone chan struct{}
updateMetadataCh chan struct{}
updateMetadataNowCh chan struct{} // like above, but with high priority
metawait metawait
metadone chan struct{}
}

func (cl *Client) idempotent() bool { return !cl.cfg.disableIdempotency }
Expand Down Expand Up @@ -165,10 +164,9 @@ func NewClient(opts ...Opt) (*Client, error) {

coordinators: make(map[coordinatorKey]*coordinatorLoad),

updateMetadataCh: make(chan struct{}, 1),
updateMetadataNowCh: make(chan struct{}, 1),
blockingMetadataFnCh: make(chan func()),
metadone: make(chan struct{}),
updateMetadataCh: make(chan struct{}, 1),
updateMetadataNowCh: make(chan struct{}, 1),
metadone: make(chan struct{}),
}

compressor, err := newCompressor(cl.cfg.compression...)
Expand Down
21 changes: 0 additions & 21 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,6 @@ func (c *consumer) unset() {
return
}

// Unsetting means all old cursors are useless. Before we return, we
// delete all old cursors. This is especially important in the context
// of a new assignment, which will create new cursors that may
// duplicate our old.
//
// Note that this is happening outside of the context of a valid
// consumer session, so we do not need to worry about much else.
//
// Since we are blocking metadata, sinksAndSources cannot be modified
// concurrently and we do not need a lock.
cl := c.cl
defer cl.blockingMetadataFn(func() {
for _, sns := range cl.sinksAndSources {
s := sns.source
s.cursorsMu.Lock()
s.cursors = nil
s.cursorsStart = 0
s.cursorsMu.Unlock()
}
})

wait := func() {} // wait AFTER we unlock
defer func() { wait() }()

Expand Down
21 changes: 0 additions & 21 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,6 @@ func (cl *Client) triggerUpdateMetadataNow() {
}
}

func (cl *Client) blockingMetadataFn(fn func()) {
select {
case cl.blockingMetadataFnCh <- fn:
case <-cl.ctx.Done():
}
}

// updateMetadataLoop updates metadata whenever the update ticker ticks,
// or whenever deliberately triggered.
func (cl *Client) updateMetadataLoop() {
Expand All @@ -120,7 +113,6 @@ func (cl *Client) updateMetadataLoop() {

ticker := time.NewTicker(cl.cfg.metadataMaxAge)
defer ticker.Stop()
loop:
for {
var now bool
select {
Expand All @@ -130,9 +122,6 @@ loop:
case <-cl.updateMetadataCh:
case <-cl.updateMetadataNowCh:
now = true
case fn := <-cl.blockingMetadataFnCh:
fn()
continue loop
}

var nowTries int
Expand All @@ -141,17 +130,13 @@ loop:
if !now {
if wait := cl.cfg.metadataMinAge - time.Since(lastAt); wait > 0 {
timer := time.NewTimer(wait)
prewait:
select {
case <-cl.ctx.Done():
timer.Stop()
return
case <-cl.updateMetadataNowCh:
timer.Stop()
case <-timer.C:
case fn := <-cl.blockingMetadataFnCh:
fn()
goto prewait
}
}
} else {
Expand All @@ -166,8 +151,6 @@ loop:
select {
case <-cl.updateMetadataCh:
case <-cl.updateMetadataNowCh:
case fn := <-cl.blockingMetadataFnCh:
fn()
default:
break out
}
Expand All @@ -188,15 +171,11 @@ loop:

consecutiveErrors++
after := time.NewTimer(cl.cfg.retryBackoff(consecutiveErrors))
backoff:
select {
case <-cl.ctx.Done():
after.Stop()
return
case <-after.C:
case fn := <-cl.blockingMetadataFnCh:
fn()
goto backoff
}

}
Expand Down

0 comments on commit 864526a

Please sign in to comment.