Skip to content

Commit

Permalink
Merge pull request #809 from hashicorp/b-failed-kill
Browse files Browse the repository at this point in the history
Killing a driver handle is retried with an exponential backoff
  • Loading branch information
dadgar committed Feb 17, 2016
2 parents 6752e36 + 48a97e0 commit da73f59
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 20 deletions.
3 changes: 3 additions & 0 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ func (h *execHandle) Update(task *structs.Task) error {

func (h *execHandle) Kill() error {
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

Expand Down
17 changes: 15 additions & 2 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,25 @@ func (h *javaHandle) Update(task *structs.Task) error {
}

func (h *javaHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
9 changes: 4 additions & 5 deletions client/driver/java_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
}
case <-time.After(time.Duration(testutil.TestMultiplier()*10) * time.Second):
t.Fatalf("timeout")
}

// need to kill long lived process
err = handle.Kill()
if err != nil {
t.Fatalf("Error: %s", err)
// Need to kill long lived process
if err = handle.Kill(); err != nil {
t.Fatalf("Error: %s", err)
}
}
}
17 changes: 15 additions & 2 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,25 @@ func (h *qemuHandle) Update(task *structs.Task) error {
// TODO: allow a 'shutdown_command' that can be executed over a ssh connection
// to the VM
func (h *qemuHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
17 changes: 15 additions & 2 deletions client/driver/raw_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,25 @@ func (h *rawExecHandle) Update(task *structs.Task) error {
}

func (h *rawExecHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
61 changes: 53 additions & 8 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ import (
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)

const (
// killBackoffBaseline is the baseline time for exponential backoff while
// killing a task.
killBackoffBaseline = 5 * time.Second

// killBackoffLimit is the the limit of the exponential backoff for killing
// the task.
killBackoffLimit = 5 * time.Minute

// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 10
)

// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
config *config.Config
Expand Down Expand Up @@ -258,17 +272,23 @@ func (r *TaskRunner) run() {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case <-r.destroyCh:
// Avoid destroying twice
if destroyed {
continue
// Kill the task using an exponential backoff in-case of failures.
destroySuccess, err := r.handleDestroy()
if !destroySuccess {
// We couldn't successfully destroy the resource created.
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
} else {
// Wait for the task to exit but cap the time to ensure we don't block.
select {
case waitRes = <-r.handle.WaitCh():
case <-time.After(3 * time.Second):
}
}

// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
destroyErr = err
}
// Store that the task has been destroyed and any associated error.
destroyed = true
destroyErr = err
break OUTER
}
}

Expand Down Expand Up @@ -382,6 +402,31 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
return mErr.ErrorOrNil()
}

// handleDestroy kills the task handle. In the case that killing fails,
// handleDestroy will retry with an exponential backoff and will give up at a
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
if err = r.handle.Kill(); err != nil {
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
if backoff > killBackoffLimit {
backoff = killBackoffLimit
}

r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc %q. Retrying in %v: %v",
r.task.Name, r.alloc.ID, backoff, err)
time.Sleep(time.Duration(backoff))
} else {
// Kill was successful
return true, nil
}
}
return
}

// Helper function for converting a WaitResult into a TaskTerminated event.
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
return structs.NewTaskEvent(structs.TaskTerminated).
Expand Down
1 change: 0 additions & 1 deletion nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (w *Worker) shouldResubmit(err error) bool {
// backoffErr is used to do an exponential back off on error. This is
// maintained statefully for the worker. Returns if attempts should be
// abandoneded due to shutdown.
// be made or abandoned.
func (w *Worker) backoffErr(base, limit time.Duration) bool {
backoff := (1 << (2 * w.failures)) * base
if backoff > limit {
Expand Down

0 comments on commit da73f59

Please sign in to comment.