Skip to content

Commit

Permalink
kgo.Fetches: add Err0
Browse files Browse the repository at this point in the history
I'm having a hard time thinking of a good name here, and at least Err0
somewhat implies the indexing aspect.

I've tried:

* QuitPollErr, which too focused because it would imply that it only
returns non-nil if the error is context.Canceled or ErrClientClosed,
i.e. I would need to check the error.

* SingleErr, which is a bit awkward.

* OnlyErr, similarly awkward (and what is proposed in #189).

After staring at 189, it is a bit compelling to have very fast closed /
canceled checks, especially if the user consumes a LOT of partitions.

Credit to @dwagin for the idea / initial implementation.

Closes #189.
  • Loading branch information
twmb committed Aug 22, 2022
1 parent 5dd3321 commit 3a229d9
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,21 +357,40 @@ func (f Fetch) hasErrorsOrRecords() bool {
return false
}

// IsClientClosed returns whether the fetches includes an error indicating that
// IsClientClosed returns whether the fetches include an error indicating that
// the client is closed.
//
// This function is useful to break out of a poll loop; you likely want to call
// this function before calling Errors.
// this function before calling Errors. If you may cancel the context to poll,
// you may want to use Err0 and manually check errors.Is(ErrClientClosed) or
// errors.Is(context.Canceled).
func (fs Fetches) IsClientClosed() bool {
// An injected ErrClientClosed is a single fetch with one topic and
// one partition. We can use this to make IsClientClosed do less work.
return len(fs) == 1 && len(fs[0].Topics) == 1 && len(fs[0].Topics[0].Partitions) == 1 && errors.Is(fs[0].Topics[0].Partitions[0].Err, ErrClientClosed)
}

// Err0 returns the error at the 0th index fetch, topic, and partition. This
// can be used to quickly check if polling returned early because the client
// was closed or the context was canceled and is faster than performing a
// linear scan over all partitions with Err. When the client is closed or the
// context is canceled, fetches will contain only one partition whose Err field
// indicates the close / cancel. Note that this returns whatever the first
// error is, nil or non-nil, and does not check for a specific error value.
func (fs Fetches) Err0() error {
if len(fs) > 0 && len(fs[0].Topics) > 0 && len(fs[0].Topics[0].Partitions) > 0 {
return fs[0].Topics[0].Partitions[0].Err
}
return nil
}

// Err returns the first error in all fetches, if any. This can be used to
// quickly check if the client is closed or your poll context was canceled, or
// to check if there's some other error that requires deeper investigation with
// EachError.
// EachError. This function performs a linear scan over all fetched partitions.
// It is recommended to always check all errors. If you would like to more
// quickly check ahead of time if a poll was canceled because of closing the
// client or canceling the context, you can use Err0.
func (fs Fetches) Err() error {
for _, f := range fs {
for i := range f.Topics {
Expand Down

0 comments on commit 3a229d9

Please sign in to comment.