From c3b083b253849a0d6e26d2a36abddb43f5133584 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sat, 8 Jul 2023 08:16:22 -0600
Subject: [PATCH] kgo: do not return paused topics/partitions after pausing

This causes slightly more work (for simplicity, we drop everything
buffered and kill all in flight fetch requests), but this is much easier
to reason about.

Closes #489.
---
 pkg/kgo/consumer.go             | 30 ++++++++++-------
 pkg/kgo/consumer_direct_test.go | 58 +++++++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index f516a06d..ddeb81a2 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -545,11 +545,9 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
 // PauseFetchTopics sets the client to no longer fetch the given topics and
 // returns all currently paused topics. Paused topics persist until resumed.
 // You can call this function with no topics to simply receive the list of
-// currently paused topics.
-//
-// In contrast to the canonical Java client, this function does not clear
-// anything currently buffered. Buffered fetches containing paused topics are
-// still returned from polling.
+// currently paused topics. Pausing topics drops everything currently buffered
+// and kills any in flight fetch requests to ensure nothing that is paused
+// can be returned anymore from polling.
 //
 // Pausing topics is independent from pausing individual partitions with the
 // PauseFetchPartitions method. If you pause partitions for a topic with
@@ -564,6 +562,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
 
 	c.pausedMu.Lock()
 	defer c.pausedMu.Unlock()
+	defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
 
 	paused := c.clonePaused()
 	paused.addTopics(topics...)
@@ -574,11 +573,9 @@
 // PauseFetchPartitions sets the client to no longer fetch the given partitions
 // and returns all currently paused partitions. Paused partitions persist until
 // resumed. You can call this function with no partitions to simply receive the
-// list of currently paused partitions.
-//
-// In contrast to the canonical Java client, this function does not clear
-// anything currently buffered. Buffered fetches containing paused partitions
-// are still returned from polling.
+// list of currently paused partitions. Pausing partitions drops everything
+// currently buffered and kills any in flight fetch requests to ensure nothing
+// that is paused can be returned anymore from polling.
 //
 // Pausing individual partitions is independent from pausing topics with the
 // PauseFetchTopics method. If you pause partitions for a topic with
@@ -593,6 +590,7 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
 
 	c.pausedMu.Lock()
 	defer c.pausedMu.Unlock()
+	defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
 
 	paused := c.clonePaused()
 	paused.addPartitions(topicPartitions)
@@ -868,6 +866,10 @@ const (
 	// The counterpart to assignInvalidateMatching, assignSetMatching
 	// resets all matching partitions to the specified offset / epoch.
 	assignSetMatching
+
+	// For pausing, we want to drop anything inflight. We start a new
+	// session with the old tps.
+	assignBumpSession
 )
 
 func (h assignHow) String() string {
@@ -882,6 +884,8 @@ func (h assignHow) String() string {
 		return "unassigning and purging any partition matching the input topics"
 	case assignSetMatching:
 		return "reassigning any currently assigned matching partition to the input"
+	case assignBumpSession:
+		return "bumping internal consumer session to drop anything currently in flight"
 	}
 	return ""
 }
@@ -952,6 +956,8 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 		// if we had no session before, which is why we need to pass in
 		// our topicPartitions.
 		session = c.guardSessionChange(tps)
+	} else if how == assignBumpSession {
+		loadOffsets, tps = c.stopSession()
 	} else {
 		loadOffsets, _ = c.stopSession()
 	}
@@ -998,7 +1004,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 		// assignment went straight to listing / epoch loading, and
 		// that list/epoch never finished.
 		switch how {
-		case assignWithoutInvalidating:
+		case assignWithoutInvalidating, assignBumpSession:
 			// Nothing to do -- this is handled above.
 		case assignInvalidateAll:
 			loadOffsets = listOrEpochLoads{}
@@ -1039,7 +1045,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 
 	// This assignment could contain nothing (for the purposes of
 	// invalidating active fetches), so we only do this if needed.
-	if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching {
+	if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
 		return
 	}
 
diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index 34e3ad0b..8eadf5da 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -260,3 +260,61 @@ func TestAddRemovePartitions(t *testing.T) {
 		t.Fatalf("expected to see v1 and v2, got %v", recs)
 	}
 }
+
+func TestPauseIssue489(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup := tmpTopicPartitions(t, 2)
+	defer cleanup()
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		DefaultProduceTopic(t1),
+		RecordPartitioner(ManualPartitioner()),
+		ConsumeTopics(t1),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit bool
+		var zeroOne uint8
+		for !exit {
+			r := StringRecord("v")
+			r.Partition = int32(zeroOne % 2)
+			zeroOne++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit = true
+				}
+			})
+		}
+	}()
+	defer cancel()
+
+	for i := 0; i < 10; i++ {
+		var sawZero, sawOne bool
+		for !sawZero || !sawOne {
+			fs := cl.PollFetches(ctx)
+			fs.EachRecord(func(r *Record) {
+				sawZero = sawZero || r.Partition == 0
+				sawOne = sawOne || r.Partition == 1
+			})
+		}
+		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+		sawZero, sawOne = false, false
+		for i := 0; i < 5; i++ {
+			fs := cl.PollFetches(ctx)
+			fs.EachRecord(func(r *Record) {
+				sawZero = sawZero || r.Partition == 0
+				sawOne = sawOne || r.Partition == 1
+			})
+		}
+		if sawZero {
+			t.Error("saw partition zero even though it was paused")
+		}
+		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+	}
+}
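
As a usage sketch of the semantics this patch introduces (not part of the
patch itself): after PauseFetchPartitions returns, no poll can return data
for the paused partition until it is resumed. The broker address
"localhost:9092" and topic "events" below are assumptions for illustration,
and error handling is elided.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed local broker and topic, purely for illustration.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("events"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// With this patch, pausing drops all buffered fetches for the paused
	// partitions and kills in flight fetch requests, so subsequent polls
	// cannot return partition 0 until it is resumed.
	cl.PauseFetchPartitions(map[string][]int32{"events": {0}})

	fs := cl.PollFetches(context.Background())
	fs.EachRecord(func(r *kgo.Record) {
		fmt.Println(r.Partition, string(r.Value)) // never partition 0 here
	})

	cl.ResumeFetchPartitions(map[string][]int32{"events": {0}})
}

The tradeoff, per the commit message, is that resuming must re-fetch data
that had been buffered, trading a little extra fetch work for behavior that
now matches the canonical Java client.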