Backport of [gh-6980] Client: clean up old allocs before running new ones using the exec task driver. into release/1.7.x #20584

Merged
3 changes: 3 additions & 0 deletions .changelog/20500.txt
@@ -0,0 +1,3 @@
```release-note:bug
client: terminate old exec task processes before starting new ones, to avoid accidentally leaving running processes in case of an error
```
4 changes: 4 additions & 0 deletions client/lib/cgroupslib/editor.go
@@ -21,6 +21,10 @@ const (
root = "/sys/fs/cgroup"
)

// GetDefaultRoot returns the default root path of the cgroup file system.
func GetDefaultRoot() string {
return root
}

// OpenPath creates a handle for modifying cgroup interface files under
// the given directory.
//
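For context, here is a minimal sketch of how these helpers can be combined to inspect a cgroup's processes, mirroring how the test further below uses them. It assumes a Linux host with cgroups v2 mounted at the default root; the relative path is purely illustrative.

```go
package main

import (
	"fmt"
	"path/filepath"

	"github.com/hashicorp/nomad/client/lib/cgroupslib"
)

func main() {
	// Illustrative path relative to the cgroup root; real paths are derived
	// from the allocation and task at runtime.
	relative := "nomad.slice/example.scope"

	// Build the absolute cgroup directory and open a handle to its
	// interface files.
	dir := filepath.Join(cgroupslib.GetDefaultRoot(), relative)
	editor := cgroupslib.OpenPath(dir)

	// PIDs returns the set of process IDs currently attached to this
	// cgroup (its cgroup.procs interface file on cgroups v2).
	pids, err := editor.PIDs()
	if err != nil {
		fmt.Println("unable to read cgroup.procs:", err)
		return
	}
	fmt.Println("processes in cgroup:", pids.Size())
}
```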
59 changes: 58 additions & 1 deletion drivers/shared/executor/executor_linux.go
@@ -12,6 +12,7 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strings"
@@ -78,17 +79,43 @@ type LibcontainerExecutor struct {
userProc *libcontainer.Process
userProcExited chan interface{}
exitState *ProcessState
sigChan chan os.Signal
}

// catchSignals forwards signals delivered to the executor on to its container,
// and shuts the executor down when it receives SIGTERM or SIGINT.
func (l *LibcontainerExecutor) catchSignals() {
l.logger.Trace("waiting for signals")
defer signal.Stop(l.sigChan)
defer close(l.sigChan)

signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
for {
sig := <-l.sigChan
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
l.Shutdown("SIGINT", 0)
break
}

if l.container != nil {
l.container.Signal(sig, false)
}
}
}
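For readers less familiar with the os/signal pattern used here, this standalone sketch (not part of the change) shows how Notify delivers signals to a buffered channel until Stop unregisters it:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Buffered so a signal arriving while the receiver is busy is not dropped.
	sigs := make(chan os.Signal, 4)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	defer signal.Stop(sigs)

	fmt.Println("waiting for SIGTERM or SIGINT...")
	sig := <-sigs
	fmt.Println("received:", sig)
}
```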

func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Executor {
sigch := make(chan os.Signal, 4)

le := &LibcontainerExecutor{
id: strings.ReplaceAll(uuid.Generate(), "-", "_"),
logger: logger.Named("isolated_executor"),
compute: compute,
totalCpuStats: cpustats.New(compute),
userCpuStats: cpustats.New(compute),
systemCpuStats: cpustats.New(compute),
sigChan: sigch,
}

go le.catchSignals()

le.processStats = procstats.New(compute, le)
return le
}
@@ -97,6 +124,34 @@ func (l *LibcontainerExecutor) ListProcesses() *set.Set[int] {
return procstats.List(l.command)
}

// cleanOldProcessesInCGroup kills processes that may have been left behind as
// orphans when a previous executor was unexpectedly killed and Nomad could not
// reconnect to them.
func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) {
l.logger.Debug("looking for old processes", "path", nomadRelativePath)

root := cgroupslib.GetDefaultRoot()
orphanPIDs, err := cgroups.GetAllPids(filepath.Join(root, nomadRelativePath))
if err != nil {
l.logger.Error("unable to get orphaned task PIDs", "error", err)
return
}

for _, pid := range orphanPIDs {
l.logger.Info("killing orphaned process", "pid", pid)

// Never signal PID 1: bringing down the whole node by mistake is a
// very unlikely case, but it's better to be safe.
if pid == 1 {
continue
}

err := syscall.Kill(pid, syscall.SIGKILL)
if err != nil {
l.logger.Error("unable to send signal to process", "pid", pid, "error", err)
}
}
}
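The helper relies on cgroups.GetAllPids to enumerate every process still attached to the task's cgroup tree. As a rough, non-recursive illustration of the underlying cgroups v2 mechanism (the cgroup.procs interface file), a sketch might look like the following; the path is illustrative and error handling is minimal.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// pidsInCgroup reads the cgroup.procs interface file of a single cgroup
// directory and parses one PID per line. Unlike cgroups.GetAllPids, it does
// not descend into child cgroups.
func pidsInCgroup(dir string) ([]int, error) {
	raw, err := os.ReadFile(filepath.Join(dir, "cgroup.procs"))
	if err != nil {
		return nil, err
	}
	var pids []int
	for _, field := range strings.Fields(string(raw)) {
		pid, err := strconv.Atoi(field)
		if err != nil {
			return nil, err
		}
		pids = append(pids, pid)
	}
	return pids, nil
}

func main() {
	// Illustrative cgroup directory.
	pids, err := pidsInCgroup("/sys/fs/cgroup/nomad.slice")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("pids:", pids)
}
```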

// Launch creates a new container in libcontainer and starts a new process with it
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
@@ -127,6 +182,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}

l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path)
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
@@ -166,6 +222,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
if command.User != "" {
process.User = command.User
}

l.userProc = process

l.totalCpuStats = cpustats.New(l.compute)
@@ -187,7 +244,6 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
// start a goroutine to wait on the process to complete, so Wait calls can
// be multiplexed
l.userProcExited = make(chan interface{})

go l.wait()

return &ProcessState{
@@ -779,6 +835,7 @@ func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecComma

func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) {
cfg := &runc.Config{
ParentDeathSignal: 9, // SIGKILL the container process if the executor (its parent) dies
Cgroups: &runc.Cgroup{
Resources: &runc.Resources{
MemorySwappiness: nil,
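The new ParentDeathSignal: 9 field asks libcontainer to deliver SIGKILL to the container process if its parent, the executor, dies. The same kernel mechanism (PR_SET_PDEATHSIG) is reachable outside libcontainer through os/exec; here is a minimal, Linux-only sketch (not part of the PR):

```go
//go:build linux

package main

import (
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("/bin/sleep", "300")
	// Ask the kernel to send SIGKILL to the child when the parent thread
	// that started it dies, so the child cannot outlive its supervisor.
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Pdeathsig: syscall.SIGKILL,
	}
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	_ = cmd.Wait()
}
```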
117 changes: 117 additions & 0 deletions drivers/shared/executor/executor_linux_test.go
@@ -7,10 +7,12 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"testing"
"time"

@@ -26,6 +28,7 @@ import (
tu "github.com/hashicorp/nomad/testutil"
lconfigs "github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
@@ -792,3 +795,117 @@ func TestExecutor_cmdMounts(t *testing.T) {

require.EqualValues(t, expected, cmdMounts(input))
}

// createCGroup creates the cgroup directory at fullpath and returns a handle
// for editing its interface files.
func createCGroup(fullpath string) (cgroupslib.Interface, error) {
if err := os.MkdirAll(fullpath, 0755); err != nil {
return nil, err
}

return cgroupslib.OpenPath(fullpath), nil
}

func TestExecutor_CleanOldProcessesInCGroup(t *testing.T) {
ci.Parallel(t)

testutil.ExecCompatible(t)
testutil.CgroupsCompatible(t)

testExecCmd := testExecutorCommandWithChroot(t)

allocDir := testExecCmd.allocDir
defer allocDir.Destroy()

fullCGroupPath := testExecCmd.command.Resources.LinuxResources.CpusetCgroupPath

execCmd := testExecCmd.command
execCmd.Cmd = "/bin/sleep"
execCmd.Args = []string{"1"}
execCmd.ResourceLimits = true
execCmd.ModePID = "private"
execCmd.ModeIPC = "private"

// Create the CGroup the executor's command will run in and populate it with one process
cgInterface, err := createCGroup(fullCGroupPath)
must.NoError(t, err)

cmd := exec.Command("/bin/sleep", "3000")
err = cmd.Start()
must.NoError(t, err)

go func() {
err := cmd.Wait()
// This process will be killed by the executor as a prerequisite to
// running the executor's command.
must.Error(t, err)
}()

pid := cmd.Process.Pid
must.Positive(t, pid)

err = cgInterface.Write("cgroup.procs", strconv.Itoa(pid))
must.NoError(t, err)

pids, err := cgInterface.PIDs()
must.NoError(t, err)
must.One(t, pids.Size())

// Run the executor normally and make sure the process that was originally running
// as part of the CGroup was killed, and only the executor's process is running.
execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)
executor := execInterface.(*LibcontainerExecutor)
defer executor.Shutdown("SIGKILL", 0)

ps, err := executor.Launch(execCmd)
must.NoError(t, err)
must.Positive(t, ps.Pid)

pids, err = cgInterface.PIDs()
must.NoError(t, err)
must.One(t, pids.Size())
must.True(t, pids.Contains(ps.Pid))
must.False(t, pids.Contains(pid))

estate, err := executor.Wait(context.Background())
must.NoError(t, err)
must.Zero(t, estate.ExitCode)

must.NoError(t, executor.Shutdown("", 0))
executor.Wait(context.Background())
}

func TestExecutor_SignalCatching(t *testing.T) {
ci.Parallel(t)

testutil.ExecCompatible(t)
testutil.CgroupsCompatible(t)

testExecCmd := testExecutorCommandWithChroot(t)

allocDir := testExecCmd.allocDir
defer allocDir.Destroy()

execCmd := testExecCmd.command
execCmd.Cmd = "/bin/sleep"
execCmd.Args = []string{"100"}
execCmd.ResourceLimits = true
execCmd.ModePID = "private"
execCmd.ModeIPC = "private"

execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)

ps, err := execInterface.Launch(execCmd)
must.NoError(t, err)
must.Positive(t, ps.Pid)

executor := execInterface.(*LibcontainerExecutor)
status, err := executor.container.OCIState()
must.NoError(t, err)
must.Eq(t, specs.StateRunning, status.Status)

executor.sigChan <- syscall.SIGTERM
time.Sleep(1 * time.Second)

status, err = executor.container.OCIState()
must.NoError(t, err)
must.Eq(t, specs.StateStopped, status.Status)
}