From 32ac27fdc2d9380ddaf5f73cca88775e670797fa Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 10 Jul 2023 13:52:52 -0600 Subject: [PATCH] kgo: ensure assignPartitions is locked when pausing topics/partitions Also adds a runtime panic if this is ever unlocked, since this is not the first time this bug has occurred and it is very hard to test against -- while a runtime panic is easy to catch in tests. --- pkg/kgo/consumer.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index ddeb81a2..093e064d 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -562,7 +562,11 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string { c.pausedMu.Lock() defer c.pausedMu.Unlock() - defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics)) + defer func() { + c.mu.Lock() + defer c.mu.Unlock() + c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics)) + }() paused := c.clonePaused() paused.addTopics(topics...) @@ -590,7 +594,11 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s c.pausedMu.Lock() defer c.pausedMu.Unlock() - defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions)) + defer func() { + c.mu.Lock() + defer c.mu.Unlock() + c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions)) + }() paused := c.clonePaused() paused.addPartitions(topicPartitions) @@ -922,6 +930,10 @@ func (f fmtAssignment) String() string { // assignPartitions, called under the consumer's mu, is used to set new // cursors or add to the existing cursors. func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) { + if c.mu.TryLock() { + panic("assignPartitions called without holding the consumer lock, this is a bug in franz-go, please open an issue at github.com/twmb/franz-go") + } + // The internal code can avoid giving an assign reason in cases where // the caller logs itself immediately before assigning. We only log if // there is a reason.