Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drivers: move executor process out of v1 task cgroup after process starts #24340

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24340.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
drivers: Move executor process out of task cgroup after task starts on cgroups v1
```
9 changes: 8 additions & 1 deletion drivers/shared/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
}

// setup containment (i.e. cgroups on linux)
if cleanup, err := e.configureResourceContainer(command, os.Getpid()); err != nil {
running, cleanup, err := e.configureResourceContainer(command, os.Getpid())
if err != nil {
e.logger.Error("failed to configure container, process isolation will not work", "error", err)
if os.Geteuid() == 0 || e.usesCustomCgroup() {
return nil, fmt.Errorf("unable to configure cgroups: %w", err)
Expand Down Expand Up @@ -416,6 +417,12 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err)
}

// Run the runningFunc hook after the process starts
if err := running(); err != nil {
return nil, err
}

// Wait on the task process
go e.wait()
return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions drivers/shared/executor/executor_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Exe
return NewExecutor(logger, compute)
}

func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func(), error) {
nothing := func() {}
return nothing, nil
// configureResourceContainer performs no containment setup in this build:
// both arguments are ignored.
//
// It returns a post-start hook that always reports success, a cleanup hook
// that does nothing, and a nil error.
func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func() error, func(), error) {
	noopRunning := func() error {
		return nil
	}
	noopCleanup := func() {}
	return noopRunning, noopCleanup, nil
}

func (e *UniversalExecutor) start(command *ExecCommand) error {
Expand Down
54 changes: 40 additions & 14 deletions drivers/shared/executor/executor_universal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,53 +114,73 @@ func (e *UniversalExecutor) statCG(cgroup string) (int, func(), error) {
return fd, cleanup, err
}

// runningFunc is a hook that is called once the task process has been started
// and is running.
//
// Its use case is moving the executor process out of the task cgroup once
// the child task process has been started (cgroups v1 only).
type runningFunc func() error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you return these functions instead of making them methods of the executor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong reason, it's just that the implementations are different between cgroups v1 and v2. Seemed like defining them as closures inside already-version specific functions was a bit cleaner than expanding the number of functions defined on the executor.


// cleanupFunc is called after task shutdown.
//
// Its use case is removing the cgroup from the system once it is no longer
// being used for running the task. The cleanupFunc returned by enterCG1
// (cgroups v1) does nothing.
type cleanupFunc func()

// configureResourceContainer on Linux configures the cgroups to be used to track
// pids created by the executor
//
// pid: pid of the executor (i.e. ourself)
//
// It returns a runningFunc to be run once the task process has started and a
// cleanupFunc; both are nil when an error is returned.
func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid int) (func(), error) {
func (e *UniversalExecutor) configureResourceContainer(
command *ExecCommand,
pid int,
) (runningFunc, cleanupFunc, error) {
cgroup := command.StatsCgroup()

// ensure tasks get the desired oom_score_adj value set
if err := e.setOomAdj(command.OOMScoreAdj); err != nil {
return nil, err
return nil, nil, err
}

// cgCleanup will be called after the task has been launched
// moveProcess is run once the task process has been started
// v1: move the executor process out of the task's cgroups
// v2: nothing to do
//
// deleteCgroup is the cleanup hook handed back to the caller
// v1: nothing to do
// v2: let go of the file descriptor of the task's cgroup
var cgCleanup func()
var (
deleteCgroup cleanupFunc
moveProcess runningFunc
)

// manually configure cgroup for cpu / memory constraints
switch cgroupslib.GetMode() {
case cgroupslib.CG1:
if err := e.configureCG1(cgroup, command); err != nil {
return nil, err
return nil, nil, err
}
cgCleanup = e.enterCG1(cgroup, command.CpusetCgroup())
moveProcess, deleteCgroup = e.enterCG1(cgroup, command.CpusetCgroup())
default:
e.configureCG2(cgroup, command)
// configure child process to spawn in the cgroup
// get file descriptor of the cgroup made for this task
fd, cleanup, err := e.statCG(cgroup)
if err != nil {
return nil, err
return nil, nil, err
}
e.childCmd.SysProcAttr.UseCgroupFD = true
e.childCmd.SysProcAttr.CgroupFD = fd
cgCleanup = cleanup
deleteCgroup = cleanup
moveProcess = func() error { return nil }
}

e.logger.Info("configured cgroup for executor", "pid", pid)

return cgCleanup, nil
return moveProcess, deleteCgroup, nil
}

// enterCG1 will write the executor PID (i.e. itself) into the cgroups we
// created for the task - so that the task and its children will spawn in
// those cgroups. The returned runningFunc moves the executor out of the
// task's cgroups and into the nomad/ parent cgroups; the returned cleanupFunc
// does nothing in cgroups v1.
func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() {
func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) (runningFunc, cleanupFunc) {
ed := cgroupslib.OpenPath(cpusetCgroup)
pid := strconv.Itoa(unix.Getpid())

// write pid to all the normal interfaces
Expand All @@ -174,21 +194,27 @@ func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() {
}

// write pid to the cpuset interface, which varies between reserve/share
ed := cgroupslib.OpenPath(cpusetCgroup)
err := ed.Write("cgroup.procs", pid)
if err != nil {
e.logger.Warn("failed to write cpuset cgroup", "error", err)
}

// cleanup func that moves executor back up to nomad cgroup
return func() {
for _, iface := range ifaces {
move := func() error {
// move the executor back out
for _, iface := range append(ifaces, "cpuset") {
err := cgroupslib.WriteNomadCG1(iface, "cgroup.procs", pid)
if err != nil {
e.logger.Warn("failed to move executor cgroup", "interface", iface, "error", err)
return err
}
}
return nil
}

// cleanup func does nothing in cgroups v1
cleanup := func() {}

return move, cleanup
}

func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) error {
Expand Down
35 changes: 35 additions & 0 deletions drivers/shared/executor/executor_universal_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package executor
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -127,3 +128,37 @@ func TestUniversalExecutor_setOomAdj(t *testing.T) {
oomScoreInt, _ := strconv.Atoi(strings.TrimSuffix(string(oomScore), "\n"))
must.Eq(t, execCmd.OOMScoreAdj, int32(oomScoreInt))
}

// TestUniversalExecutor_cg1_no_executor_pid launches a long-running task on a
// cgroups v1 system and verifies that, once Launch has returned, the only pid
// listed in the task's cpu, memory and freezer cgroups is the task process
// itself, i.e. the executor is no longer a member of those cgroups.
func TestUniversalExecutor_cg1_no_executor_pid(t *testing.T) {
	testutil.CgroupsCompatibleV1(t)
	ci.Parallel(t)

	factory := universalFactory
	testExecCmd := testExecutorCommand(t)
	execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
	execCmd.Cmd = "sleep"
	execCmd.Args = []string{"infinity"}

	factory.configureExecCmd(t, execCmd)
	defer allocDir.Destroy()
	executor := factory.new(testlog.HCLogger(t), compute)
	defer executor.Shutdown("", 0)

	p, err := executor.Launch(execCmd)
	must.NoError(t, err)

	alloc := filepath.Base(allocDir.AllocDirPath())
	taskPid := strconv.Itoa(p.Pid)

	ifaces := []string{"cpu", "memory", "freezer"}
	for _, iface := range ifaces {
		// NOTE(review): the ".web" suffix presumably matches the task name
		// used by testExecutorCommand - confirm against that helper.
		cgroup := fmt.Sprintf("/sys/fs/cgroup/%s/nomad/%s.web/cgroup.procs", iface, alloc)

		content, err := os.ReadFile(cgroup)
		must.NoError(t, err)

		// ensure only 1 pid (sleep) is present in this cgroup
		pids := strings.Fields(string(content))
		must.SliceLen(t, 1, pids)

		// expected value first, so a failure labels the values correctly
		must.Eq(t, taskPid, pids[0])
	}
}
Loading