Skip to content

Commit

Permalink
kgo: add KeepRetryableFetchErrors
Browse files Browse the repository at this point in the history
For when you want to return retryable errors to the client.

Closes #439.
  • Loading branch information
twmb committed Jul 8, 2023
1 parent 815c25c commit 76d2e71
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.maxConcurrentFetches}
case namefn(Rack):
return []any{cfg.rack}
case namefn(KeepRetryableFetchErrors):
return []any{cfg.keepRetryableFetchErrors}

case namefn(AdjustFetchOffsetsFn):
return []any{cfg.adjustOffsetsBeforeAssign}
Expand Down
10 changes: 4 additions & 6 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type cfg struct {

maxConcurrentFetches int
disableFetchSessions bool
keepFetchRetryableErrors bool
keepRetryableFetchErrors bool

topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions
partitions map[string]map[int32]Offset // partitions to directly consume from
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
}

// KeepFetchRetryableErrors switches the client to always return any retryable
// KeepRetryableFetchErrors switches the client to always return any retryable
// broker error when fetching, rather than stripping them. By default, the
// client strips retryable errors from fetch responses; these are usually
// signals that a client needs to update its metadata to learn of where a
Expand All @@ -1359,10 +1359,8 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
// events. For example, if you want to react to you yourself deleting a topic,
// you can watch for either UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID
// errors being returned in fetches (and ignore the other errors).
//
// TODO not exported / usable yet
func keepFetchRetryableErrors() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.keepFetchRetryableErrors = true }}
func KeepRetryableFetchErrors() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.keepRetryableFetchErrors = true }}
}

//////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestIssue434(t *testing.T) {
ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)),
ConsumeRegex(),
FetchMaxWait(100*time.Millisecond),
keepFetchRetryableErrors(),
KeepRetryableFetchErrors(),
)
defer cl.Close()

Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
var keep bool
switch fp.Err {
default:
if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepFetchRetryableErrors {
if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepRetryableFetchErrors {
// UnknownLeaderEpoch: our meta is newer than the broker we fetched from
// OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2)
// UnknownTopicID: kafka has not synced the state on all brokers
Expand Down Expand Up @@ -896,7 +896,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
if fails := partOffset.from.unknownIDFails.Add(1); fails > 5 {
partOffset.from.unknownIDFails.Add(-1)
keep = true
} else if s.cl.cfg.keepFetchRetryableErrors {
} else if s.cl.cfg.keepRetryableFetchErrors {
keep = true
} else {
numErrsStripped++
Expand Down

0 comments on commit 76d2e71

Please sign in to comment.