diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index d3927922..4f370214 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -314,8 +314,8 @@ func (cl *Client) broker() *broker { return b } -func (cl *Client) waitTries(ctx context.Context, tries int) bool { - after := time.NewTimer(cl.cfg.retryBackoff(tries)) +func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool { + after := time.NewTimer(backoff) defer after.Stop() select { case <-ctx.Done(): @@ -614,19 +614,20 @@ start: if err != nil || retryErr != nil { if r.limitRetries == 0 || tries < r.limitRetries { - if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout { + backoff := r.cl.cfg.retryBackoff(tries) + if retryTimeout == 0 || time.Now().Add(backoff).Sub(tryStart) <= retryTimeout { // If this broker / request had a retriable error, we can // just retry now. If the error is *not* retriable but // is a broker-specific network error, and the next // broker is different than the current, we also retry. if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) { - if r.cl.waitTries(ctx, tries) { + if r.cl.waitTries(ctx, backoff) { next, nextErr = r.br() goto start } } else if r.cl.shouldRetryNext(tries, err) { next, nextErr = r.br() - if next != br && r.cl.waitTries(ctx, tries) { + if next != br && r.cl.waitTries(ctx, backoff) { goto start } } @@ -1527,7 +1528,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // If we failed to issue the request, we *maybe* will retry. // We could have failed to even issue the request or receive // a response, which is retriable. - if err != nil && (retryTimeout == 0 || time.Since(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, tries) { + backoff := cl.cfg.retryBackoff(tries) + if err != nil && (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) { // Non-reshardable re-requests just jump back to the // top where the broker is loaded. This is the case on // requests where the original request is split to