Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Killing a driver handle is retried with an exponential backoff #809

Merged
merged 2 commits into from
Feb 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ func (h *execHandle) Update(task *structs.Task) error {

func (h *execHandle) Kill() error {
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

Expand Down
17 changes: 15 additions & 2 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,25 @@ func (h *javaHandle) Update(task *structs.Task) error {
}

func (h *javaHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
9 changes: 4 additions & 5 deletions client/driver/java_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
}
case <-time.After(time.Duration(testutil.TestMultiplier()*10) * time.Second):
t.Fatalf("timeout")
}

// need to kill long lived process
err = handle.Kill()
if err != nil {
t.Fatalf("Error: %s", err)
// Need to kill long lived process
if err = handle.Kill(); err != nil {
t.Fatalf("Error: %s", err)
}
}
}
17 changes: 15 additions & 2 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,25 @@ func (h *qemuHandle) Update(task *structs.Task) error {
// TODO: allow a 'shutdown_command' that can be executed over a ssh connection
// to the VM
func (h *qemuHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
17 changes: 15 additions & 2 deletions client/driver/raw_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,25 @@ func (h *rawExecHandle) Update(task *structs.Task) error {
}

func (h *rawExecHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
return h.executor.Exit()
if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}

Expand Down
61 changes: 53 additions & 8 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ import (
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)

const (
// killBackoffBaseline is the baseline time for exponential backoff while
// killing a task.
killBackoffBaseline = 5 * time.Second

// killBackoffLimit is the the limit of the exponential backoff for killing
// the task.
killBackoffLimit = 5 * time.Minute

// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 10
)

// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
config *config.Config
Expand Down Expand Up @@ -258,17 +272,23 @@ func (r *TaskRunner) run() {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case <-r.destroyCh:
// Avoid destroying twice
if destroyed {
continue
// Kill the task using an exponential backoff in-case of failures.
destroySuccess, err := r.handleDestroy()
if !destroySuccess {
// We couldn't successfully destroy the resource created.
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
} else {
// Wait for the task to exit but cap the time to ensure we don't block.
select {
case waitRes = <-r.handle.WaitCh():
case <-time.After(3 * time.Second):
}
}

// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
destroyErr = err
}
// Store that the task has been destroyed and any associated error.
destroyed = true
destroyErr = err
break OUTER
}
}

Expand Down Expand Up @@ -382,6 +402,31 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
return mErr.ErrorOrNil()
}

// handleDestroy kills the task handle. In the case that killing fails,
// handleDestroy will retry with an exponential backoff and will give up at a
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
if err = r.handle.Kill(); err != nil {
// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
if backoff > killBackoffLimit {
backoff = killBackoffLimit
}

r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc %q. Retrying in %v: %v",
r.task.Name, r.alloc.ID, backoff, err)
time.Sleep(time.Duration(backoff))
} else {
// Kill was successful
return true, nil
}
}
return
}

// Helper function for converting a WaitResult into a TaskTerminated event.
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
return structs.NewTaskEvent(structs.TaskTerminated).
Expand Down
1 change: 0 additions & 1 deletion nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (w *Worker) shouldResubmit(err error) bool {
// backoffErr is used to do an exponential back off on error. This is
// maintained statefully for the worker. Returns if attempts should be
// abandoneded due to shutdown.
// be made or abandoned.
func (w *Worker) backoffErr(base, limit time.Duration) bool {
backoff := (1 << (2 * w.failures)) * base
if backoff > limit {
Expand Down