Skip to content

Commit

Permalink
Poll{Records,Fetches}: inject an error for canceled context
Browse files Browse the repository at this point in the history
It's debatable that this is necessary, but it's also not that useful not
to do this.

In the original implementation of PollFetches, no error was ever
injected. You only received an error if a partition error occurred.

Eventually, I realized this made quitting a poll loop on client close
quite difficult. I added ErrClientClosed, injected it in Poll, and added
a dedicated IsClientClosed API for this.

---

Polling originally returned every fetch result, even without records.
Fetches always returned updated log start offsets & high watermarks even
if no records were returned. Polling returned these empty fetches. This
was largely useless, so empty poll results were eventually removed. Poll
functions switched from returning periodically to returning only if
records were buffered, or the client closed, or the context was
canceled.

---

The intent of the context was to be a more powerful way to quit a poll
loop, in contrast to Java's or librdkafka's poll millisecond timeout.
The original implementation of the code largely mirrored Java and
librdkafka's polling, modulo timeouts for a context.

When you poll with a timeout, once the timeout is hit, you get no
results. You also get no error. You just hit a timeout, it is fine.
This was the original thought behind the context as well. It doesn't
matter if the context is canceled: you just quit fetching. This isn't an
error.

If you wanted to quit polling when you canceled your context, you'd need
to check the context yourself after the fact. This is a small ask,
really:

```
for ctx.Err() == nil {
    fs := cl.PollFetches(ctx)
    {work}
}
return ctx.Err()
```

---

I've seen code *once* that has a select block after fetching to check if
the context was canceled. I thought this was unfortunate, but it was
easy.

It's usual that functions that take a context return ctx.Err(), but this
isn't always the case. If a singleflight cache takes a context and a
value is loaded successfully, *even if* the next get for the same key
uses an expired context, the cache can return the successfully cached
key. You essentially find ctx.Err missing in functions where state is
already loaded. This was the exact case as the polling functions here:
we first did a check if anything was avaialable and if so, returned
without waiting.

It's valid to not return this context error. Most code does, because
most code using a context transits application boundaries and a
cancelation can only be an error, but there are cases where a
cancelation is not an error. We're just done waiting.

Regardless, odds are that if a person cancels a context, they want to
return anyway, so we can inject the ctx.Err and just return that. We
will avoid checking buffered fetches until we check ctx.Done.

This is a behavior change.

Closes #160.
  • Loading branch information
twmb committed May 5, 2022
1 parent 665f65f commit cbc8962
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 32 deletions.
66 changes: 40 additions & 26 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,31 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
c.sourcesReadyCond.Broadcast()
}

func errFetch(err error) Fetches {
return []Fetch{{
Topics: []FetchTopic{{
Topic: "",
Partitions: []FetchPartition{{
Partition: 0,
Err: err,
}},
}},
}}
}

// PollFetches waits for fetches to be available, returning as soon as any
// broker returns a fetch. If the context quits, this function quits. If the
// context is nil or is already canceled, this function will return immediately
// with any currently buffered records.
// broker returns a fetch. If the context is nil, this function will return
// immediately with any currently buffered records.
//
// If the client is closed, a fake fetch will be injected that has no topic, a
// partition of 0, and a partition error of ErrClientClosed. If the context is
// canceled, a fake fetch will be injected with ctx.Err. These injected errors
// can be used to break out of a poll loop.
//
// It is important to check all partition errors in the returned fetches. If
// any partition has a fatal error and actually had no records, fake fetch will
// be injected with the error.
//
// If the client is closing or has closed, a fake fetch will be injected that
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
Expand All @@ -356,9 +367,13 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
}

// PollRecords waits for records to be available, returning as soon as any
// broker returns records in a fetch. If the context quits, this function
// quits. If the context is nil or is already canceled, this function will
// return immediately with any currently buffered records.
// broker returns records in a fetch. If the context is nil, this function
// will return immediately with any currently buffered records.
//
// If the client is closed, a fake fetch will be injected that has no topic, a
// partition of 0, and a partition error of ErrClientClosed. If the context is
// canceled, a fake fetch will be injected with ctx.Err. These injected errors
// can be used to break out of a poll loop.
//
// This returns a maximum of maxPollRecords total across all fetches, or
// returns all buffered records if maxPollRecords is <= 0.
Expand All @@ -367,11 +382,6 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
// any partition has a fatal error and actually had no records, fake fetch will
// be injected with the error.
//
// If the client is closing or has closed, a fake fetch will be injected that
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
Expand All @@ -385,6 +395,16 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {

c.g.undirtyUncommitted()

// If the user gave us a canceled context, we bail immediately after
// un-dirty-ing marked records.
if ctx != nil {
select {
case <-ctx.Done():
return errFetch(ctx.Err())
default:
}
}

var fetches Fetches
fill := func() {
if c.cl.cfg.blockRebalanceOnPoll {
Expand Down Expand Up @@ -454,15 +474,12 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
}
}

// We try filling fetches once before waiting. If we have no context,
// we guarantee that we just drain anything available and return.
fill()
if len(fetches) > 0 || ctx == nil {
return fetches
}
select {
case <-ctx.Done():
return fetches
default:
}

done := make(chan struct{})
quit := false
Expand All @@ -485,14 +502,11 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {

select {
case <-cl.ctx.Done():
// The client is closed: we inject an error right now, which
// will be drained immediately in the fill call just below, and
// then will be returned with our fetches.
c.addFakeReadyForDraining("", 0, ErrClientClosed)
exit()
return errFetch(ErrClientClosed)
case <-ctx.Done():
// The user canceled: no need to inject anything; just return.
exit()
return errFetch(ctx.Err())
case <-done:
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,13 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fetches := cl.PollRecords(ctx, 100)
cancel()
if len(fetches) == 0 {
if fetches.Err() == context.DeadlineExceeded || fetches.Err() == ErrClientClosed {
if consumed := atomic.LoadUint64(&c.consumed); consumed == testRecordLimit {
return
} else if consumed > testRecordLimit {
panic(fmt.Sprintf("invalid: consumed too much from %s (group %s)", c.consumeFrom, c.group))
}
continue
}

if fetchErrs := fetches.Errors(); len(fetchErrs) > 0 {
Expand Down
22 changes: 18 additions & 4 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,28 @@ func (f Fetch) hasErrorsOrRecords() bool {
// This function is useful to break out of a poll loop; you likely want to call
// this function before calling Errors.
func (fs Fetches) IsClientClosed() bool {
// An injected ErrClientClosed is a dedicated fetch with one topic and
// An injected ErrClientClosed is a single fetch with one topic and
// one partition. We can use this to make IsClientClosed do less work.
return len(fs) == 1 && len(fs[0].Topics) == 1 && len(fs[0].Topics[0].Partitions) == 1 && errors.Is(fs[0].Topics[0].Partitions[0].Err, ErrClientClosed)
}

// Err returns the first error in all fetches, if any. This can be used to
// quickly check if the client is closed or your poll context was canceled, or
// to check if there's some other error that requires deeper investigation with
// EachError.
func (fs Fetches) Err() error {
for _, f := range fs {
if len(f.Topics) == 1 && len(f.Topics[0].Partitions) == 1 && errors.Is(f.Topics[0].Partitions[0].Err, ErrClientClosed) {
return true
for i := range f.Topics {
ft := &f.Topics[i]
for j := range ft.Partitions {
fp := &ft.Partitions[j]
if fp.Err != nil {
return fp.Err
}
}
}
}
return false
return nil
}

// EachError calls fn for every partition that had a fetch error with the
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fetches := txnSess.PollFetches(ctx)
cancel()
if len(fetches) == 0 {
if fetches.Err() == context.DeadlineExceeded || fetches.Err() == ErrClientClosed {
if consumed := atomic.LoadUint64(&c.consumed); consumed == testRecordLimit {
return
} else if consumed > testRecordLimit {
Expand Down

0 comments on commit cbc8962

Please sign in to comment.