Skip to content

Commit

Permalink
k6runner/http: retry requests
Browse files Browse the repository at this point in the history
k6runner: fix typo

Co-authored-by: ka3de <[email protected]>
  • Loading branch information
2 people authored and Nadia Santalla committed Sep 18, 2024
1 parent bd1b491 commit a8996de
Show file tree
Hide file tree
Showing 2 changed files with 348 additions and 24 deletions.
137 changes: 118 additions & 19 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"os/exec"
"strings"
Expand Down Expand Up @@ -56,8 +57,10 @@ func New(opts RunnerOpts) Runner {

if strings.HasPrefix(opts.Uri, "http") {
r = &HttpRunner{
url: opts.Uri,
logger: &logger,
url: opts.Uri,
logger: &logger,
graceTime: defaultGraceTime,
backoff: defaultBackoff,
}
} else {
r = LocalRunner{
Expand Down Expand Up @@ -324,8 +327,18 @@ NEXT_RECORD:
// HttpRunner is a Runner that executes scripts by POSTing them to a remote runner at url, retrying requests that fail
// with a retryable error.
type HttpRunner struct {
	// url is the endpoint scripts are POSTed to.
	url string
	// logger receives diagnostics about requests, retries and script results.
	logger *zerolog.Logger
	// backoff sets the minimum amount of time to wait before retrying a request. The wait starts at this value and
	// grows by this value plus a random jitter smaller than it after every retry, so the nth attempt waits roughly n
	// times this value. Time already spent inside the failed request counts towards the wait.
	backoff time.Duration
	// graceTime tells the HttpRunner how much time to add to the script timeout to form the request timeout.
	graceTime time.Duration
}

// Defaults that New applies to every HttpRunner it builds.
const (
	// defaultBackoff is the base delay between retries of a failed request.
	defaultBackoff time.Duration = 10 * time.Second
	// defaultGraceTime is the extra time a request is given on top of the script timeout.
	defaultGraceTime time.Duration = 20 * time.Second
)

type requestError struct {
Err string `json:"error"`
Message string `json:"msg"`
Expand Down Expand Up @@ -353,23 +366,90 @@ func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner {
// ErrUnexpectedStatus is returned, wrapped together with the offending status code, when the remote runner replies
// with an HTTP status that is not expected to carry a machine-readable response.
var ErrUnexpectedStatus = errors.New("unexpected status code")

// Run sends script to the remote runner and returns its response, retrying requests that fail with a retryable error.
//
// Retries continue until a request succeeds, a request fails with an error that is not marked as retryable (returned
// as-is), or ctx is done (returned as a "cannot retry further" error joining the last request error and ctx.Err()).
// If ctx carries no deadline, Run logs an error and bounds all retries to twice the script timeout so that it cannot
// retry forever.
//
// Run panics if the runner was built with a non-positive backoff, as retrying without waiting would flood the remote
// runner with requests.
func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
	// A zero backoff retries immediately; a negative one does too and would additionally make the jitter computation
	// below panic with an unrelated message. Reject both up front.
	if r.backoff <= 0 {
		panic("non-positive backoff, runner is misconfigured, refusing to DoS")
	}

	scriptTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond

	if deadline, hasDeadline := ctx.Deadline(); !hasDeadline {
		defaultAllRetriesTimeout := scriptTimeout * 2
		r.logger.Error().
			Dur("allRetriesTimeout", defaultAllRetriesTimeout).
			Msg("k6 runner does not have a deadline for all retries. This is a bug. Defaulting to twice the timeout to avoid retrying forever")

		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, defaultAllRetriesTimeout)
		defer cancel()
	} else if tud := time.Until(deadline); tud < scriptTimeout*2 {
		r.logger.Debug().
			Str("timeUntilNext", tud.String()).
			Str("timeout", scriptTimeout.String()).
			Msg("time until next execution is too close to script timeout, there might not be room for retries")
	}

	wait := r.backoff
	for {
		start := time.Now()

		response, err := r.request(ctx, script)
		if err == nil {
			r.logger.Debug().Bytes("metrics", response.Metrics).Bytes("logs", response.Logs).Msg("script result")
			return response, nil
		}

		if !errors.Is(err, errRetryable) {
			return nil, err
		}

		// Wait, but subtract the amount of time we've already waited as part of the request timeout.
		// We do this because these requests have huge timeouts, and by the nature of the system running these
		// requests, we expect the most common error to be a timeout, so we avoid waiting even more on top of an
		// already large value.
		waitRemaining := max(0, wait-time.Since(start))
		r.logger.Debug().Err(err).Dur("after", waitRemaining).Msg("retrying retryable error")

		waitTimer := time.NewTimer(waitRemaining)
		select {
		case <-ctx.Done():
			waitTimer.Stop()
			return nil, fmt.Errorf("cannot retry further: %w", errors.Join(err, ctx.Err()))
		case <-waitTimer.C:
		}

		// Backoff linearly, adding some jitter. The jitter is drawn in 64 bits because a duration in nanoseconds
		// does not fit in int on 32-bit platforms (10s is already 1e10 ns).
		wait += r.backoff + time.Duration(rand.Int63n(int64(r.backoff)))
	}
}

// errRetryable indicates that an error is retryable. It is joined with the underlying error using errors.Join by the
// code that produces the failure, and detected with errors.Is by HttpRunner.Run to decide whether to try again.
var errRetryable = errors.New("retryable")

func (r HttpRunner) request(ctx context.Context, script Script) (*RunResponse, error) {
checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
if checkTimeout == 0 {
return nil, ErrNoTimeout
}

// requestTimeout should be noticeably larger than [Script.Settings.Timeout], to account for added latencies in the
// system such as network, i/o, serialization, queue wait time, etc. that take place before and after the script is
// run.
// t0                 t1                                      t2               t3
// |--- Queue wait ---|-------------- k6 run -----------------|--- Response ---|
// checkTimeout = t2 - t1
// requestTimeout = t3 - t0
requestTimeout := checkTimeout + r.graceTime
notAfter := time.Now().Add(requestTimeout)

ctx, cancel := context.WithDeadline(ctx, notAfter)
defer cancel()

reqBody, err := json.Marshal(script)
if err != nil {
return nil, fmt.Errorf("encoding script: %w", err)
}

// The context above carries the check timeout, which will be eventually passed to k6 by the runner at the other end
// of this request. To account for network overhead, we create a different context with an extra second of timeout,
// which adds some grace time to account for the network/system latency of the http request.
reqCtx, cancel := context.WithTimeout(context.Background(), checkTimeout+time.Second)
defer cancel()

req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, r.url, bytes.NewReader(reqBody))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(reqBody))
if err != nil {
return nil, fmt.Errorf("building request: %w", err)
}
Expand All @@ -379,30 +459,43 @@ func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error
resp, err := http.DefaultClient.Do(req)
if err != nil {
r.logger.Error().Err(err).Msg("sending request")
return nil, fmt.Errorf("running script: %w", err)

// Any error making a request is retryable.
return nil, errors.Join(errRetryable, fmt.Errorf("making request: %w", err))
}

defer resp.Body.Close()

switch resp.StatusCode {
case http.StatusOK, http.StatusRequestTimeout, http.StatusUnprocessableEntity, http.StatusInternalServerError:
// These are status code that come with a machine-readable response. The response may contain an error, which is
// These are status codes that we assume come with a machine-readable response. The response may contain an error, which is
// handled later.
// See: https://github.com/grafana/sm-k6-runner/blob/main/internal/mq/proxy.go#L215
default:

case http.StatusBadRequest:
// These are status codes that do not come with a machine readable response, and are not retryable.
//
// There might be an argument to be made to retry 500s, as they can be produced by panic recovering mechanisms which
// _can_ be seen as a transient error. However, it is also possible for a 500 to be returned by a script that failed
// and also needed a lot of time to complete. For this reason, we choose to not retry 500 for the time being.
return nil, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode)

default:
// Statuses not returned by the proxy directly are assumed to be infrastructure (e.g. ingress, k8s) related and
// thus marked as retryable.
// Runners may also return http.StatusServiceUnavailable if the browser session manager cannot be reached. We want
// to retry those errors, so we let the "default" block catch them.
return nil, errors.Join(errRetryable, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode))
}

var result RunResponse
err = json.NewDecoder(resp.Body).Decode(&result)
var response RunResponse
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
r.logger.Error().Err(err).Msg("decoding script result")
return nil, fmt.Errorf("decoding script result: %w", err)
}

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("script result")

return &result, nil
return &response, nil
}

type LocalRunner struct {
Expand All @@ -420,6 +513,11 @@ func (r LocalRunner) WithLogger(logger *zerolog.Logger) Runner {
func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
afs := afero.Afero{Fs: r.fs}

checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
if checkTimeout == 0 {
return nil, ErrNoTimeout
}

workdir, err := afs.TempDir("", "k6-runner")
if err != nil {
return nil, fmt.Errorf("cannot create temporary directory: %w", err)
Expand Down Expand Up @@ -455,7 +553,9 @@ func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, erro
return nil, fmt.Errorf("cannot find k6 executable: %w", err)
}

timeout := time.Duration(script.Settings.Timeout) * time.Millisecond
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, checkTimeout)
defer cancel()

// #nosec G204 -- the variables are not user-controlled
cmd := exec.CommandContext(
Expand All @@ -467,7 +567,6 @@ func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, erro
"--log-output", "file="+logsFn,
"--vus", "1",
"--iterations", "1",
"--duration", timeout.String(),
"--max-redirects", "10",
"--batch", "10",
"--batch-per-host", "4",
Expand Down
Loading

0 comments on commit a8996de

Please sign in to comment.