From f82e5c64f486d0e7dc2aa090f0205115a700c98b Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 26 Jul 2021 13:57:36 -0600 Subject: [PATCH] consumer: document that a nil / closed context is supported for polling This allows opportunistic tries for polling that do not hang. --- pkg/kgo/consumer.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 32dfada9..7c3bab39 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -232,17 +232,27 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er } // PollFetches waits for fetches to be available, returning as soon as any -// broker returns a fetch. If the ctx quits, this function quits. +// broker returns a fetch. If the context quits, this function quits. If the +// context is nil or is already canceled, this function will return immediately +// with any currently buffered records. // // It is important to check all partition errors in the returned fetches. If // any partition has a fatal error and actually had no records, fake fetch will // be injected with the error. +// +// If the client is closing or has closed, a fake fetch will be injected that +// has no topic, a partition of 0, and a partition error of ErrClientClosed. +// This can be used to detect if the client is closing and to break out of a +// poll loop. func (cl *Client) PollFetches(ctx context.Context) Fetches { return cl.PollRecords(ctx, 0) } // PollRecords waits for records to be available, returning as soon as any -// broker returns a record in a fetch. If the ctx quits, this function quits. +// broker returns records in a fetch. If the context quits, this function +// quits. If the context is nil or is already canceled, this function will +// return immediately with any currently buffered records. + // This returns a maximum of maxPollRecords total across all fetches, or // returns all buffered records if maxPollRecords is <= 0. // @@ -321,9 +331,14 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { } fill() - if len(fetches) > 0 { + if len(fetches) > 0 || ctx == nil { return fetches } + select { + case <-ctx.Done(): + return fetches + default: + } done := make(chan struct{}) quit := false