Skip to content

Commit

Permalink
backport of commit b58abf4
Browse files Browse the repository at this point in the history
  • Loading branch information
shoenig authored Nov 7, 2024
1 parent 0ff86de commit 8c5e607
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .changelog/24340.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
drivers: Move executor process out of task cgroup after task starts on cgroups v1
```
9 changes: 8 additions & 1 deletion drivers/shared/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
}

// setup containment (i.e. cgroups on linux)
if cleanup, err := e.configureResourceContainer(command, os.Getpid()); err != nil {
running, cleanup, err := e.configureResourceContainer(command, os.Getpid())
if err != nil {
e.logger.Error("failed to configure container, process isolation will not work", "error", err)
if os.Geteuid() == 0 || e.usesCustomCgroup() {
return nil, fmt.Errorf("unable to configure cgroups: %w", err)
Expand Down Expand Up @@ -424,6 +425,12 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err)
}

// Run the runningFunc hook after the process starts
if err := running(); err != nil {
return nil, err
}

// Wait on the task process
go e.wait()
return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions drivers/shared/executor/executor_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Exe
return NewExecutor(logger, compute)
}

func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func(), error) {
nothing := func() {}
return nothing, nil
func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func() error, func(), error) {
cleanup := func() {}
running := func() error { return nil }
return running, cleanup, nil
}

func (e *UniversalExecutor) start(command *ExecCommand) error {
Expand Down
54 changes: 40 additions & 14 deletions drivers/shared/executor/executor_universal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,53 +114,73 @@ func (e *UniversalExecutor) statCG(cgroup string) (int, func(), error) {
return fd, cleanup, err
}

// runningFunc is called after task startup and is running.
//
// its use case is for moving the executor process out of the task cgroup once
// the child task process has been started (cgroups v1 only)
type runningFunc func() error

// cleanupFunc is called after task shutdown
//
// its use case is for removing the cgroup from the system once it is no longer
// being used for running the task
type cleanupFunc func()

// configureResourceContainer on Linux configures the cgroups to be used to track
// pids created by the executor
//
// pid: pid of the executor (i.e. ourself)
func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid int) (func(), error) {
func (e *UniversalExecutor) configureResourceContainer(
command *ExecCommand,
pid int,
) (runningFunc, cleanupFunc, error) {
cgroup := command.StatsCgroup()

// ensure tasks get the desired oom_score_adj value set
if err := e.setOomAdj(command.OOMScoreAdj); err != nil {
return nil, err
return nil, nil, err
}

// cgCleanup will be called after the task has been launched
// deleteCgroup will be called after the task has been launched
// v1: remove the executor process from the task's cgroups
// v2: let go of the file descriptor of the task's cgroup
var cgCleanup func()
var (
deleteCgroup cleanupFunc
moveProcess runningFunc
)

// manually configure cgroup for cpu / memory constraints
switch cgroupslib.GetMode() {
case cgroupslib.CG1:
if err := e.configureCG1(cgroup, command); err != nil {
return nil, err
return nil, nil, err
}
cgCleanup = e.enterCG1(cgroup, command.CpusetCgroup())
moveProcess, deleteCgroup = e.enterCG1(cgroup, command.CpusetCgroup())
default:
e.configureCG2(cgroup, command)
// configure child process to spawn in the cgroup
// get file descriptor of the cgroup made for this task
fd, cleanup, err := e.statCG(cgroup)
if err != nil {
return nil, err
return nil, nil, err
}
e.childCmd.SysProcAttr.UseCgroupFD = true
e.childCmd.SysProcAttr.CgroupFD = fd
cgCleanup = cleanup
deleteCgroup = cleanup
moveProcess = func() error { return nil }
}

e.logger.Info("configured cgroup for executor", "pid", pid)

return cgCleanup, nil
return moveProcess, deleteCgroup, nil
}

// enterCG1 will write the executor PID (i.e. itself) into the cgroups we
// created for the task - so that the task and its children will spawn in
// those cgroups. The cleanup function moves the executor out of the task's
// cgroups and into the nomad/ parent cgroups.
func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() {
func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) (runningFunc, cleanupFunc) {
ed := cgroupslib.OpenPath(cpusetCgroup)
pid := strconv.Itoa(unix.Getpid())

// write pid to all the normal interfaces
Expand All @@ -174,21 +194,27 @@ func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() {
}

// write pid to the cpuset interface, which varies between reserve/share
ed := cgroupslib.OpenPath(cpusetCgroup)
err := ed.Write("cgroup.procs", pid)
if err != nil {
e.logger.Warn("failed to write cpuset cgroup", "error", err)
}

// cleanup func that moves executor back up to nomad cgroup
return func() {
for _, iface := range ifaces {
move := func() error {
// move the executor back out
for _, iface := range append(ifaces, "cpuset") {
err := cgroupslib.WriteNomadCG1(iface, "cgroup.procs", pid)
if err != nil {
e.logger.Warn("failed to move executor cgroup", "interface", iface, "error", err)
return err
}
}
return nil
}

// cleanup func does nothing in cgroups v1
cleanup := func() {}

return move, cleanup
}

func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) error {
Expand Down
35 changes: 35 additions & 0 deletions drivers/shared/executor/executor_universal_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package executor
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -127,3 +128,37 @@ func TestUniversalExecutor_setOomAdj(t *testing.T) {
oomScoreInt, _ := strconv.Atoi(strings.TrimSuffix(string(oomScore), "\n"))
must.Eq(t, execCmd.OOMScoreAdj, int32(oomScoreInt))
}

func TestUniversalExecutor_cg1_no_executor_pid(t *testing.T) {
testutil.CgroupsCompatibleV1(t)
ci.Parallel(t)

factory := universalFactory
testExecCmd := testExecutorCommand(t)
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
execCmd.Cmd = "sleep"
execCmd.Args = []string{"infinity"}

factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t), compute)
defer executor.Shutdown("", 0)

p, err := executor.Launch(execCmd)
must.NoError(t, err)

alloc := filepath.Base(allocDir.AllocDirPath())

ifaces := []string{"cpu", "memory", "freezer"}
for _, iface := range ifaces {
cgroup := fmt.Sprintf("/sys/fs/cgroup/%s/nomad/%s.web/cgroup.procs", iface, alloc)

content, err := os.ReadFile(cgroup)
must.NoError(t, err)

// ensure only 1 pid (sleep) is present in this cgroup
pids := strings.Fields(string(content))
must.SliceLen(t, 1, pids)
must.Eq(t, pids[0], strconv.Itoa(p.Pid))
}
}

0 comments on commit 8c5e607

Please sign in to comment.