Skip to content

Commit

Permalink
kgo: do not returned paused topics/partitions after pausing
Browse files Browse the repository at this point in the history
This causes slightly more work (for simplicity, we drop everything
buffered and kill all in flight fetch requests), but this is much easier
to reason about.

Closes #489.
  • Loading branch information
twmb committed Jul 8, 2023
1 parent e224e90 commit c3b083b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 12 deletions.
30 changes: 18 additions & 12 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,9 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
// currently paused topics.
//
// In contrast to the canonical Java client, this function does not clear
// anything currently buffered. Buffered fetches containing paused topics are
// still returned from polling.
// currently paused topics. Pausing topics drops everything currently buffered
// and kills any in flight fetch requests to ensure nothing that is paused
// can be returned anymore from polling.
//
// Pausing topics is independent from pausing individual partitions with the
// PauseFetchPartitions method. If you pause partitions for a topic with
Expand All @@ -564,6 +562,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))

paused := c.clonePaused()
paused.addTopics(topics...)
Expand All @@ -574,11 +573,9 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
// PauseFetchPartitions sets the client to no longer fetch the given partitions
// and returns all currently paused partitions. Paused partitions persist until
// resumed. You can call this function with no partitions to simply receive the
// list of currently paused partitions.
//
// In contrast to the canonical Java client, this function does not clear
// anything currently buffered. Buffered fetches containing paused partitions
// are still returned from polling.
// list of currently paused partitions. Pausing partitions drops everything
// currently buffered and kills any in flight fetch requests to ensure nothing
// that is paused can be returned anymore from polling.
//
// Pausing individual partitions is independent from pausing topics with the
// PauseFetchTopics method. If you pause partitions for a topic with
Expand All @@ -593,6 +590,7 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))

paused := c.clonePaused()
paused.addPartitions(topicPartitions)
Expand Down Expand Up @@ -868,6 +866,10 @@ const (
// The counterpart to assignInvalidateMatching, assignSetMatching
// resets all matching partitions to the specified offset / epoch.
assignSetMatching

// For pausing, we want to drop anything inflight. We start a new
// session with the old tps.
assignBumpSession
)

func (h assignHow) String() string {
Expand All @@ -882,6 +884,8 @@ func (h assignHow) String() string {
return "unassigning and purging any partition matching the input topics"
case assignSetMatching:
return "reassigning any currently assigned matching partition to the input"
case assignBumpSession:
return "bumping internal consumer session to drop anything currently in flight"
}
return ""
}
Expand Down Expand Up @@ -952,6 +956,8 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
// if we had no session before, which is why we need to pass in
// our topicPartitions.
session = c.guardSessionChange(tps)
} else if how == assignBumpSession {
loadOffsets, tps = c.stopSession()
} else {
loadOffsets, _ = c.stopSession()

Expand Down Expand Up @@ -998,7 +1004,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
// assignment went straight to listing / epoch loading, and
// that list/epoch never finished.
switch how {
case assignWithoutInvalidating:
case assignWithoutInvalidating, assignBumpSession:
// Nothing to do -- this is handled above.
case assignInvalidateAll:
loadOffsets = listOrEpochLoads{}
Expand Down Expand Up @@ -1039,7 +1045,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how

// This assignment could contain nothing (for the purposes of
// invalidating active fetches), so we only do this if needed.
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching {
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
return
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,61 @@ func TestAddRemovePartitions(t *testing.T) {
t.Fatalf("expected to see v1 and v2, got %v", recs)
}
}

func TestPauseIssue489(t *testing.T) {
t.Parallel()

t1, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
UnknownTopicRetries(-1),
DefaultProduceTopic(t1),
RecordPartitioner(ManualPartitioner()),
ConsumeTopics(t1),
FetchMaxWait(100*time.Millisecond),
)
defer cl.Close()

ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit bool
var zeroOne uint8
for !exit {
r := StringRecord("v")
r.Partition = int32(zeroOne % 2)
zeroOne++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit = true
}
})
}
}()
defer cancel()

for i := 0; i < 10; i++ {
var sawZero, sawOne bool
for !sawZero || !sawOne {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
})
}
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne = false, false
for i := 0; i < 5; i++ {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
})
}
if sawZero {
t.Error("saw partition zero even though it was paused")
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}

0 comments on commit c3b083b

Please sign in to comment.