Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix multiple overflow errors in exponential backoff #18200

Merged
merged 8 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18200.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where exponential backoff could result in excessive CPU usage
```
13 changes: 5 additions & 8 deletions client/allocrunner/taskrunner/stats_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)
Expand Down Expand Up @@ -127,7 +128,9 @@ MAIN:
//
// It logs errors itself at the appropriate log levels, so callers should not
// log the returned error again.
func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
var retry int
var retry uint64
var backoff time.Duration
limit := time.Second * 5

MAIN:
if ctx.Err() != nil {
Expand Down Expand Up @@ -162,13 +165,7 @@ MAIN:
h.logger.Error("failed to start stats collection for task", "error", err)
}

limit := time.Second * 5
backoff := 1 << (2 * uint64(retry)) * time.Second
if backoff > limit || retry > 5 {
backoff = limit
}

// Increment retry counter
backoff = helper.Backoff(time.Second, limit, retry)
retry++

time.Sleep(backoff)
Expand Down
13 changes: 6 additions & 7 deletions client/allocrunner/taskrunner/vault_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -311,7 +312,8 @@ OUTER:
// deriveVaultToken derives the Vault token using exponential backoffs. It
// returns the Vault token and whether the manager should exit.
func (h *vaultHook) deriveVaultToken() (token string, exit bool) {
attempts := 0
var attempts uint64
var backoff time.Duration
for {
tokens, err := h.client.DeriveToken(h.alloc, []string{h.taskName})
if err == nil {
Expand Down Expand Up @@ -339,14 +341,11 @@ func (h *vaultHook) deriveVaultToken() (token string, exit bool) {
}

// Handle the retry case
backoff := (1 << (2 * uint64(attempts))) * vaultBackoffBaseline
if backoff > vaultBackoffLimit {
backoff = vaultBackoffLimit
}
h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff)

backoff = helper.Backoff(vaultBackoffBaseline, vaultBackoffLimit, attempts)
attempts++

h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff)

// Wait out the backoff before retrying
select {
case <-h.ctx.Done():
Expand Down
11 changes: 5 additions & 6 deletions client/devicemanager/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -450,7 +451,8 @@ func (i *instanceManager) handleFingerprint(f *device.FingerprintResponse) error
// collectStats is a long lived goroutine for collecting device statistics. It
// handles errors by backing off exponentially and retrying.
func (i *instanceManager) collectStats() {
attempt := 0
var attempt uint64
var backoff time.Duration

START:
// Get a device plugin
Expand Down Expand Up @@ -495,10 +497,7 @@ START:
}

// Retry with an exponential backoff
backoff := (1 << (2 * uint64(attempt))) * statsBackoffBaseline
if backoff > statsBackoffLimit {
backoff = statsBackoffLimit
}
backoff = helper.Backoff(statsBackoffBaseline, statsBackoffLimit, attempt)
attempt++

i.logger.Error("stats returned an error", "error", err, "retry", backoff)
Expand All @@ -511,7 +510,7 @@ START:
}
}

// Reset the attempt since we got statistics
// Reset the attempts since we got statistics
attempt = 0

// Store the new stats
Expand Down
16 changes: 5 additions & 11 deletions client/pluginmanager/drivermanager/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -290,7 +291,7 @@ func (i *instanceManager) fingerprint() {

// backoff and retry used if the RPC is closed by the other end
var backoff time.Duration
var retry int
var retry uint64
for {
if backoff > 0 {
select {
Expand Down Expand Up @@ -329,11 +330,7 @@ func (i *instanceManager) fingerprint() {
i.handleFingerprintError()

// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
if backoff > driverFPBackoffLimit {
backoff = driverFPBackoffLimit
}
// Increment retry counter
backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry)
retry++
continue
}
Expand Down Expand Up @@ -426,7 +423,7 @@ func (i *instanceManager) handleEvents() {
}

var backoff time.Duration
var retry int
var retry uint64
for {
if backoff > 0 {
select {
Expand All @@ -453,10 +450,7 @@ func (i *instanceManager) handleEvents() {
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)

// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
if backoff > driverFPBackoffLimit {
backoff = driverFPBackoffLimit
}
backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry)
retry++
continue
}
Expand Down
14 changes: 5 additions & 9 deletions command/agent/consul/version_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age

defer close(done)

i := uint64(0)

timer, stop := helper.NewSafeTimer(limit)
defer stop()

var attempts uint64
var backoff time.Duration

for {
self, err := client.Self()
if err == nil {
Expand All @@ -40,13 +41,8 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
return
}

backoff := (1 << (2 * i)) * baseline
if backoff > limit {
backoff = limit
} else {
i++
}

backoff = helper.Backoff(baseline, limit, attempts)
attempts++
timer.Reset(backoff)

select {
Expand Down
26 changes: 13 additions & 13 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,9 @@ type createContainerClient interface {
func (d *Driver) createContainer(client createContainerClient, config docker.CreateContainerOptions,
image string) (*docker.Container, error) {
// Create a container
attempted := 0
var attempted uint64
var backoff time.Duration

CREATE:
container, createErr := client.CreateContainer(config)
if createErr == nil {
Expand Down Expand Up @@ -526,16 +528,19 @@ CREATE:

if attempted < 5 {
attempted++
time.Sleep(nextBackoff(attempted))
backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
time.Sleep(backoff)
goto CREATE
}

} else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") {
// There is still a very small chance this is possible even with the
// coordinator so retry.
return nil, nstructs.NewRecoverableError(createErr, true)
} else if isDockerTransientError(createErr) && attempted < 5 {
attempted++
time.Sleep(nextBackoff(attempted))
backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
time.Sleep(backoff)
tgross marked this conversation as resolved.
Show resolved Hide resolved
goto CREATE
}

Expand All @@ -550,8 +555,9 @@ func (d *Driver) startContainer(c *docker.Container) error {
return err
}

// Start a container
attempted := 0
var attempted uint64
var backoff time.Duration

START:
startErr := dockerClient.StartContainer(c.ID, c.HostConfig)
if startErr == nil || strings.Contains(startErr.Error(), "Container already running") {
Expand All @@ -563,7 +569,8 @@ START:
if isDockerTransientError(startErr) {
if attempted < 5 {
attempted++
time.Sleep(nextBackoff(attempted))
backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
time.Sleep(backoff)
goto START
}
return nstructs.NewRecoverableError(startErr, true)
Expand All @@ -572,13 +579,6 @@ START:
return recoverableErrTimeouts(startErr)
}

// nextBackoff returns the docker backoff duration to sleep after the given
// number of attempts. The result is 50ms multiplied by 4^attempted (computed
// as a left shift by 2*attempted); no upper limit is applied here.
func nextBackoff(attempted int) time.Duration {
tgross marked this conversation as resolved.
Show resolved Hide resolved
// For attempted = 1..5 this yields 200ms, 800ms, 3.2s, 12.8s, 51.2s
// TODO: add randomization factor and extract to a helper
// NOTE(review): the shift is not guarded against large values of attempted;
// the call sites visible in this diff stop retrying once attempted reaches 5.
return 1 << (2 * uint64(attempted)) * 50 * time.Millisecond
}

// createImage creates a docker image either by pulling it from a registry or by
// loading it from the file system
func (d *Driver) createImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (string, error) {
Expand Down
10 changes: 3 additions & 7 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
defer destCh.close()

// backoff and retry used if the docker stats API returns an error
var backoff time.Duration = 0
var retry int
var backoff time.Duration
var retry uint64

// create an interval timer
timer, stop := helper.NewSafeTimer(backoff)
Expand Down Expand Up @@ -137,11 +137,7 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
h.logger.Debug("error collecting stats from container", "error", err)

// Calculate the new backoff
backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline
if backoff > statsCollectorBackoffLimit {
backoff = statsCollectorBackoffLimit
}
// Increment retry counter
backoff = helper.Backoff(statsCollectorBackoffBaseline, statsCollectorBackoffLimit, retry)
retry++
continue
}
Expand Down
31 changes: 31 additions & 0 deletions helper/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package helper

import (
"time"
)

// Backoff computes an exponential backoff delay for a retry loop.
//
// The delay is backoffBase doubled once per attempt (backoffBase * 2^attempt),
// clamped so that it never exceeds backoffLimit. Attempt 0 therefore yields
// backoffBase itself, provided it is within the limit.
//
// A zero or negative backoffBase always produces a zero delay. If the
// doubling would overflow an int64 nanosecond count, backoffLimit is returned
// instead of the wrapped value.
func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint64) time.Duration {
	// Largest value an int64 (and hence a time.Duration) can hold.
	const maxInt64 = int64(^uint64(0) >> 1)

	baseNanos := backoffBase.Nanoseconds()

	// A non-positive base cannot be scaled into a meaningful delay, so it
	// collapses to no delay at all.
	if baseNanos <= 0 {
		return 0
	}

	// Shifting 1 left by more than 62 bits no longer fits in a positive
	// int64, so the multiplier itself would overflow.
	if attempt > 62 {
		return backoffLimit
	}

	// The multiplier fits; make sure multiplier * base does as well.
	multiplier := int64(1) << attempt
	if maxInt64/baseNanos < multiplier {
		return backoffLimit
	}

	delay := time.Duration(multiplier) * backoffBase
	if delay > backoffLimit {
		return backoffLimit
	}
	return delay
}
72 changes: 72 additions & 0 deletions helper/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package helper

import (
"testing"
"time"

"github.com/shoenig/test/must"
)

// Test_Backoff checks Backoff against a table of base/limit/attempt
// combinations: clamping to the limit, doubling per attempt, non-positive
// bases, and attempts large enough to trigger the overflow guard.
//
// Each case runs as its own subtest so a failure is reported under the case
// name rather than as an anonymous mismatch.
func Test_Backoff(t *testing.T) {
	cases := []struct {
		name           string
		backoffBase    time.Duration
		backoffLimit   time.Duration
		attempt        uint64
		expectedResult time.Duration
	}{
		{
			name:           "backoff limit clamps for high base",
			backoffBase:    time.Hour,
			backoffLimit:   time.Minute,
			attempt:        1,
			expectedResult: time.Minute,
		},
		{
			name:           "backoff limit clamps for boundary attempt",
			backoffBase:    time.Hour,
			backoffLimit:   time.Minute,
			attempt:        63,
			expectedResult: time.Minute,
		},
		{
			name:           "small retry value",
			backoffBase:    time.Minute,
			backoffLimit:   time.Hour,
			attempt:        0,
			expectedResult: time.Minute,
		},
		{
			name:           "first retry value",
			backoffBase:    time.Minute,
			backoffLimit:   time.Hour,
			attempt:        1,
			expectedResult: 2 * time.Minute,
		},
		{
			name:           "fifth retry value",
			backoffBase:    time.Minute,
			backoffLimit:   time.Hour,
			attempt:        5,
			expectedResult: 32 * time.Minute,
		},
		{
			name:           "sixth retry value",
			backoffBase:    time.Minute,
			backoffLimit:   time.Hour,
			attempt:        6,
			expectedResult: time.Hour,
		},
		{
			name:           "zero base yields no backoff",
			backoffBase:    0,
			backoffLimit:   time.Hour,
			attempt:        3,
			expectedResult: 0,
		},
		{
			name:           "negative base yields no backoff",
			backoffBase:    -time.Minute,
			backoffLimit:   time.Hour,
			attempt:        3,
			expectedResult: 0,
		},
		{
			name:           "very large attempt returns limit",
			backoffBase:    time.Minute,
			backoffLimit:   time.Hour,
			attempt:        1000,
			expectedResult: time.Hour,
		},
	}

	for _, tc := range cases {
		// Subtests run synchronously here (no t.Parallel), so using tc
		// inside the closure is safe on every Go version.
		t.Run(tc.name, func(t *testing.T) {
			result := Backoff(tc.backoffBase, tc.backoffLimit, tc.attempt)
			must.Eq(t, tc.expectedResult, result)
		})
	}
}
Loading