Skip to content

Commit

Permalink
feat: add support for context.Context in Retry
Browse files Browse the repository at this point in the history
Existing function `Retry` is maintained with the same API to preserve
backwards compatibility.

New function `RetryWithContext` accepts a context and delivers it to the
retryable function. Each retry attempt can be limited by timeout (via
context, `WithAttemptTimeout`).

Garbage-collected channel `C` which wasn't used.

Replaced stop channel delivery with `close(ch)` to avoid having a
channel with a single entry.

Removed `time.After()` as it leaks the object until timer fires.

Fixes #2

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira authored and talos-bot committed Jan 19, 2021
1 parent 8c63d29 commit b9dc1a9
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 49 deletions.
3 changes: 1 addition & 2 deletions .drone.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-01T21:14:08Z by kres ee90a80-dirty.
# Generated on 2021-01-18T14:11:16Z by kres latest.

kind: pipeline
type: kubernetes
Expand Down Expand Up @@ -132,7 +132,6 @@ services:
- --dns=8.8.4.4
- --mtu=1500
- --log-level=error
- --insecure-registry=http://registry.ci.svc:5000
privileged: true
volumes:
- name: outer-docker-socket
Expand Down
5 changes: 4 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-01T21:14:08Z by kres ee90a80-dirty.
# Generated on 2021-01-18T14:11:16Z by kres latest.


# options for analysis running
Expand Down Expand Up @@ -125,6 +125,9 @@ linters:
- gomnd
- goerr113
- nestif
- wrapcheck
- paralleltest
- exhaustivestruct
disable-all: false
fast: false

Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# syntax = docker/dockerfile-upstream:1.1.7-experimental
# syntax = docker/dockerfile-upstream:1.2.0-labs

# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-01T21:14:08Z by kres ee90a80-dirty.
# Generated on 2021-01-18T14:11:16Z by kres latest.

ARG TOOLCHAIN

Expand All @@ -24,7 +24,7 @@ FROM toolchain AS tools
ENV GO111MODULE on
ENV CGO_ENABLED 0
ENV GOPATH /go
RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /bin v1.30.0
RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /bin v1.33.0
ARG GOFUMPT_VERSION
RUN cd $(mktemp -d) \
&& go mod init tmp \
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-21T13:20:49Z by kres 7e146df-dirty.
# Generated on 2021-01-18T14:11:16Z by kres latest.

# common variables

SHA := $(shell git describe --match=none --always --abbrev=8 --dirty)
TAG := $(shell git describe --tag --always --dirty)
BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
ARTIFACTS := _out
REGISTRY ?= docker.io
USERNAME ?= autonomy
REGISTRY ?= ghcr.io
USERNAME ?= talos-systems
REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME)
GOFUMPT_VERSION ?= abc0db2c416aca0f60ea33c23c76665f6e7ba0b6
GO_VERSION ?= 1.14
TESTPKGS ?= ./...
KRES_IMAGE ?= autonomy/kres:latest
KRES_IMAGE ?= ghcr.io/talos-systems/kres:latest

# docker build settings

Expand Down
11 changes: 8 additions & 3 deletions retry/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package retry

import (
"context"
"time"
)

Expand Down Expand Up @@ -34,9 +35,8 @@ func Constant(duration time.Duration, setters ...Option) Retryer {
func NewConstantTicker(opts *Options) *ConstantTicker {
l := &ConstantTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
s: make(chan struct{}),
},
}

Expand All @@ -45,10 +45,15 @@ func NewConstantTicker(opts *Options) *ConstantTicker {

// Retry implements the Retryer interface.
func (c constantRetryer) Retry(f RetryableFunc) error {
return c.RetryWithContext(context.Background(), removeContext(f))
}

// RetryWithContext implements the Retryer interface.
func (c constantRetryer) RetryWithContext(ctx context.Context, f RetryableFuncWithContext) error {
tick := NewConstantTicker(c.options)
defer tick.Stop()

return retry(f, c.duration, tick, c.options)
return retry(ctx, f, c.duration, tick, c.options)
}

// Tick implements the Ticker interface.
Expand Down
11 changes: 8 additions & 3 deletions retry/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package retry

import (
"context"
"math"
"time"
)
Expand Down Expand Up @@ -39,9 +40,8 @@ func Exponential(duration time.Duration, setters ...Option) Retryer {
func NewExponentialTicker(opts *Options) *ExponentialTicker {
e := &ExponentialTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
s: make(chan struct{}),
},
c: 1.0,
}
Expand All @@ -51,10 +51,15 @@ func NewExponentialTicker(opts *Options) *ExponentialTicker {

// Retry implements the Retryer interface.
func (e exponentialRetryer) Retry(f RetryableFunc) error {
return e.RetryWithContext(context.Background(), removeContext(f))
}

// RetryWithContext implements the Retryer interface.
func (e exponentialRetryer) RetryWithContext(ctx context.Context, f RetryableFuncWithContext) error {
tick := NewExponentialTicker(e.options)
defer tick.Stop()

return retry(f, e.duration, tick, e.options)
return retry(ctx, f, e.duration, tick, e.options)
}

// Tick implements the Ticker interface.
Expand Down
11 changes: 8 additions & 3 deletions retry/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package retry

import (
"context"
"time"
)

Expand Down Expand Up @@ -36,9 +37,8 @@ func Linear(duration time.Duration, setters ...Option) Retryer {
func NewLinearTicker(opts *Options) *LinearTicker {
l := &LinearTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
s: make(chan struct{}),
},
c: 1,
}
Expand All @@ -48,10 +48,15 @@ func NewLinearTicker(opts *Options) *LinearTicker {

// Retry implements the Retryer interface.
func (l linearRetryer) Retry(f RetryableFunc) error {
return l.RetryWithContext(context.Background(), removeContext(f))
}

// RetryWithContext implements the Retryer interface.
func (l linearRetryer) RetryWithContext(ctx context.Context, f RetryableFuncWithContext) error {
tick := NewLinearTicker(l.options)
defer tick.Stop()

return retry(f, l.duration, tick, l.options)
return retry(ctx, f, l.duration, tick, l.options)
}

// Tick implements the Ticker interface.
Expand Down
14 changes: 11 additions & 3 deletions retry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

// Options is the functional options struct.
type Options struct {
Units time.Duration
Jitter time.Duration
LogErrors bool
Units time.Duration
Jitter time.Duration
AttemptTimeout time.Duration
LogErrors bool
}

// Option is the functional option func.
Expand All @@ -39,6 +40,13 @@ func WithErrorLogging(enable bool) Option {
}
}

// WithAttemptTimeout sets timeout for each retry attempt.
func WithAttemptTimeout(o time.Duration) Option {
return func(args *Options) {
args.AttemptTimeout = o
}
}

// NewDefaultOptions initializes a Options struct with default values.
func NewDefaultOptions(setters ...Option) *Options {
opts := &Options{
Expand Down
92 changes: 70 additions & 22 deletions retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package retry

import (
"context"
"errors"
"fmt"
"log"
Expand All @@ -17,9 +18,19 @@ import (
// RetryableFunc represents a function that can be retried.
type RetryableFunc func() error

// RetryableFuncWithContext represents a function that can be retried.
type RetryableFuncWithContext func(context.Context) error

func removeContext(f RetryableFunc) RetryableFuncWithContext {
return func(context.Context) error {
return f()
}
}

// Retryer defines the requirements for retrying a function.
type Retryer interface {
Retry(RetryableFunc) error
RetryWithContext(context.Context, RetryableFuncWithContext) error
}

// Ticker defines the requirements for providing a clock to the retry logic.
Expand Down Expand Up @@ -92,9 +103,7 @@ func (TimeoutError) Error() string {

// IsTimeout reutrns if the provided error is a timeout error.
func IsTimeout(err error) bool {
_, ok := err.(TimeoutError)

return ok
return errors.Is(err, TimeoutError{})
}

type expectedError struct{ error }
Expand All @@ -115,7 +124,6 @@ type retryer struct {
}

type ticker struct {
C chan time.Time
options *Options
rand *rand.Rand
s chan struct{}
Expand All @@ -138,7 +146,7 @@ func (t ticker) StopChan() <-chan struct{} {
}

func (t ticker) Stop() {
t.s <- struct{}{}
close(t.s)
}

// ExpectedError error represents an error that is expected by the retrying
Expand All @@ -161,37 +169,77 @@ func UnexpectedError(err error) error {
return unexpectedError{err}
}

func retry(f RetryableFunc, d time.Duration, t Ticker, o *Options) error {
timer := time.NewTimer(d)
defer timer.Stop()
func retry(ctx context.Context, f RetryableFuncWithContext, d time.Duration, t Ticker, o *Options) error {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()

errs := &ErrorSet{}

var timer *time.Timer

defer func() {
if timer != nil {
timer.Stop()
}
}()

for {
if err := f(); err != nil {
exists := errs.Append(err)

switch err.(type) {
case expectedError:
// retry expected errors
if !exists && o.LogErrors {
log.Printf("retrying error: %s", err)
}
err := func() error {
var attemptCtxCancel context.CancelFunc

attemptCtx := ctx

if o.AttemptTimeout != 0 {
attemptCtx, attemptCtxCancel = context.WithTimeout(attemptCtx, o.AttemptTimeout)
defer attemptCtxCancel()
}

return f(attemptCtx)
}()

if err == nil {
return nil
}

if errors.Is(err, context.DeadlineExceeded) {
err = TimeoutError{}

select {
case <-ctx.Done():
default:
return errs
// main context not canceled, continue retrying
err = ExpectedError(err)
}
}

exists := errs.Append(err)

var expError expectedError

if errors.As(err, &expError) {
// retry expected errors
if !exists && o.LogErrors {
log.Printf("retrying error: %s", err)
}
} else {
return nil
return errs
}

timer = time.NewTimer(t.Tick())

select {
case <-timer.C:
errs.Append(TimeoutError{})
case <-ctx.Done():
err := ctx.Err()
if errors.Is(err, context.DeadlineExceeded) {
err = TimeoutError{}
}

errs.Append(err)

return errs
case <-t.StopChan():
return nil
case <-time.After(t.Tick()):
case <-timer.C:
}
}
}
Loading

0 comments on commit b9dc1a9

Please sign in to comment.