Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart unhealthy tasks #3105

Merged
merged 33 commits into from
Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a720bb5
Add restart fields
schmichael Aug 24, 2017
bd1a342
Nest restart fields in CheckRestart
schmichael Aug 25, 2017
1608e59
Add check watcher for restarting unhealthy tasks
schmichael Aug 26, 2017
ebbf87f
Use existing restart policy infrastructure
schmichael Sep 10, 2017
555d1e2
on_warning=false -> ignore_warnings=false
schmichael Sep 11, 2017
c2d895d
Add comments and move delay calc to TaskRunner
schmichael Sep 11, 2017
78c72f8
Default grace period to 1s
schmichael Sep 11, 2017
7e103f6
Document new check_restart stanza
schmichael Sep 11, 2017
850d991
Add changelog entry for #3105
schmichael Sep 11, 2017
3db835c
Improve check watcher logging and add tests
schmichael Sep 13, 2017
526528c
Removed partially implemented allocLock
schmichael Sep 13, 2017
568b963
Remove unused lastStart field
schmichael Sep 13, 2017
9fb2865
Fix whitespace
schmichael Sep 13, 2017
092057a
Canonicalize and Merge CheckRestart in api
schmichael Sep 14, 2017
8b8c164
Wrap check watch updates in a struct
schmichael Sep 14, 2017
237c096
Simplify from 2 select loops to one
schmichael Sep 14, 2017
f8e872c
RestartDelay isn't needed as checks are re-added on restarts
schmichael Sep 14, 2017
40ed262
Handle multiple failing checks on a single task
schmichael Sep 14, 2017
5cd1d57
Watched -> TriggersRestart
schmichael Sep 14, 2017
10dc1c7
DRY up restart handling a bit.
schmichael Sep 14, 2017
3c0a42b
Rename unhealthy var and fix test indeterminism
schmichael Sep 14, 2017
6f72270
Test check watch updates
schmichael Sep 14, 2017
a508bb9
Fold SetFailure into SetRestartTriggered
schmichael Sep 14, 2017
5141c95
Add check_restart to jobspec tests
schmichael Sep 14, 2017
1564e1c
Move check_restart to its own section.
schmichael Sep 14, 2017
8014762
Add comments
schmichael Sep 15, 2017
cde908e
Cleanup and test restart failure code
schmichael Sep 15, 2017
fa836d8
Name const after what it represents
schmichael Sep 15, 2017
924813d
Test converting CheckRestart from api->structs
schmichael Sep 15, 2017
6bcf019
Test CheckRestart.Validate
schmichael Sep 15, 2017
10ae18c
Minor corrections to check_restart docs
schmichael Sep 15, 2017
967825d
Fix comments: task -> check
schmichael Sep 15, 2017
3d7446d
@dadgar is better at words than me
schmichael Sep 15, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

IMPROVEMENTS:
* api: Metrics endpoint exposes Prometheus formatted metrics [GH-3171]
* discovery: Allow restarting unhealthy tasks with `check_restart` [GH-3105]
* telemetry: Add support for tagged metrics for Nomad clients [GH-3147]

BUG FIXES:
Expand Down
88 changes: 82 additions & 6 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,71 @@ func (r *RestartPolicy) Merge(rp *RestartPolicy) {
}
}

// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
Limit int `mapstructure:"limit"`
Grace *time.Duration `mapstructure:"grace_period"`
IgnoreWarnings bool `mapstructure:"ignore_warnings"`
}

// Canonicalize CheckRestart fields if not nil.
func (c *CheckRestart) Canonicalize() {
if c == nil {
return
}

if c.Grace == nil {
c.Grace = helper.TimeToPtr(1 * time.Second)
}
}

// Copy returns a copy of CheckRestart or nil if unset.
func (c *CheckRestart) Copy() *CheckRestart {
if c == nil {
return nil
}

nc := new(CheckRestart)
nc.Limit = c.Limit
if c.Grace != nil {
g := *c.Grace
nc.Grace = &g
}
nc.IgnoreWarnings = c.IgnoreWarnings
return nc
}

// Merge values from other CheckRestart over default values on this
// CheckRestart and return merged copy.
func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart {
if c == nil {
// Just return other
return o
}

nc := c.Copy()

if o == nil {
// Nothing to merge
return nc
}

if nc.Limit == 0 {
nc.Limit = o.Limit
}

if nc.Grace == nil {
nc.Grace = o.Grace
}

if nc.IgnoreWarnings {
nc.IgnoreWarnings = o.IgnoreWarnings
}

return nc
}

// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Expand All @@ -96,16 +161,18 @@ type ServiceCheck struct {
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
Header map[string][]string
Method string
CheckRestart *CheckRestart `mapstructure:"check_restart"`
}

// The Service model represents a Consul service definition
type Service struct {
Id string
Name string
Tags []string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
Id string
Name string
Tags []string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
}

func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
Expand All @@ -117,6 +184,15 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
if s.AddressMode == "" {
s.AddressMode = "auto"
}

s.CheckRestart.Canonicalize()

// Canonicallize CheckRestart on Checks and merge Service.CheckRestart
// into each check.
for _, c := range s.Checks {
c.CheckRestart.Canonicalize()
c.CheckRestart = c.CheckRestart.Merge(s.CheckRestart)
}
}

// EphemeralDisk is an ephemeral disk object
Expand Down
3 changes: 2 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ func (r *AllocRunner) RestoreState() error {
// Restart task runner if RestoreState gave a reason
if restartReason != "" {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
tr.Restart("upgrade", restartReason)
const failure = false
tr.Restart("upgrade", restartReason, failure)
}
} else {
tr.Destroy(taskDestroyEvent)
Expand Down
4 changes: 2 additions & 2 deletions client/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
UpdateTask(allocID string, existing, newTask *structs.Task, restart consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advice against adding anything else at this point. I would convert to a config struct. No action required in this PR but its getting a bit much

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same thing!

AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
}
5 changes: 3 additions & 2 deletions client/consul_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
// TaskHooks is an interface which provides hooks into the tasks life-cycle
type TaskHooks interface {
// Restart is used to restart the task
Restart(source, reason string)
Restart(source, reason string, failure bool)

// Signal is used to signal the task
Signal(source, reason string, s os.Signal) error
Expand Down Expand Up @@ -439,7 +439,8 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time
}

if restart {
tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered")
const failure = false
tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered", failure)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My little const pattern is pretty funky, and I'd be happy to remove it.

I do it because I hate seeing method calls with literal booleans in them and having no idea what those booleans do without looking at the method signature and/or docs.

} else if len(signals) != 0 {
var mErr multierror.Error
for signal := range signals {
Expand Down
2 changes: 1 addition & 1 deletion client/consul_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewMockTaskHooks() *MockTaskHooks {
EmitEventCh: make(chan struct{}, 1),
}
}
func (m *MockTaskHooks) Restart(source, reason string) {
func (m *MockTaskHooks) Restart(source, reason string, failure bool) {
m.Restarts++
select {
case m.RestartCh <- struct{}{}:
Expand Down
4 changes: 2 additions & 2 deletions client/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func newMockConsulServiceClient() *mockConsulServiceClient {
return &m
}

func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net))
return nil
}

func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash())
Expand Down
73 changes: 30 additions & 43 deletions client/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type RestartTracker struct {
waitRes *dstructs.WaitResult
startErr error
restartTriggered bool // Whether the task has been signalled to be restarted
failure bool // Whether a failure triggered the restart
count int // Current number of attempts.
onSuccess bool // Whether to restart on successful exit code.
startTime time.Time // When the interval began
Expand All @@ -59,6 +60,7 @@ func (r *RestartTracker) SetStartError(err error) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.startErr = err
r.failure = true
return r
}

Expand All @@ -67,15 +69,22 @@ func (r *RestartTracker) SetWaitResult(res *dstructs.WaitResult) *RestartTracker
r.lock.Lock()
defer r.lock.Unlock()
r.waitRes = res
r.failure = true
return r
}

// SetRestartTriggered is used to mark that the task has been signalled to be
// restarted
func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
// restarted. Setting the failure to true restarts according to the restart
// policy. When failure is false the task is restarted without considering the
// restart policy.
func (r *RestartTracker) SetRestartTriggered(failure bool) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.restartTriggered = true
if failure {
r.failure = true
} else {
r.restartTriggered = true
}
return r
}

Expand Down Expand Up @@ -106,6 +115,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
r.startErr = nil
r.waitRes = nil
r.restartTriggered = false
r.failure = false
}()

// Hot path if a restart was triggered
Expand Down Expand Up @@ -134,52 +144,29 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
r.startTime = now
}

if r.startErr != nil {
return r.handleStartError()
} else if r.waitRes != nil {
return r.handleWaitResult()
// Handle restarts due to failures
if !r.failure {
return "", 0
}

return "", 0
}

// handleStartError returns the new state and potential wait duration for
// restarting the task after it was not successfully started. On start errors,
// the restart policy is always treated as fail mode to ensure we don't
// infinitely try to start a task.
func (r *RestartTracker) handleStartError() (string, time.Duration) {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
return structs.TaskNotRestarting, 0
}

if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
r.reason = fmt.Sprintf(
`Exceeded allowed attempts %d in interval %v and mode is "fail"`,
r.policy.Attempts, r.policy.Interval)
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
return structs.TaskNotRestarting, 0
} else {
r.reason = ReasonDelay
return structs.TaskRestarting, r.getDelay()
}
} else if r.waitRes != nil {
// If the task started successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
r.reason = "Restart unnecessary as task terminated successfully"
return structs.TaskTerminated, 0
}
}

r.reason = ReasonWithinPolicy
return structs.TaskRestarting, r.jitter()
}

// handleWaitResult returns the new state and potential wait duration for
// restarting the task after it has exited.
func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
// If the task started successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
r.reason = "Restart unnecessary as task terminated successfully"
return structs.TaskTerminated, 0
}

// If this task has been restarted due to failures more times
// than the restart policy allows within an interval fail
// according to the restart policy's mode.
if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
r.reason = fmt.Sprintf(
Expand Down
15 changes: 14 additions & 1 deletion client/restarts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,24 @@ func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := newRestartTracker(p, structs.JobTypeService)
if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting && when != 0 {
if state, when := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting && when != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test the new case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added cde908e

t.Fatalf("expect restart immediately, got %v %v", state, when)
}
}

func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 1
rt := newRestartTracker(p, structs.JobTypeService)
if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskRestarting || when == 0 {
t.Fatalf("expect restart got %v %v", state, when)
}
if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskNotRestarting || when != 0 {
t.Fatalf("expect failed got %v %v", state, when)
}
}

func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
Expand Down
Loading