Skip to content

Commit

Permalink
kgo: calculate retry timeout before backoff
Browse files Browse the repository at this point in the history
Previously, our backoff could push the request past the retry timeout.
We should fail early in this case, rather than try again after the
timeout.
  • Loading branch information
twmb committed Oct 15, 2021
1 parent d44fd28 commit a67edb5
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ func (cl *Client) broker() *broker {
return b
}

func (cl *Client) waitTries(ctx context.Context, tries int) bool {
after := time.NewTimer(cl.cfg.retryBackoff(tries))
func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool {
after := time.NewTimer(backoff)
defer after.Stop()
select {
case <-ctx.Done():
Expand Down Expand Up @@ -614,19 +614,20 @@ start:

if err != nil || retryErr != nil {
if r.limitRetries == 0 || tries < r.limitRetries {
if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
backoff := r.cl.cfg.retryBackoff(tries)
if retryTimeout == 0 || time.Now().Add(backoff).Sub(tryStart) <= retryTimeout {
// If this broker / request had a retriable error, we can
// just retry now. If the error is *not* retriable but
// is a broker-specific network error, and the next
// broker is different than the current, we also retry.
if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
if r.cl.waitTries(ctx, tries) {
if r.cl.waitTries(ctx, backoff) {
next, nextErr = r.br()
goto start
}
} else if r.cl.shouldRetryNext(tries, err) {
next, nextErr = r.br()
if next != br && r.cl.waitTries(ctx, tries) {
if next != br && r.cl.waitTries(ctx, backoff) {
goto start
}
}
Expand Down Expand Up @@ -1527,7 +1528,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
// If we failed to issue the request, we *maybe* will retry.
// We could have failed to even issue the request or receive
// a response, which is retriable.
if err != nil && (retryTimeout == 0 || time.Since(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, tries) {
backoff := cl.cfg.retryBackoff(tries)
if err != nil && (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
// Non-reshardable re-requests just jump back to the
// top where the broker is loaded. This is the case on
// requests where the original request is split to
Expand Down

0 comments on commit a67edb5

Please sign in to comment.