Skip to content

Commit

Permalink
kgo: ensure assignPartitions is locked when pausing topics/partitions
Browse files Browse the repository at this point in the history
Also adds a runtime panic if this is ever unlocked, since this is not
the first time this bug has occurred and it is very hard to test
against -- while a runtime panic is easy to catch in tests.
  • Loading branch information
twmb committed Jul 10, 2023
1 parent 6079141 commit 32ac27f
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,11 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
}()

paused := c.clonePaused()
paused.addTopics(topics...)
Expand Down Expand Up @@ -590,7 +594,11 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
}()

paused := c.clonePaused()
paused.addPartitions(topicPartitions)
Expand Down Expand Up @@ -922,6 +930,10 @@ func (f fmtAssignment) String() string {
// assignPartitions, called under the consumer's mu, is used to set new
// cursors or add to the existing cursors.
func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) {
if c.mu.TryLock() {
panic("assignPartitions called without holding the consumer lock, this is a bug in franz-go, please open an issue at github.com/twmb/franz-go")
}

// The internal code can avoid giving an assign reason in cases where
// the caller logs itself immediately before assigning. We only log if
// there is a reason.
Expand Down

0 comments on commit 32ac27f

Please sign in to comment.