Skip to content

Commit

Permalink
consumer: document that a nil / closed context is supported for polling
Browse files Browse the repository at this point in the history
This allows opportunistic tries for polling that do not hang.
  • Loading branch information
twmb committed Jul 26, 2021
1 parent 092d78b commit f82e5c6
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,27 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
}

// PollFetches waits for fetches to be available, returning as soon as any
// broker returns a fetch. If the ctx quits, this function quits.
// broker returns a fetch. If the context quits, this function quits. If the
// context is nil or is already canceled, this function will return immediately
// with any currently buffered records.
//
// It is important to check all partition errors in the returned fetches. If
// any partition has a fatal error and actually had no records, fake fetch will
// be injected with the error.
//
// If the client is closing or has closed, a fake fetch will be injected that
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
func (cl *Client) PollFetches(ctx context.Context) Fetches {
return cl.PollRecords(ctx, 0)
}

// PollRecords waits for records to be available, returning as soon as any
// broker returns a record in a fetch. If the ctx quits, this function quits.
// broker returns records in a fetch. If the context quits, this function
// quits. If the context is nil or is already canceled, this function will
// return immediately with any currently buffered records.

// This returns a maximum of maxPollRecords total across all fetches, or
// returns all buffered records if maxPollRecords is <= 0.
//
Expand Down Expand Up @@ -321,9 +331,14 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
}

fill()
if len(fetches) > 0 {
if len(fetches) > 0 || ctx == nil {
return fetches
}
select {
case <-ctx.Done():
return fetches
default:
}

done := make(chan struct{})
quit := false
Expand Down

0 comments on commit f82e5c6

Please sign in to comment.