Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gh-6980] Client: clean up old allocs before running new ones using the exec task driver. #20500

Merged
merged 24 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1c49e81
func: set nomad as subreaper, clean processes in cgroup and wait for …
Juanadelacuesta Apr 30, 2024
08ad0fa
style: clean up the orphans function
Juanadelacuesta Apr 30, 2024
52a96b2
func: comment out subreaper asignation
Juanadelacuesta Apr 30, 2024
34c633e
func: remove the subreaper
Juanadelacuesta May 1, 2024
bba68b0
style: clean up logs
Juanadelacuesta May 3, 2024
bdfef99
func: add testing for the cleaning of olf processes
Juanadelacuesta May 8, 2024
4d58b78
func: add extra test to make sure the old process is dead
Juanadelacuesta May 8, 2024
25fd974
func: Add changelog
Juanadelacuesta May 8, 2024
a10f608
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
ae45696
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
b1c347c
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
48406ce
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
3ba9e85
Update client/lib/cgroupslib/editor.go
Juanadelacuesta May 10, 2024
bdd0948
Update .changelog/20500.txt
Juanadelacuesta May 10, 2024
2c901fe
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
2f61a97
style: change message to debug for killing orphans
Juanadelacuesta May 10, 2024
a1c635e
func: Move the catch signals to the executor creation
Juanadelacuesta May 10, 2024
312dcfc
func: add test for catching signals
Juanadelacuesta May 10, 2024
3f1a238
typo
Juanadelacuesta May 10, 2024
ec9dade
func: Check the process number before kill it, to avoid shuting done …
Juanadelacuesta May 10, 2024
2f50dbb
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 10, 2024
d6017b4
style: add tag to logger
Juanadelacuesta May 10, 2024
9d6b62e
fix: move back ciliums version
Juanadelacuesta May 13, 2024
bf0a7f5
Update drivers/shared/executor/executor_linux.go
Juanadelacuesta May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20500.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: terminate old exec task processes before starting new ones, to avoid accidentally leaving running processes in case of an error
```
4 changes: 4 additions & 0 deletions client/lib/cgroupslib/editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
root = "/sys/fs/cgroup"
)

func GetDefaultRoot() string {
return root
}

// OpenPath creates a handle for modifying cgroup interface files under
// the given directory.
//
Expand Down
59 changes: 58 additions & 1 deletion drivers/shared/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strings"
Expand Down Expand Up @@ -78,17 +79,43 @@ type LibcontainerExecutor struct {
userProc *libcontainer.Process
userProcExited chan interface{}
exitState *ProcessState
sigChan chan os.Signal
}

func (l *LibcontainerExecutor) catchSignals() {
l.logger.Trace("waiting for signals")
defer signal.Stop(l.sigChan)
defer close(l.sigChan)

signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
for {
signal := <-l.sigChan
if signal == syscall.SIGTERM || signal == syscall.SIGINT {
l.Shutdown("SIGINT", 0)
break
}

if l.container != nil {
l.container.Signal(signal, false)
}
}
}

func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Executor {
sigch := make(chan os.Signal, 4)

le := &LibcontainerExecutor{
id: strings.ReplaceAll(uuid.Generate(), "-", "_"),
logger: logger.Named("isolated_executor"),
compute: compute,
totalCpuStats: cpustats.New(compute),
userCpuStats: cpustats.New(compute),
systemCpuStats: cpustats.New(compute),
sigChan: sigch,
}

go le.catchSignals()

le.processStats = procstats.New(compute, le)
return le
}
Expand All @@ -97,6 +124,34 @@ func (l *LibcontainerExecutor) ListProcesses() *set.Set[int] {
return procstats.List(l.command)
}

// cleanOldProcessesInCGroup kills processes that might ended up orphans when the
// executor was unexpectedly killed and nomad can't reconnect to them.
func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) {
l.logger.Debug("looking for old processes", "path", nomadRelativePath)

root := cgroupslib.GetDefaultRoot()
orphansPIDs, err := cgroups.GetAllPids(filepath.Join(root, nomadRelativePath))
if err != nil {
l.logger.Error("unable to get orphaned task PIDs", "error", err)
return
}

for _, pid := range orphansPIDs {
l.logger.Info("killing orphaned process", "pid", pid)

// Avoid bringing down the whole node by mistake, very unlikely case,
// but it's better to be sure.
if pid == 1 {
continue
}

err := syscall.Kill(pid, syscall.SIGKILL)
tgross marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
l.logger.Error("unable to send signal to process", "pid", pid, "error", err)
}
}
}

// Launch creates a new container in libcontainer and starts a new process with it
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
Expand Down Expand Up @@ -127,6 +182,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}

l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path)
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
Expand Down Expand Up @@ -166,6 +222,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
if command.User != "" {
process.User = command.User
}

l.userProc = process

l.totalCpuStats = cpustats.New(l.compute)
Expand All @@ -187,7 +244,6 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
// start a goroutine to wait on the process to complete, so Wait calls can
// be multiplexed
l.userProcExited = make(chan interface{})

go l.wait()

return &ProcessState{
Expand Down Expand Up @@ -779,6 +835,7 @@ func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecComma

func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) {
cfg := &runc.Config{
ParentDeathSignal: 9,
Cgroups: &runc.Cgroup{
Resources: &runc.Resources{
MemorySwappiness: nil,
Expand Down
117 changes: 117 additions & 0 deletions drivers/shared/executor/executor_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand All @@ -27,6 +29,7 @@ import (
tu "github.com/hashicorp/nomad/testutil"
lconfigs "github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -856,3 +859,117 @@ func TestExecCommand_getCgroupOr_v1_relative(t *testing.T) {
result2 := ec.getCgroupOr("cpuset", "/sys/fs/cgroup/cpuset/nomad/abc123")
must.Eq(t, result2, "/sys/fs/cgroup/cpuset/custom/path")
}

func createCGroup(fullpath string) (cgroupslib.Interface, error) {
if err := os.MkdirAll(fullpath, 0755); err != nil {
return nil, err
}

return cgroupslib.OpenPath(fullpath), nil
}

func TestExecutor_CleanOldProcessesInCGroup(t *testing.T) {
ci.Parallel(t)

testutil.ExecCompatible(t)
testutil.CgroupsCompatible(t)

testExecCmd := testExecutorCommandWithChroot(t)

allocDir := testExecCmd.allocDir
defer allocDir.Destroy()

fullCGroupPath := testExecCmd.command.Resources.LinuxResources.CpusetCgroupPath

execCmd := testExecCmd.command
execCmd.Cmd = "/bin/sleep"
execCmd.Args = []string{"1"}
execCmd.ResourceLimits = true
execCmd.ModePID = "private"
execCmd.ModeIPC = "private"

// Create the CGroup the executor's command will run in and populate it with one process
cgInterface, err := createCGroup(fullCGroupPath)
must.NoError(t, err)

cmd := exec.Command("/bin/sleep", "3000")
err = cmd.Start()
must.NoError(t, err)

go func() {
err := cmd.Wait()
//This process will be killed by the executor as a prerequisite to run
// the executors command.
must.Error(t, err)
}()

pid := cmd.Process.Pid
must.Positive(t, pid)

err = cgInterface.Write("cgroup.procs", strconv.Itoa(pid))
must.NoError(t, err)

pids, err := cgInterface.PIDs()
must.NoError(t, err)
must.One(t, pids.Size())

// Run the executor normally and make sure the process that was originally running
// as part of the CGroup was killed, and only the executor's process is running.
execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)
executor := execInterface.(*LibcontainerExecutor)
defer executor.Shutdown("SIGKILL", 0)

ps, err := executor.Launch(execCmd)
must.NoError(t, err)
must.Positive(t, ps.Pid)

pids, err = cgInterface.PIDs()
must.NoError(t, err)
must.One(t, pids.Size())
must.True(t, pids.Contains(ps.Pid))
must.False(t, pids.Contains(pid))

estate, err := executor.Wait(context.Background())
must.NoError(t, err)
must.Zero(t, estate.ExitCode)

must.NoError(t, executor.Shutdown("", 0))
executor.Wait(context.Background())
}

func TestExecutor_SignalCatching(t *testing.T) {
ci.Parallel(t)

testutil.ExecCompatible(t)
testutil.CgroupsCompatible(t)

testExecCmd := testExecutorCommandWithChroot(t)

allocDir := testExecCmd.allocDir
defer allocDir.Destroy()

execCmd := testExecCmd.command
execCmd.Cmd = "/bin/sleep"
execCmd.Args = []string{"100"}
execCmd.ResourceLimits = true
execCmd.ModePID = "private"
execCmd.ModeIPC = "private"

execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)

ps, err := execInterface.Launch(execCmd)
must.NoError(t, err)
must.Positive(t, ps.Pid)

executor := execInterface.(*LibcontainerExecutor)
status, err := executor.container.OCIState()
must.NoError(t, err)
must.Eq(t, specs.StateRunning, status.Status)

executor.sigChan <- syscall.SIGTERM
time.Sleep(1 * time.Second)

status, err = executor.container.OCIState()
must.NoError(t, err)
must.Eq(t, specs.StateStopped, status.Status)
}
Loading