Skip to content

Commit

Permalink
client: unbuffer fetches on Close to allow mangeFetchConcurrency to quit
Browse files Browse the repository at this point in the history
The manageFetchConcurrency function only quits when active fetches hits
zero. When a fetch is buffered, it's considered active. If the client
closes, these buffered fetches will prevent manageFetchConcurrency from
quitting because the buffered fetches are considered "active". The
goroutine would only exit if a user calls PollFetches after Close.

We now call PollFetches at the end of Close to ensure that everything is
unbuffered. This also adds more comments as to flow ordering to help
reason about correctness.
  • Loading branch information
twmb committed Jan 6, 2022
1 parent d62f1be commit bb581f4
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
// must manually commit offsets before closing the client.
func (cl *Client) Close() {
cl.LeaveGroup()
// After LeaveGroup, consumers cannot consume anymore. LeaveGroup
// internally assigns noTopicsPartitions, which uses noConsmerSession,
// which prevents loopFetch from starting. Assigning also waits for the
// prior session to be complete, meaning loopFetch cannot be running.

// Now we kill the client context and all brokers, ensuring all
// requests fail. This will finish all producer callbacks and
Expand All @@ -539,6 +543,12 @@ func (cl *Client) Close() {
}

cl.failBufferedRecords(ErrClientClosed)

// We need one final poll: if any sources buffered a fetch, then the
// manageFetchConcurrency loop only exits when all fetches have been
// drained, because draining a fetch is what decrements an "active"
// fetch. PollFetches with `nil` is instant.
cl.PollFetches(nil)
}

// Request issues a request to Kafka, waiting for and returning the response.
Expand Down

0 comments on commit bb581f4

Please sign in to comment.