Skip to content

Commit

Permalink
Merge pull request #6722 from hashicorp/b-always-destroy-executor
Browse files Browse the repository at this point in the history
Always destroy exec container on cleanup
  • Loading branch information
Mahmood Ali authored Nov 19, 2019
2 parents 70a73db + bdef161 commit 6907748
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ BUG FIXES:
* cli: Make scoring column orders consistent `nomad alloc status` [[GH-6609](https://github.com/hashicorp/nomad/issues/6609)]
* cli: Fixed a bug where a cli user may fail to query FS/Allocation API endpoints if they lack `node:read` capability [[GH-6423](https://github.com/hashicorp/nomad/issues/6423)]
* client: Fixed a bug where a client may not restart dead internal processes upon client's restart on Windows [[GH-6426](https://github.com/hashicorp/nomad/issues/6426)]
* driver/exec: Fixed a bug where exec tasks can spawn processes that live beyond task lifecycle [[GH-6722](https://github.com/hashicorp/nomad/issues/6722)]
* driver/docker: Added mechanism for detecting running unexpectedly running docker containers [[GH-6325](https://github.com/hashicorp/nomad/issues/6325)]
* nomad: Multiple connect enabled services in the same taskgroup failed to
register [[GH-6646](https://github.com/hashicorp/nomad/issues/6646)]
Expand Down
6 changes: 2 additions & 4 deletions drivers/exec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}

if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}

handle.pluginClient.Kill()
Expand Down
103 changes: 103 additions & 0 deletions drivers/exec/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -251,6 +254,106 @@ func TestExecDriver_StartWaitRecover(t *testing.T) {
require.NoError(harness.DestroyTask(task.ID, true))
}

// TestExecDriver_DestroyKillsAll asserts that when TaskDestroy is called all
// task processes are cleaned up.
func TestExecDriver_DestroyKillsAll(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)

d := NewExecDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}

cleanup := harness.MkAllocDir(task, true)
defer cleanup()

taskConfig := map[string]interface{}{}
taskConfig["command"] = "/bin/sh"
taskConfig["args"] = []string{"-c", fmt.Sprintf(`sleep 3600 & echo "SLEEP_PID=$!"`)}

require.NoError(task.EncodeConcreteDriverConfig(&taskConfig))

handle, _, err := harness.StartTask(task)
require.NoError(err)
defer harness.DestroyTask(task.ID, true)

ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
require.NoError(err)

select {
case result := <-ch:
require.True(result.Successful(), "command failed: %#v", result)
case <-time.After(10 * time.Second):
require.Fail("timeout waiting for task to shutdown")
}

sleepPid := 0

// Ensure that the task is marked as dead, but account
// for WaitTask() closing channel before internal state is updated
testutil.WaitForResult(func() (bool, error) {
stdout, err := ioutil.ReadFile(filepath.Join(task.TaskDir().LogDir, "test.stdout.0"))
if err != nil {
return false, fmt.Errorf("failed to output pid file: %v", err)
}

pidMatch := regexp.MustCompile(`SLEEP_PID=(\d+)`).FindStringSubmatch(string(stdout))
if len(pidMatch) != 2 {
return false, fmt.Errorf("failed to find pid in %s", string(stdout))
}

pid, err := strconv.Atoi(pidMatch[1])
if err != nil {
return false, fmt.Errorf("pid parts aren't int: %s", pidMatch[1])
}

sleepPid = pid
return true, nil
}, func(err error) {
require.NoError(err)
})

// isProcessRunning returns an error if process is not running
isProcessRunning := func(pid int) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process: %s", err)
}

err = process.Signal(syscall.Signal(0))
if err != nil {
return fmt.Errorf("failed to signal process: %s", err)
}

return nil
}

require.NoError(isProcessRunning(sleepPid))

require.NoError(harness.DestroyTask(task.ID, true))

testutil.WaitForResult(func() (bool, error) {
err := isProcessRunning(sleepPid)
if err == nil {
return false, fmt.Errorf("child process is still running")
}

if !strings.Contains(err.Error(), "failed to signal process") {
return false, fmt.Errorf("unexpected error: %v", err)
}

return true, nil
}, func(err error) {
require.NoError(err)
})
}

func TestExecDriver_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
6 changes: 2 additions & 4 deletions drivers/java/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}

if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}

handle.pluginClient.Kill()
Expand Down
6 changes: 2 additions & 4 deletions drivers/qemu/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}

if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}

handle.pluginClient.Kill()
Expand Down
6 changes: 2 additions & 4 deletions drivers/rawexec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}

if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}

handle.pluginClient.Kill()
Expand Down
109 changes: 109 additions & 0 deletions drivers/rawexec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ package rawexec

import (
"context"
"os"
"regexp"
"runtime"
"strconv"
"syscall"
"testing"

"fmt"
Expand Down Expand Up @@ -196,6 +200,111 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) {
require.NoError(harness.DestroyTask(task.ID, true))
}

// TestRawExecDriver_DestroyKillsAll asserts that when TaskDestroy is called all
// task processes are cleaned up.
func TestRawExecDriver_DestroyKillsAll(t *testing.T) {
t.Parallel()

// This only works reliably with cgroup PID tracking, happens in linux only
if runtime.GOOS != "linux" {
t.Skip("Linux only test")
}

require := require.New(t)

d := newEnabledRawExecDriver(t)
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}

cleanup := harness.MkAllocDir(task, true)
defer cleanup()

taskConfig := map[string]interface{}{}
taskConfig["command"] = "/bin/sh"
taskConfig["args"] = []string{"-c", fmt.Sprintf(`sleep 3600 & echo "SLEEP_PID=$!"`)}

require.NoError(task.EncodeConcreteDriverConfig(&taskConfig))

handle, _, err := harness.StartTask(task)
require.NoError(err)
defer harness.DestroyTask(task.ID, true)

ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
require.NoError(err)

select {
case result := <-ch:
require.True(result.Successful(), "command failed: %#v", result)
case <-time.After(10 * time.Second):
require.Fail("timeout waiting for task to shutdown")
}

sleepPid := 0

// Ensure that the task is marked as dead, but account
// for WaitTask() closing channel before internal state is updated
testutil.WaitForResult(func() (bool, error) {
stdout, err := ioutil.ReadFile(filepath.Join(task.TaskDir().LogDir, "test.stdout.0"))
if err != nil {
return false, fmt.Errorf("failed to output pid file: %v", err)
}

pidMatch := regexp.MustCompile(`SLEEP_PID=(\d+)`).FindStringSubmatch(string(stdout))
if len(pidMatch) != 2 {
return false, fmt.Errorf("failed to find pid in %s", string(stdout))
}

pid, err := strconv.Atoi(pidMatch[1])
if err != nil {
return false, fmt.Errorf("pid parts aren't int: %s", pidMatch[1])
}

sleepPid = pid
return true, nil
}, func(err error) {
require.NoError(err)
})

// isProcessRunning returns an error if process is not running
isProcessRunning := func(pid int) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process: %s", err)
}

err = process.Signal(syscall.Signal(0))
if err != nil {
return fmt.Errorf("failed to signal process: %s", err)
}

return nil
}

require.NoError(isProcessRunning(sleepPid))

require.NoError(harness.DestroyTask(task.ID, true))

testutil.WaitForResult(func() (bool, error) {
err := isProcessRunning(sleepPid)
if err == nil {
return false, fmt.Errorf("child process is still running")
}

if !strings.Contains(err.Error(), "failed to signal process") {
return false, fmt.Errorf("unexpected error: %v", err)
}

return true, nil
}, func(err error) {
require.NoError(err)
})
}

func TestRawExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
if runtime.GOOS == "darwin" {
Expand Down
6 changes: 2 additions & 4 deletions drivers/rkt/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,8 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}

if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}

handle.pluginClient.Kill()
Expand Down

0 comments on commit 6907748

Please sign in to comment.