Skip to content

Commit

Permalink
consumer: retry reloading offsets on non-retriable errors
Browse files Browse the repository at this point in the history
This brings consistency to retrying even non-retriable errors within the
client: all other non-retriable errors during fetching were retried
eventually except in this one instance. Now, if we see a non-retriable
list offsets or load epoch error, then we will retry after a 1s backoff.

Closes #25.
  • Loading branch information
twmb committed Apr 7, 2021
1 parent 6318b15 commit 009e1ba
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -161,10 +162,6 @@ func (c *consumer) loadGroup() (*groupConsumer, bool) {
g, ok := c.loadKind().(*groupConsumer)
return g, ok
}
func (c *consumer) loadDirect() (*directConsumer, bool) {
d, ok := c.loadKind().(*directConsumer)
return d, ok
}

func (c *consumer) storeDirect(d *directConsumer) { c.v.Store(&consumerValue{v: d}) } // while locked
func (c *consumer) storeGroup(g *groupConsumer) { c.v.Store(&consumerValue{v: g}) } // while locked
Expand Down Expand Up @@ -1029,7 +1026,16 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool)
// Called within a consumer session, this function handles results from list
// offsets or epoch loads.
func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
var reloads listOrEpochLoads
var (
// Retriable errors are retried immediately, while
// non-retriable errors are retried after a 1s backoff. It is
// unlikely that the client will be able to recover, but we may
// as well fetch every second to (a) force the user to notice
// errors, and (b) allow the user to auth the client at
// runtime.
reloads listOrEpochLoads
slowReloads listOrEpochLoads
)
defer func() {
// When we are done handling results, we have finished loading
// all the topics and partitions. We remove them from tracking
Expand All @@ -1041,6 +1047,18 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
s.listOrEpochMu.Unlock()

reloads.loadWithSession(s)
if !slowReloads.isEmpty() {
go func() {
after := time.NewTimer(time.Second)
defer after.Stop()
select {
case <-after.C:
case <-s.ctx.Done():
return
}
slowReloads.loadWithSession(s)
}()
}
}()

for _, load := range loaded.loaded {
Expand All @@ -1064,6 +1082,7 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
default: // from ErrorCode in a response
if !kerr.IsRetriable(load.err) { // non-retriable response error; signal such in a response
s.c.addFakeReadyForDraining(load.topic, load.partition, load.err)
slowReloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
continue
}
reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
Expand Down

0 comments on commit 009e1ba

Please sign in to comment.