From f1c80afcf3cdebbefc2060cef430d13a40815839 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 16 Feb 2016 21:00:49 -0800 Subject: [PATCH 1/2] Killing a driver handle is retried with an exponential backoff --- client/driver/exec.go | 3 ++ client/driver/java.go | 17 +++++++++-- client/driver/qemu.go | 17 +++++++++-- client/driver/raw_exec.go | 17 +++++++++-- client/task_runner.go | 61 ++++++++++++++++++++++++++++++++++----- nomad/worker.go | 1 - 6 files changed, 101 insertions(+), 15 deletions(-) diff --git a/client/driver/exec.go b/client/driver/exec.go index c00a483b956..54b0b33b9e9 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -232,6 +232,9 @@ func (h *execHandle) Update(task *structs.Task) error { func (h *execHandle) Kill() error { if err := h.executor.ShutDown(); err != nil { + if h.pluginClient.Exited() { + return nil + } return fmt.Errorf("executor Shutdown failed: %v", err) } diff --git a/client/driver/java.go b/client/driver/java.go index f7d724efce6..3c5f441a757 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -282,12 +282,25 @@ func (h *javaHandle) Update(task *structs.Task) error { } func (h *javaHandle) Kill() error { - h.executor.ShutDown() + if err := h.executor.ShutDown(); err != nil { + if h.pluginClient.Exited() { + return nil + } + return fmt.Errorf("executor Shutdown failed: %v", err) + } + select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.executor.Exit() + if h.pluginClient.Exited() { + return nil + } + if err := h.executor.Exit(); err != nil { + return fmt.Errorf("executor Exit failed: %v", err) + } + + return nil } } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 54e28bbbddd..fbff86013e8 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -305,12 +305,25 @@ func (h *qemuHandle) Update(task *structs.Task) error { // TODO: allow a 'shutdown_command' that can be executed over a ssh connection // to the VM func (h *qemuHandle) Kill() error { - h.executor.ShutDown() + if err := h.executor.ShutDown(); err != nil { + if h.pluginClient.Exited() { + return nil + } + return fmt.Errorf("executor Shutdown failed: %v", err) + } + select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.executor.Exit() + if h.pluginClient.Exited() { + return nil + } + if err := h.executor.Exit(); err != nil { + return fmt.Errorf("executor Exit failed: %v", err) + } + + return nil } } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 6c8adace2fa..31e25fd5b0a 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -207,12 +207,25 @@ func (h *rawExecHandle) Update(task *structs.Task) error { } func (h *rawExecHandle) Kill() error { - h.executor.ShutDown() + if err := h.executor.ShutDown(); err != nil { + if h.pluginClient.Exited() { + return nil + } + return fmt.Errorf("executor Shutdown failed: %v", err) + } + select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.executor.Exit() + if h.pluginClient.Exited() { + return nil + } + if err := h.executor.Exit(); err != nil { + return fmt.Errorf("executor Exit failed: %v", err) + } + + return nil } } diff --git a/client/task_runner.go b/client/task_runner.go index 226b9de8afa..3d1b6f5dd43 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -19,6 +19,20 @@ import ( cstructs "github.com/hashicorp/nomad/client/driver/structs" ) +const ( + // killBackoffBaseline is the baseline time for exponential backoff while + // killing a task. + killBackoffBaseline = 5 * time.Second + + // killBackoffLimit is the the limit of the exponential backoff for killing + // the task. + killBackoffLimit = 5 * time.Minute + + // killFailureLimit is how many times we will attempt to kill a task before + // giving up and potentially leaking resources. + killFailureLimit = 10 +) + // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { config *config.Config @@ -258,17 +272,23 @@ func (r *TaskRunner) run() { r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err) } case <-r.destroyCh: - // Avoid destroying twice - if destroyed { - continue + // Kill the task using an exponential backoff in-case of failures. + destroySuccess, err := r.handleDestroy() + if !destroySuccess { + // We couldn't successfully destroy the resource created. + r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err) + } else { + // Wait for the task to exit but cap the time to ensure we don't block. + select { + case waitRes = <-r.handle.WaitCh(): + case <-time.After(3 * time.Second): + } } - // Send the kill signal, and use the WaitCh to block until complete - if err := r.handle.Kill(); err != nil { - r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) - destroyErr = err - } + // Store that the task has been destroyed and any associated error. destroyed = true + destroyErr = err + break OUTER } } @@ -382,6 +402,31 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { return mErr.ErrorOrNil() } +// handleDestroy kills the task handle. In the case that killing fails, +// handleDestroy will retry with an exponential backoff and will give up at a +// given limit. It returns whether the task was destroyed and the error +// associated with the last kill attempt. +func (r *TaskRunner) handleDestroy() (destroyed bool, err error) { + // Cap the number of times we attempt to kill the task. + for i := 0; i < killFailureLimit; i++ { + if err = r.handle.Kill(); err != nil { + // Calculate the new backoff + backoff := (1 << (2 * uint64(i))) * killBackoffBaseline + if backoff > killBackoffLimit { + backoff = killBackoffLimit + } + + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc %q. Retrying in %v: %v", + r.task.Name, r.alloc.ID, backoff, err) + time.Sleep(time.Duration(backoff)) + } else { + // Kill was successful + return true, nil + } + } + return +} + // Helper function for converting a WaitResult into a TaskTerminated event. func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { return structs.NewTaskEvent(structs.TaskTerminated). diff --git a/nomad/worker.go b/nomad/worker.go index 21a7d174625..1bae409d59c 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -413,7 +413,6 @@ func (w *Worker) shouldResubmit(err error) bool { // backoffErr is used to do an exponential back off on error. This is // maintained statefully for the worker. Returns if attempts should be // abandoneded due to shutdown. -// be made or abandoned. func (w *Worker) backoffErr(base, limit time.Duration) bool { backoff := (1 << (2 * w.failures)) * base if backoff > limit { From 48a97e0a9e7702fadb90222f4f56d0751df7b499 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 17 Feb 2016 10:04:19 -0800 Subject: [PATCH 2/2] fix java test --- client/driver/java_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 7b3fe507694..cc342101483 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -191,11 +191,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { } case <-time.After(time.Duration(testutil.TestMultiplier()*10) * time.Second): t.Fatalf("timeout") - } - // need to kill long lived process - err = handle.Kill() - if err != nil { - t.Fatalf("Error: %s", err) + // Need to kill long lived process + if err = handle.Kill(); err != nil { + t.Fatalf("Error: %s", err) + } } }