Merge pull request #3105 from hashicorp/f-876-restart-unhealthy
Restart unhealthy tasks
schmichael authored Sep 18, 2017
2 parents 7e3fc68 + 3d7446d commit c1cf162
Showing 28 changed files with 1,632 additions and 230 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

IMPROVEMENTS:
* api: Metrics endpoint exposes Prometheus formatted metrics [GH-3171]
* discovery: Allow restarting unhealthy tasks with `check_restart` [GH-3105]
* telemetry: Add support for tagged metrics for Nomad clients [GH-3147]
* telemetry: Add basic Prometheus configuration for a Nomad cluster [GH-3186]

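The new `check_restart` behavior is configured per service or per check. As a rough sketch (not part of this commit), wiring it up through the Go api package changes below could look like the following; the Type, Path, Interval, and Timeout fields are assumed from the rest of the api.ServiceCheck struct, which is elided in the hunks shown on this page:

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/nomad/api"
)

func main() {
    grace := 30 * time.Second

    svc := &api.Service{
        Name:      "web",
        PortLabel: "http",
        // Service-level check_restart acts as the default for every check below.
        CheckRestart: &api.CheckRestart{
            Limit: 3,
            Grace: &grace,
        },
        Checks: []api.ServiceCheck{
            {
                Type:     "http",
                Path:     "/health",
                Interval: 10 * time.Second,
                Timeout:  2 * time.Second,
                // Check-level settings take precedence over the service-level defaults.
                CheckRestart: &api.CheckRestart{
                    IgnoreWarnings: true,
                },
            },
        },
    }

    fmt.Printf("%+v\n", svc)
}
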
88 changes: 82 additions & 6 deletions api/tasks.go
@@ -79,6 +79,71 @@ func (r *RestartPolicy) Merge(rp *RestartPolicy) {
}
}

// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
    Limit          int            `mapstructure:"limit"`
    Grace          *time.Duration `mapstructure:"grace_period"`
    IgnoreWarnings bool           `mapstructure:"ignore_warnings"`
}

// Canonicalize CheckRestart fields if not nil.
func (c *CheckRestart) Canonicalize() {
    if c == nil {
        return
    }

    if c.Grace == nil {
        c.Grace = helper.TimeToPtr(1 * time.Second)
    }
}

// Copy returns a copy of CheckRestart or nil if unset.
func (c *CheckRestart) Copy() *CheckRestart {
    if c == nil {
        return nil
    }

    nc := new(CheckRestart)
    nc.Limit = c.Limit
    if c.Grace != nil {
        g := *c.Grace
        nc.Grace = &g
    }
    nc.IgnoreWarnings = c.IgnoreWarnings
    return nc
}

// Merge values from other CheckRestart over default values on this
// CheckRestart and return merged copy.
func (c *CheckRestart) Merge(o *CheckRestart) *CheckRestart {
    if c == nil {
        // Just return other
        return o
    }

    nc := c.Copy()

    if o == nil {
        // Nothing to merge
        return nc
    }

    if nc.Limit == 0 {
        nc.Limit = o.Limit
    }

    if nc.Grace == nil {
        nc.Grace = o.Grace
    }

    // Only fall back to o's value when this CheckRestart leaves it unset.
    if !nc.IgnoreWarnings {
        nc.IgnoreWarnings = o.IgnoreWarnings
    }

    return nc
}
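
To make the precedence above concrete, here is a minimal, hedged sketch (not part of the commit) that exercises Merge and Canonicalize from the api package; the values are arbitrary:

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/nomad/api"
)

func main() {
    grace := 10 * time.Second

    // Service-level defaults and a check-level override.
    serviceLevel := &api.CheckRestart{Limit: 3, Grace: &grace}
    checkLevel := &api.CheckRestart{Limit: 5} // Grace deliberately left unset

    // Check-level values win; unset (zero) values fall back to the
    // service-level ones.
    merged := checkLevel.Merge(serviceLevel)
    fmt.Println(merged.Limit, *merged.Grace) // 5 10s

    // Canonicalize fills in the 1s default grace period when none is given.
    c := &api.CheckRestart{Limit: 1}
    c.Canonicalize()
    fmt.Println(*c.Grace) // 1s
}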

// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
@@ -96,16 +161,18 @@ type ServiceCheck struct {
    TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
    Header        map[string][]string
    Method        string
    CheckRestart  *CheckRestart `mapstructure:"check_restart"`
}

// The Service model represents a Consul service definition
type Service struct {
    Id           string
    Name         string
    Tags         []string
    PortLabel    string `mapstructure:"port"`
    AddressMode  string `mapstructure:"address_mode"`
    Checks       []ServiceCheck
    CheckRestart *CheckRestart `mapstructure:"check_restart"`
}

func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
@@ -117,6 +184,15 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
    if s.AddressMode == "" {
        s.AddressMode = "auto"
    }

    s.CheckRestart.Canonicalize()

    // Canonicalize CheckRestart on Checks and merge Service.CheckRestart
    // into each check. The index is used so the merged value is written back
    // into the slice element rather than a loop copy.
    for i, c := range s.Checks {
        c.CheckRestart.Canonicalize()
        s.Checks[i].CheckRestart = c.CheckRestart.Merge(s.CheckRestart)
    }
}

// EphemeralDisk is an ephemeral disk object
3 changes: 2 additions & 1 deletion client/alloc_runner.go
@@ -336,7 +336,8 @@ func (r *AllocRunner) RestoreState() error {
// Restart task runner if RestoreState gave a reason
if restartReason != "" {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
tr.Restart("upgrade", restartReason)
const failure = false
tr.Restart("upgrade", restartReason, failure)
}
} else {
tr.Destroy(taskDestroyEvent)
4 changes: 2 additions & 2 deletions client/consul.go
@@ -10,8 +10,8 @@ import (
// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
UpdateTask(allocID string, existing, newTask *structs.Task, restart consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error
AllocRegistrations(allocID string) (*consul.AllocRegistration, error)
}
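
The consul.TaskRestarter type referenced in the new signatures is defined elsewhere in this PR and is not shown in these hunks. A hedged guess at its shape, mirroring the Restart signature that TaskHooks gains below:

package consul

// TaskRestarter is a sketch of the assumed interface; the real definition
// lives in the PR's consul package changes, which are not shown on this page.
// Anything implementing Restart with this signature (for example the client's
// task runner) could be handed to RegisterTask/UpdateTask above.
type TaskRestarter interface {
    Restart(source, reason string, failure bool)
}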
5 changes: 3 additions & 2 deletions client/consul_template.go
@@ -49,7 +49,7 @@ var (
// TaskHooks is an interface which provides hooks into the tasks life-cycle
type TaskHooks interface {
// Restart is used to restart the task
Restart(source, reason string)
Restart(source, reason string, failure bool)

// Signal is used to signal the task
Signal(source, reason string, s os.Signal) error
@@ -439,7 +439,8 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time
}

if restart {
tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered")
const failure = false
tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered", failure)
} else if len(signals) != 0 {
var mErr multierror.Error
for signal := range signals {
2 changes: 1 addition & 1 deletion client/consul_template_test.go
@@ -57,7 +57,7 @@ func NewMockTaskHooks() *MockTaskHooks {
EmitEventCh: make(chan struct{}, 1),
}
}
func (m *MockTaskHooks) Restart(source, reason string) {
func (m *MockTaskHooks) Restart(source, reason string, failure bool) {
m.Restarts++
select {
case m.RestartCh <- struct{}{}:
4 changes: 2 additions & 2 deletions client/consul_test.go
@@ -60,15 +60,15 @@ func newMockConsulServiceClient() *mockConsulServiceClient {
return &m
}

func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash())
m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net))
return nil
}

func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, restarter consul.TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash())
73 changes: 30 additions & 43 deletions client/restarts.go
@@ -37,6 +37,7 @@ type RestartTracker struct {
    waitRes          *dstructs.WaitResult
    startErr         error
    restartTriggered bool      // Whether the task has been signalled to be restarted
    failure          bool      // Whether a failure triggered the restart
    count            int       // Current number of attempts.
    onSuccess        bool      // Whether to restart on successful exit code.
    startTime        time.Time // When the interval began
@@ -59,6 +60,7 @@ func (r *RestartTracker) SetStartError(err error) *RestartTracker {
    r.lock.Lock()
    defer r.lock.Unlock()
    r.startErr = err
    r.failure = true
    return r
}

@@ -67,15 +69,22 @@ func (r *RestartTracker) SetWaitResult(res *dstructs.WaitResult) *RestartTracker
    r.lock.Lock()
    defer r.lock.Unlock()
    r.waitRes = res
    r.failure = true
    return r
}

// SetRestartTriggered is used to mark that the task has been signalled to be
// restarted
func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
// restarted. Setting the failure to true restarts according to the restart
// policy. When failure is false the task is restarted without considering the
// restart policy.
func (r *RestartTracker) SetRestartTriggered(failure bool) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.restartTriggered = true
if failure {
r.failure = true
} else {
r.restartTriggered = true
}
return r
}
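
For intuition, a sketch in the style of the tests in client/restarts_test.go further down (not code from this commit; it assumes that file's unexported helpers and imports):

func TestExample_RestartTriggeredModes(t *testing.T) {
    t.Parallel()
    p := testPolicy(true, structs.RestartPolicyModeFail)
    p.Attempts = 0
    rt := newRestartTracker(p, structs.JobTypeService)

    // failure=false is the user/template-driven path: restart immediately,
    // without consulting the restart policy at all.
    if state, _ := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting {
        t.Fatalf("expected immediate restart, got %v", state)
    }

    // failure=true is the health-check-driven path: it counts against the
    // policy's Attempts per Interval, and here the policy allows none.
    if state, _ := rt.SetRestartTriggered(true).GetState(); state != structs.TaskNotRestarting {
        t.Fatalf("expected policy to stop restarts, got %v", state)
    }
}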

@@ -106,6 +115,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
        r.startErr = nil
        r.waitRes = nil
        r.restartTriggered = false
        r.failure = false
    }()

// Hot path if a restart was triggered
@@ -134,52 +144,29 @@
r.startTime = now
}

if r.startErr != nil {
return r.handleStartError()
} else if r.waitRes != nil {
return r.handleWaitResult()
// Handle restarts due to failures
if !r.failure {
return "", 0
}

return "", 0
}

// handleStartError returns the new state and potential wait duration for
// restarting the task after it was not successfully started. On start errors,
// the restart policy is always treated as fail mode to ensure we don't
// infinitely try to start a task.
func (r *RestartTracker) handleStartError() (string, time.Duration) {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
return structs.TaskNotRestarting, 0
}

if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
r.reason = fmt.Sprintf(
`Exceeded allowed attempts %d in interval %v and mode is "fail"`,
r.policy.Attempts, r.policy.Interval)
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
return structs.TaskNotRestarting, 0
} else {
r.reason = ReasonDelay
return structs.TaskRestarting, r.getDelay()
}
} else if r.waitRes != nil {
// If the task started successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
r.reason = "Restart unnecessary as task terminated successfully"
return structs.TaskTerminated, 0
}
}

r.reason = ReasonWithinPolicy
return structs.TaskRestarting, r.jitter()
}

// handleWaitResult returns the new state and potential wait duration for
// restarting the task after it has exited.
func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
// If the task started successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
r.reason = "Restart unnecessary as task terminated successfully"
return structs.TaskTerminated, 0
}

// If this task has been restarted due to failures more times
// than the restart policy allows within an interval fail
// according to the restart policy's mode.
if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
r.reason = fmt.Sprintf(
15 changes: 14 additions & 1 deletion client/restarts_test.go
@@ -99,11 +99,24 @@ func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := newRestartTracker(p, structs.JobTypeService)
if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting && when != 0 {
if state, when := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting && when != 0 {
t.Fatalf("expect restart immediately, got %v %v", state, when)
}
}

func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
    t.Parallel()
    p := testPolicy(true, structs.RestartPolicyModeFail)
    p.Attempts = 1
    rt := newRestartTracker(p, structs.JobTypeService)
    if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskRestarting || when == 0 {
        t.Fatalf("expect restart got %v %v", state, when)
    }
    if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskNotRestarting || when != 0 {
        t.Fatalf("expect failed got %v %v", state, when)
    }
}

func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
