From be7ec8de3e223e0a944f12274530413ecc723eb0 Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Mon, 28 Mar 2022 19:33:01 -0500
Subject: [PATCH 1/2] raw_exec: make raw exec driver work with cgroups v2

This PR adds support for the raw_exec driver on systems with only cgroups v2.

The raw exec driver is able to use cgroups to manage processes. This happens
only on Linux, when the raw_exec driver is enabled, and the no_cgroups option
is not set. The driver uses the freezer controller to freeze processes of a
task, issue a SIGKILL, then unfreeze. Previously the implementation assumed
cgroups v1, and now it also supports cgroups v2.

There is a bit of refactoring in this PR, but the fundamental design remains
the same.

Closes #12351 #12348
---
 .changelog/12419.txt                        |   3 +
 .github/workflows/test-core.yaml            |   3 +-
 client/allocdir/fs_unix.go                  |   1 -
 client/client.go                            |   1 +
 client/fs_endpoint_test.go                  |   1 +
 client/lib/cgutil/cgutil_linux.go           |   8 +-
 client/lib/cgutil/group_killer.go           | 210 ++++++++++++++++++
 client/lib/resources/containment.go         |  13 ++
 client/lib/resources/containment_default.go |  11 +
 client/lib/resources/containment_linux.go   | 107 +++++++++
 client/lib/resources/pid.go                 |  25 +++
 client/taskenv/env.go                       |   8 +
 client/taskenv/env_test.go                  |   2 +
 command/agent/agent.go                      |   3 +-
 drivers/docker/driver_unix_test.go          |   5 +
 drivers/exec/driver_test.go                 |   5 +-
 drivers/rawexec/driver.go                   |   9 +-
 drivers/rawexec/driver_test.go              | 123 ++++++++--
 drivers/rawexec/driver_unix_test.go         |  51 +++--
 drivers/shared/executor/executor.go         |  47 ++--
 drivers/shared/executor/executor_basic.go   |   4 +-
 drivers/shared/executor/executor_linux.go   |  14 +-
 drivers/shared/executor/executor_test.go    |   2 +-
 .../executor/executor_universal_linux.go    | 167 ++++++--------
 drivers/shared/executor/executor_unix.go    |  14 +-
 drivers/shared/executor/executor_windows.go |   4 +-
 drivers/shared/executor/pid_collector.go    |  40 ++--
 .../executor/resource_container_default.go  |   3 +-
 .../executor/resource_container_linux.go    |  63 ------
 main.go                                     |   3 +-
 nomad/core_sched_test.go                    |   2 +-
 nomad/leader_test.go                        |   3 +-
 nomad/structs/structs.go                    |   3 +
 plugins/drivers/testutils/testing.go        |  40 ++--
 website/content/partials/envvars.mdx        |   6 +
 35 files changed, 697 insertions(+), 307 deletions(-)
 create mode 100644 .changelog/12419.txt
 create mode 100644 client/lib/cgutil/group_killer.go
 create mode 100644 client/lib/resources/containment.go
 create mode 100644 client/lib/resources/containment_default.go
 create mode 100644 client/lib/resources/containment_linux.go
 create mode 100644 client/lib/resources/pid.go
 delete mode 100644 drivers/shared/executor/resource_container_linux.go

diff --git a/.changelog/12419.txt b/.changelog/12419.txt
new file mode 100644
index 00000000000..da29f0c0c73
--- /dev/null
+++ b/.changelog/12419.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+Add support for cgroups v2 in raw_exec driver
+```
diff --git a/.github/workflows/test-core.yaml b/.github/workflows/test-core.yaml
index 0e0ef790922..91ec0c4643c 100644
--- a/.github/workflows/test-core.yaml
+++ b/.github/workflows/test-core.yaml
@@ -78,7 +78,8 @@ jobs:
       run: |
         make bootstrap
         make generate-all
-        make test-nomad-module
+        sudo sed -i 's!Defaults!#Defaults!g' /etc/sudoers
+        sudo -E env "PATH=$PATH" make test-nomad-module
   tests-pkgs:
     runs-on: ubuntu-20.04
     timeout-minutes: 30
diff --git a/client/allocdir/fs_unix.go b/client/allocdir/fs_unix.go
index 9f832693870..6010a2bf46d 100644
--- a/client/allocdir/fs_unix.go
+++ b/client/allocdir/fs_unix.go
@@ -1,5 +1,4 @@
 //go:build darwin || dragonfly || freebsd ||
linux || netbsd || openbsd || solaris -// +build darwin dragonfly freebsd linux netbsd openbsd solaris package allocdir diff --git a/client/client.go b/client/client.go index 0d322a7b46b..61810a3a217 100644 --- a/client/client.go +++ b/client/client.go @@ -1432,6 +1432,7 @@ func (c *Client) setupNode() error { if node.Name == "" { node.Name, _ = os.Hostname() } + node.CgroupParent = c.config.CgroupParent if node.HostVolumes == nil { if l := len(c.config.HostVolumes); l != 0 { node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l) diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 8df47b90da3..4f5a40eddc4 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -318,6 +318,7 @@ func TestFS_List_ACL(t *testing.T) { func TestFS_Stream_NoAlloc(t *testing.T) { ci.Parallel(t) + ci.SkipSlow(t, "flaky on GHA; #12358") require := require.New(t) // Start a client diff --git a/client/lib/cgutil/cgutil_linux.go b/client/lib/cgutil/cgutil_linux.go index 33b2917ec40..1333d0cc13b 100644 --- a/client/lib/cgutil/cgutil_linux.go +++ b/client/lib/cgutil/cgutil_linux.go @@ -58,13 +58,13 @@ func CgroupScope(allocID, task string) string { return fmt.Sprintf("%s.%s.scope", allocID, task) } -// ConfigureBasicCgroups will initialize cgroups for v1. +// ConfigureBasicCgroups will initialize a cgroup and modify config to contain +// a reference to its path. // -// Not useful in cgroups.v2 +// v1: creates a random "freezer" cgroup which can later be used for cleanup of processes. +// v2: does nothing. func ConfigureBasicCgroups(config *lcc.Config) error { if UseV2 { - // In v2 the default behavior is to create inherited interface files for - // all mounted subsystems automatically. return nil } diff --git a/client/lib/cgutil/group_killer.go b/client/lib/cgutil/group_killer.go new file mode 100644 index 00000000000..9eeae7fefee --- /dev/null +++ b/client/lib/cgutil/group_killer.go @@ -0,0 +1,210 @@ +//go:build linux + +package cgutil + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/opencontainers/runc/libcontainer/cgroups" + "github.com/opencontainers/runc/libcontainer/cgroups/fs" + "github.com/opencontainers/runc/libcontainer/cgroups/fs2" + "github.com/opencontainers/runc/libcontainer/configs" +) + +// freezer is the name of the cgroup subsystem used for stopping / starting +// a group of processes +const freezer = "freezer" + +// thawed and frozen are the two states we put a cgroup in when trying to remove it +var ( + thawed = &configs.Resources{Freezer: configs.Thawed} + frozen = &configs.Resources{Freezer: configs.Frozen} +) + +// GroupKiller is used for SIGKILL-ing the process tree[s] of a cgroup by leveraging +// the freezer cgroup subsystem. +type GroupKiller interface { + KillGroup(cgroup *configs.Cgroup) error +} + +// NewGroupKiller creates a GroupKiller with executor PID pid. +func NewGroupKiller(logger hclog.Logger, pid int) GroupKiller { + return &killer{ + logger: logger.Named("group_killer"), + pid: pid, + } +} + +type killer struct { + logger hclog.Logger + pid int +} + +// KillGroup will SIGKILL the process tree present in cgroup, using the freezer +// subsystem to prevent further forking, etc. 
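+//
+// KillGroup dispatches to the cgroups v1 or v2 implementation depending on
+// which cgroup hierarchy the host is running.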
+func (d *killer) KillGroup(cgroup *configs.Cgroup) error {
+	if UseV2 {
+		return d.v2(cgroup)
+	}
+	return d.v1(cgroup)
+}
+
+func (d *killer) v1(cgroup *configs.Cgroup) error {
+	if cgroup == nil {
+		return errors.New("missing cgroup")
+	}
+
+	// the actual path to our task's freezer cgroup
+	path := cgroup.Paths[freezer]
+
+	d.logger.Trace("killing processes", "cgroup_path", path, "cgroup_version", "v1", "executor_pid", d.pid)
+
+	// move executor PID into the init freezer cgroup so we can kill the task
+	// pids without killing the executor (which is the process running this code,
+	// doing the killing)
+	initPath, err := cgroups.GetInitCgroupPath(freezer)
+	if err != nil {
+		return fmt.Errorf("failed to find init cgroup: %w", err)
+	}
+	m := map[string]string{freezer: initPath}
+	if err = cgroups.EnterPid(m, d.pid); err != nil {
+		return fmt.Errorf("failed to add executor pid to init cgroup: %w", err)
+	}
+
+	// ability to freeze the cgroup
+	freeze := func() {
+		_ = new(fs.FreezerGroup).Set(path, frozen)
+	}
+
+	// ability to thaw the cgroup
+	thaw := func() {
+		_ = new(fs.FreezerGroup).Set(path, thawed)
+	}
+
+	// do the common kill logic
+	if err = d.kill(path, freeze, thaw); err != nil {
+		return err
+	}
+
+	// remove the cgroup from disk
+	return cgroups.RemovePath(path)
+}
+
+func (d *killer) v2(cgroup *configs.Cgroup) error {
+	if cgroup == nil {
+		return errors.New("missing cgroup")
+	}
+
+	path := filepath.Join(CgroupRoot, cgroup.Path)
+
+	existingPIDs, err := cgroups.GetPids(path)
+	if err != nil {
+		return fmt.Errorf("failed to determine pids in cgroup: %w", err)
+	}
+
+	d.logger.Trace("killing processes", "cgroup_path", path, "cgroup_version", "v2", "executor_pid", d.pid, "existing_pids", existingPIDs)
+
+	mgr, err := fs2.NewManager(cgroup, "", rootless)
+	if err != nil {
+		return fmt.Errorf("failed to create v2 cgroup manager: %w", err)
+	}
+
+	// move executor PID into the root init.scope so we can kill the task pids
+	// without killing the executor (which is the process running this code, doing
+	// the killing)
+	init, err := fs2.NewManager(nil, filepath.Join(CgroupRoot, "init.scope"), rootless)
+	if err != nil {
+		return fmt.Errorf("failed to create v2 init cgroup manager: %w", err)
+	}
+	if err = init.Apply(d.pid); err != nil {
+		return fmt.Errorf("failed to move executor pid into init.scope cgroup: %w", err)
+	}
+
+	d.logger.Trace("move of executor pid into init.scope complete", "pid", d.pid)
+
+	// ability to freeze the cgroup
+	freeze := func() {
+		_ = mgr.Freeze(configs.Frozen)
+	}
+
+	// ability to thaw the cgroup
+	thaw := func() {
+		_ = mgr.Freeze(configs.Thawed)
+	}
+
+	// do the common kill logic
+	if err = d.kill(path, freeze, thaw); err != nil {
+		return err
+	}
+
+	// remove the cgroup from disk
+	return mgr.Destroy()
+}
+
+// kill is used to SIGKILL all processes in cgroup
+//
+// The order of operations is
+//  0. before calling this method, the executor pid has been moved outside of cgroup
+//  1. freeze cgroup (so processes cannot fork further)
+//  2. scan the cgroup to collect all pids
+//  3. issue SIGKILL to each pid found
+//  4. thaw the cgroup so processes can go die
+//  5. 
wait on each process until it is confirmed dead
+func (d *killer) kill(cgroup string, freeze func(), thaw func()) error {
+	// freeze the cgroup stopping further forking
+	freeze()
+
+	d.logger.Trace("search for pids in", "cgroup", cgroup)
+
+	// find all the pids we intend to kill
+	pids, err := cgroups.GetPids(cgroup)
+	if err != nil {
+		// if we fail to get pids, re-thaw before bailing so there is at least
+		// a chance the processes can go die out of band
+		thaw()
+		return fmt.Errorf("failed to find pids: %w", err)
+	}
+
+	d.logger.Trace("send sigkill to frozen processes", "cgroup", cgroup, "pids", pids)
+
+	var processes []*os.Process
+
+	// kill the processes in cgroup
+	for _, pid := range pids {
+		p, findErr := os.FindProcess(pid)
+		if findErr != nil {
+			d.logger.Trace("failed to find process of pid to kill", "pid", pid, "error", findErr)
+			continue
+		}
+		processes = append(processes, p)
+		if killErr := p.Kill(); killErr != nil {
+			d.logger.Trace("failed to kill process", "pid", pid, "error", killErr)
+			continue
+		}
+	}
+
+	// thaw the cgroup so we can wait on each process
+	thaw()
+
+	// wait on each process
+	for _, p := range processes {
+		// do not capture error; errors are normal here
+		pState, _ := p.Wait()
+		d.logger.Trace("return from wait on process", "pid", p.Pid, "state", pState)
+	}
+
+	// cgroups are not atomic, the OS takes a moment to un-mark the cgroup as in-use;
+	// a tiny sleep here goes a long way for not creating noisy (but functionally benign)
+	// errors about removing a busy cgroup
+	//
+	// alternatively we could do the removal in a loop and silence the interim errors, but meh
+	time.Sleep(50 * time.Millisecond)
+
+	return nil
+}
diff --git a/client/lib/resources/containment.go b/client/lib/resources/containment.go
new file mode 100644
index 00000000000..dd05ca5f0ea
--- /dev/null
+++ b/client/lib/resources/containment.go
@@ -0,0 +1,13 @@
+package resources
+
+// A Containment will cleanup resources created by an executor.
+type Containment interface {
+	// Apply enables containment on pid.
+	Apply(pid int) error
+
+	// Cleanup will purge executor resources like cgroups.
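+	// On Linux this kills remaining task processes and removes the task cgroup.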
+ Cleanup() error + + // GetPIDs will return the processes overseen by the Containment + GetPIDs() PIDs +} diff --git a/client/lib/resources/containment_default.go b/client/lib/resources/containment_default.go new file mode 100644 index 00000000000..1f3ec38fae0 --- /dev/null +++ b/client/lib/resources/containment_default.go @@ -0,0 +1,11 @@ +//go:build !linux + +package resources + +type containment struct { + // non-linux executors currently do not create resources to be cleaned up +} + +func (c *containment) Cleanup() error { + return nil +} diff --git a/client/lib/resources/containment_linux.go b/client/lib/resources/containment_linux.go new file mode 100644 index 00000000000..279e03e6c3a --- /dev/null +++ b/client/lib/resources/containment_linux.go @@ -0,0 +1,107 @@ +//go:build linux + +package resources + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/opencontainers/runc/libcontainer/cgroups" + "github.com/opencontainers/runc/libcontainer/cgroups/fs2" + "github.com/opencontainers/runc/libcontainer/configs" +) + +type containment struct { + lock sync.RWMutex + cgroup *configs.Cgroup + logger hclog.Logger +} + +func Contain(logger hclog.Logger, cgroup *configs.Cgroup) *containment { + return &containment{ + cgroup: cgroup, + logger: logger.Named("containment"), + } +} + +func (c *containment) Apply(pid int) error { + c.lock.Lock() + defer c.lock.Unlock() + + c.logger.Trace("create containment for", "cgroup", c.cgroup, "pid", pid) + + // for v2 use manager to create and enter the cgroup + if cgutil.UseV2 { + mgr, err := fs2.NewManager(c.cgroup, "", false) + if err != nil { + return fmt.Errorf("failed to create v2 cgroup manager for containment: %w", err) + } + + // add the pid to the cgroup + if err = mgr.Apply(pid); err != nil { + return fmt.Errorf("failed to apply v2 cgroup containment: %w", err) + } + + // in v2 it is important to set the device resource configuration + if err = mgr.Set(c.cgroup.Resources); err != nil { + return fmt.Errorf("failed to set v2 cgroup resources: %w", err) + } + + return nil + } + + // for v1 a random cgroup was created already; just enter it + if err := cgroups.EnterPid(c.cgroup.Paths, pid); err != nil { + return fmt.Errorf("failed to add pid to v1 cgroup: %w", err) + } + + return nil +} + +func (c *containment) Cleanup() error { + c.lock.Lock() + defer c.lock.Unlock() + + // the current pid is of the executor, who manages the task process cleanup + executorPID := os.Getpid() + c.logger.Trace("cleanup on", "cgroup", c.cgroup, "executor_pid", executorPID) + + // destroy the task processes + destroyer := cgutil.NewGroupKiller(c.logger, executorPID) + return destroyer.KillGroup(c.cgroup) +} + +func (c *containment) GetPIDs() PIDs { + c.lock.Lock() + defer c.lock.Unlock() + + m := make(PIDs) + if c.cgroup == nil { + return m + } + + // get the cgroup path under containment + var path string + if cgutil.UseV2 { + path = filepath.Join(cgutil.CgroupRoot, c.cgroup.Path) + } else { + path = c.cgroup.Paths["freezer"] + } + + // find the pids in the cgroup under containment + pids, err := cgroups.GetAllPids(path) + if err != nil { + c.logger.Debug("failed to get pids", "cgroup", c.cgroup, "error", err) + return m + } + + for _, pid := range pids { + m[pid] = NewPID(pid) + } + + return m +} diff --git a/client/lib/resources/pid.go b/client/lib/resources/pid.go new file mode 100644 index 00000000000..c2e4f547090 --- /dev/null 
+++ b/client/lib/resources/pid.go
@@ -0,0 +1,25 @@
+package resources
+
+import (
+	"github.com/hashicorp/nomad/client/stats"
+)
+
+// PIDs holds all of a task's pids and their cpu percentage calculators
+type PIDs map[int]*PID
+
+// PID holds one task's pid and its cpu percentage calculator
+type PID struct {
+	PID           int
+	StatsTotalCPU *stats.CpuStats
+	StatsUserCPU  *stats.CpuStats
+	StatsSysCPU   *stats.CpuStats
+}
+
+func NewPID(pid int) *PID {
+	return &PID{
+		PID:           pid,
+		StatsTotalCPU: stats.NewCpuStats(),
+		StatsUserCPU:  stats.NewCpuStats(),
+		StatsSysCPU:   stats.NewCpuStats(),
+	}
+}
diff --git a/client/taskenv/env.go b/client/taskenv/env.go
index 06341cada30..1b6d4ce788f 100644
--- a/client/taskenv/env.go
+++ b/client/taskenv/env.go
@@ -67,6 +67,9 @@ const (
 	// Datacenter is the environment variable for passing the datacenter in which the alloc is running.
 	Datacenter = "NOMAD_DC"
 
+	// CgroupParent is the environment variable for passing the cgroup parent in which cgroups are made.
+	CgroupParent = "NOMAD_PARENT_CGROUP"
+
 	// Namespace is the environment variable for passing the namespace in which the alloc is running.
 	Namespace = "NOMAD_NAMESPACE"
 
@@ -400,6 +403,7 @@ type Builder struct {
 	taskName     string
 	allocIndex   int
 	datacenter   string
+	cgroupParent string
 	namespace    string
 	region       string
 	allocId      string
@@ -518,6 +522,9 @@ func (b *Builder) buildEnv(allocDir, localDir, secretsDir string,
 	if b.datacenter != "" {
 		envMap[Datacenter] = b.datacenter
 	}
+	if b.cgroupParent != "" {
+		envMap[CgroupParent] = b.cgroupParent
+	}
 	if b.namespace != "" {
 		envMap[Namespace] = b.namespace
 	}
@@ -802,6 +809,7 @@ func (b *Builder) setNode(n *structs.Node) *Builder {
 	b.nodeAttrs[nodeClassKey] = n.NodeClass
 	b.nodeAttrs[nodeDcKey] = n.Datacenter
 	b.datacenter = n.Datacenter
+	b.cgroupParent = n.CgroupParent
 
 	// Set up the attributes.
for k, v := range n.Attributes { diff --git a/client/taskenv/env_test.go b/client/taskenv/env_test.go index 7b3156e47fb..32e6238169e 100644 --- a/client/taskenv/env_test.go +++ b/client/taskenv/env_test.go @@ -250,6 +250,7 @@ func TestEnvironment_AllValues(t *testing.T) { "nested.meta.key": "a", "invalid...metakey": "b", } + n.CgroupParent = "abc.slice" a := mock.ConnectAlloc() a.Job.ParentID = fmt.Sprintf("mock-parent-service-%s", uuid.Generate()) a.AllocatedResources.Tasks["web"].Networks[0] = &structs.NetworkResource{ @@ -378,6 +379,7 @@ func TestEnvironment_AllValues(t *testing.T) { "NOMAD_PORT_ssh_ssh": "22", "NOMAD_CPU_LIMIT": "500", "NOMAD_DC": "dc1", + "NOMAD_PARENT_CGROUP": "abc.slice", "NOMAD_NAMESPACE": "default", "NOMAD_REGION": "global", "NOMAD_MEMORY_LIMIT": "256", diff --git a/command/agent/agent.go b/command/agent/agent.go index 9d2668d8842..74b3cc2b73c 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -21,6 +21,7 @@ import ( uuidparse "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/command/agent/event" @@ -694,7 +695,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { } conf.BindWildcardDefaultHostNetwork = agentConfig.Client.BindWildcardDefaultHostNetwork - conf.CgroupParent = agentConfig.Client.CgroupParent + conf.CgroupParent = cgutil.GetCgroupParent(agentConfig.Client.CgroupParent) if agentConfig.Client.ReserveableCores != "" { cores, err := cpuset.Parse(agentConfig.Client.ReserveableCores) if err != nil { diff --git a/drivers/docker/driver_unix_test.go b/drivers/docker/driver_unix_test.go index 332ad07cf59..7a82a7af7db 100644 --- a/drivers/docker/driver_unix_test.go +++ b/drivers/docker/driver_unix_test.go @@ -787,6 +787,11 @@ func TestDocker_ExecTaskStreaming(t *testing.T) { ci.Parallel(t) testutil.DockerCompatible(t) + // todo(shoenig) these fail maybe 50% of the time on GHA, and the test cases + // are tricky to follow all the way through - maybe a decent candidate for + // a generics re-write. 
+ ci.SkipSlow(t, "flaky on GHA; #12358") + taskCfg := newTaskConfig("", []string{"/bin/sleep", "1000"}) task := &drivers.TaskConfig{ ID: uuid.Generate(), diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index fed60bea962..1c2bf06933d 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -122,7 +122,9 @@ func TestExecDriver_StartWait(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d := NewExecDriver(ctx, testlog.HCLogger(t)) + logger := testlog.HCLogger(t) + + d := NewExecDriver(ctx, logger) harness := dtestutil.NewDriverHarness(t, d) allocID := uuid.Generate() task := &drivers.TaskConfig{ @@ -793,6 +795,7 @@ func TestExecDriver_NoPivotRoot(t *testing.T) { handle, _, err := harness.StartTask(task) require.NoError(t, err) require.NotNil(t, handle) + require.NoError(t, harness.DestroyTask(task.ID, true)) } func TestDriver_Config_validate(t *testing.T) { diff --git a/drivers/rawexec/driver.go b/drivers/rawexec/driver.go index 8a86c75c460..85005924c0c 100644 --- a/drivers/rawexec/driver.go +++ b/drivers/rawexec/driver.go @@ -11,7 +11,7 @@ import ( "time" "github.com/hashicorp/consul-template/signals" - hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/drivers/shared/eventer" "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/helper/pluginutils/loader" @@ -323,9 +323,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive LogLevel: "debug", } - exec, pluginClient, err := executor.CreateExecutor( - d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), - d.nomadConfig, executorConfig) + logger := d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID) + exec, pluginClient, err := executor.CreateExecutor(logger, d.nomadConfig, executorConfig) if err != nil { return nil, nil, fmt.Errorf("failed to create executor: %v", err) } @@ -372,7 +371,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) - exec.Shutdown("", 0) + _ = exec.Shutdown("", 0) pluginClient.Kill() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index bac01470c70..13177369a97 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/lib/cgutil" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" "github.com/hashicorp/nomad/helper/testlog" @@ -27,6 +28,18 @@ import ( "github.com/stretchr/testify/require" ) +// defaultEnv creates the default environment for raw exec tasks +func defaultEnv() map[string]string { + m := make(map[string]string) + if cgutil.UseV2 { + // normally the taskenv.Builder will set this automatically from the + // Node object which gets created via Client configuration, but none of + // that exists in the test harness so just set it here. 
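+		// nomad.slice is the default cgroup parent Nomad uses on cgroups v2 systems.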
+ m["NOMAD_PARENT_CGROUP"] = "nomad.slice" + } + return m +} + func TestMain(m *testing.M) { if !testtask.Run() { os.Exit(m.Run()) @@ -35,10 +48,12 @@ func TestMain(m *testing.M) { func newEnabledRawExecDriver(t *testing.T) *Driver { ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(func() { cancel() }) + t.Cleanup(cancel) - d := NewRawExecDriver(ctx, testlog.HCLogger(t)).(*Driver) + logger := testlog.HCLogger(t) + d := NewRawExecDriver(ctx, logger).(*Driver) d.config.Enabled = true + return d } @@ -49,21 +64,25 @@ func TestRawExecDriver_SetConfig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d := NewRawExecDriver(ctx, testlog.HCLogger(t)) + logger := testlog.HCLogger(t) + + d := NewRawExecDriver(ctx, logger) harness := dtestutil.NewDriverHarness(t, d) defer harness.Kill() - bconfig := &basePlug.Config{} - - // Disable raw exec. - config := &Config{} + var ( + bconfig = new(basePlug.Config) + config = new(Config) + data = make([]byte, 0) + ) - var data []byte + // Default is raw_exec is disabled. require.NoError(basePlug.MsgPackEncode(&data, config)) bconfig.PluginConfig = data require.NoError(harness.SetConfig(bconfig)) require.Exactly(config, d.(*Driver).config) + // Enable raw_exec, but disable cgroups. config.Enabled = true config.NoCgroups = true data = []byte{} @@ -72,6 +91,7 @@ func TestRawExecDriver_SetConfig(t *testing.T) { require.NoError(harness.SetConfig(bconfig)) require.Exactly(config, d.(*Driver).config) + // Enable raw_exec, enable cgroups. config.NoCgroups = false data = []byte{} require.NoError(basePlug.MsgPackEncode(&data, config)) @@ -150,8 +170,10 @@ func TestRawExecDriver_StartWait(t *testing.T) { harness := dtestutil.NewDriverHarness(t, d) defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "test", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "test", + Env: defaultEnv(), } tc := &TaskConfig{ @@ -200,8 +222,10 @@ func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) { require.NoError(harness.SetConfig(bconfig)) task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "sleep", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + Env: defaultEnv(), } tc := &TaskConfig{ Command: testtask.Path(), @@ -276,8 +300,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "sleep", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, false) @@ -323,7 +349,6 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) { ci.Parallel(t) ctestutil.ExecCompatible(t) - ctestutil.CgroupsCompatibleV1(t) // todo(shoenig) #12348 require := require.New(t) pidFile := "pid" @@ -333,9 +358,11 @@ func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) { defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "sleep", - User: "root", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + User: "root", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, false) @@ -414,10 +441,56 @@ func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) { require.NoError(harness.DestroyTask(task.ID, true)) } +func TestRawExecDriver_ParentCgroup(t *testing.T) { + ci.Parallel(t) + ctestutil.ExecCompatible(t) + ctestutil.CgroupsCompatibleV2(t) + + d := newEnabledRawExecDriver(t) + harness := 
dtestutil.NewDriverHarness(t, d) + defer harness.Kill() + + task := &drivers.TaskConfig{ + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + Env: map[string]string{ + "NOMAD_PARENT_CGROUP": "custom.slice", + }, + } + + cleanup := harness.MkAllocDir(task, false) + defer cleanup() + + // run sleep task + tc := &TaskConfig{ + Command: testtask.Path(), + Args: []string{"sleep", "9000s"}, + } + require.NoError(t, task.EncodeConcreteDriverConfig(&tc)) + testtask.SetTaskConfigEnv(task) + _, _, err := harness.StartTask(task) + require.NoError(t, err) + + // inspect environment variable + res, execErr := harness.ExecTask(task.ID, []string{"/usr/bin/env"}, 1*time.Second) + require.NoError(t, execErr) + require.True(t, res.ExitResult.Successful()) + require.Contains(t, string(res.Stdout), "custom.slice") + + // inspect /proc/self/cgroup + res2, execErr2 := harness.ExecTask(task.ID, []string{"cat", "/proc/self/cgroup"}, 1*time.Second) + require.NoError(t, execErr2) + require.True(t, res2.ExitResult.Successful()) + require.Contains(t, string(res2.Stdout), "custom.slice") + + // kill the sleep task + require.NoError(t, harness.DestroyTask(task.ID, true)) +} + func TestRawExecDriver_Exec(t *testing.T) { ci.Parallel(t) ctestutil.ExecCompatible(t) - ctestutil.CgroupsCompatibleV1(t) // todo(shoenig) #12348 require := require.New(t) @@ -426,8 +499,10 @@ func TestRawExecDriver_Exec(t *testing.T) { defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "sleep", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, false) @@ -502,8 +577,10 @@ func TestRawExecDriver_Disabled(t *testing.T) { harness := dtestutil.NewDriverHarness(t, d) defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "test", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "test", + Env: defaultEnv(), } handle, _, err := harness.StartTask(task) diff --git a/drivers/rawexec/driver_unix_test.go b/drivers/rawexec/driver_unix_test.go index 895906bc913..f5e8c5272e1 100644 --- a/drivers/rawexec/driver_unix_test.go +++ b/drivers/rawexec/driver_unix_test.go @@ -17,11 +17,9 @@ import ( "time" "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/client/lib/cgutil" clienttestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/testtask" "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/nomad/structs" basePlug "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" @@ -62,17 +60,18 @@ func TestRawExecDriver_User(t *testing.T) { func TestRawExecDriver_Signal(t *testing.T) { ci.Parallel(t) - if runtime.GOOS != "linux" { - t.Skip("Linux only test") - } + clienttestutil.RequireLinux(t) + require := require.New(t) d := newEnabledRawExecDriver(t) harness := dtestutil.NewDriverHarness(t, d) task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "signal", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "signal", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, true) @@ -206,24 +205,16 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) { func TestRawExecDriver_DestroyKillsAll(t *testing.T) { ci.Parallel(t) clienttestutil.RequireLinux(t) - clienttestutil.CgroupsCompatibleV1(t) // todo(shoenig): #12348 d := newEnabledRawExecDriver(t) harness := 
dtestutil.NewDriverHarness(t, d) defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "test", - } - - if cgutil.UseV2 { - allocID := uuid.Generate() - task.AllocID = allocID - task.Resources = new(drivers.Resources) - task.Resources.NomadResources = new(structs.AllocatedTaskResources) - task.Resources.LinuxResources = new(drivers.LinuxResources) - task.Resources.LinuxResources.CpusetCgroupPath = filepath.Join(cgutil.CgroupRoot, "testing.slice", cgutil.CgroupScope(allocID, "test")) + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "test", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, true) @@ -322,8 +313,10 @@ func TestRawExec_ExecTaskStreaming(t *testing.T) { defer harness.Kill() task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "sleep", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "sleep", + Env: defaultEnv(), } cleanup := harness.MkAllocDir(task, false) @@ -349,10 +342,21 @@ func TestRawExec_ExecTaskStreaming_User(t *testing.T) { clienttestutil.RequireLinux(t) d := newEnabledRawExecDriver(t) + + // because we cannot set AllocID, see below + d.config.NoCgroups = true + harness := dtestutil.NewDriverHarness(t, d) defer harness.Kill() task := &drivers.TaskConfig{ + // todo(shoenig) - Setting AllocID causes test to fail - with or without + // cgroups, and with or without chroot. It has to do with MkAllocDir + // creating the directory structure, but the actual root cause is still + // TBD. The symptom is that any command you try to execute will result + // in "permission denied" coming from os/exec. This test is imperfect, + // the actual feature of running commands as another user works fine. + // AllocID: uuid.Generate() ID: uuid.Generate(), Name: "sleep", User: "nobody", @@ -394,8 +398,9 @@ func TestRawExecDriver_NoCgroup(t *testing.T) { harness := dtestutil.NewDriverHarness(t, d) task := &drivers.TaskConfig{ - ID: uuid.Generate(), - Name: "nocgroup", + AllocID: uuid.Generate(), + ID: uuid.Generate(), + Name: "nocgroup", } cleanup := harness.MkAllocDir(task, true) diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index 99819d85a66..a3fe56ccf1e 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -20,12 +20,12 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/lib/fifo" + "github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" + shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/plugins/drivers" "github.com/syndtr/gocapability/capability" - - shelpers "github.com/hashicorp/nomad/helper/stats" ) const ( @@ -244,9 +244,9 @@ type UniversalExecutor struct { exitState *ProcessState processExited chan interface{} - // resConCtx is used to track and cleanup additional resources created by - // the executor. Currently this is only used for cgroups. 
-	resConCtx resourceContainerContext
+	// containment is used to cleanup resources created by the executor
+	// currently only used for killing pids via freezer cgroup on linux
+	containment resources.Containment
 
 	totalCpuStats *stats.CpuStats
 	userCpuStats  *stats.CpuStats
@@ -262,6 +262,7 @@ func NewExecutor(logger hclog.Logger) Executor {
 	if err := shelpers.Init(); err != nil {
 		logger.Error("unable to initialize stats", "error", err)
 	}
+
 	return &UniversalExecutor{
 		logger:        logger,
 		processExited: make(chan interface{}),
@@ -300,10 +301,11 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
 		return nil, err
 	}
 
-	// Setup cgroups on linux
+	// Maybe setup containment (for now, cgroups only on linux)
 	if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup {
 		pid := os.Getpid()
 		if err := e.configureResourceContainer(pid); err != nil {
+			e.logger.Error("failed to configure resource container", "pid", pid, "error", err)
 			return nil, err
 		}
 	}
@@ -519,14 +521,14 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
 
 	// If there is no process we can't shutdown
 	if e.childCmd.Process == nil {
-		e.logger.Warn("failed to shutdown", "error", "no process found")
+		e.logger.Warn("failed to shutdown due to missing process", "error", "no process found")
 		return fmt.Errorf("executor failed to shutdown error: no process found")
 	}
 
 	proc, err := os.FindProcess(e.childCmd.Process.Pid)
 	if err != nil {
 		err = fmt.Errorf("executor failed to find process: %v", err)
-		e.logger.Warn("failed to shutdown", "error", err)
+		e.logger.Warn("failed to shutdown due to inability to find process", "pid", e.childCmd.Process.Pid, "error", err)
 		return err
 	}
 
@@ -545,7 +547,7 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
 	}
 
 	if err := e.shutdownProcess(sig, proc); err != nil {
-		e.logger.Warn("failed to shutdown", "error", err)
+		e.logger.Warn("failed to shutdown process", "pid", proc.Pid, "error", err)
 		return err
 	}
 
@@ -566,22 +568,27 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
 		merr.Errors = append(merr.Errors, fmt.Errorf("process did not exit after 15 seconds"))
 	}
 
-	// Prefer killing the process via the resource container.
-	if !(e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup) {
-		if err := e.cleanupChildProcesses(proc); err != nil && err.Error() != finishedErr {
+	// prefer killing the process via platform-dependent resource containment
+	killByContainment := e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup
+
+	if !killByContainment {
+		// there is no containment, so kill the group the old-fashioned way by sending
+		// SIGKILL to the negative pid
+		if cleanupChildrenErr := e.killProcessTree(proc); cleanupChildrenErr != nil && cleanupChildrenErr.Error() != finishedErr {
 			merr.Errors = append(merr.Errors,
-				fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, err))
+				fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, cleanupChildrenErr))
 		}
-	}
-
-	if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup {
-		if err := e.resConCtx.executorCleanup(); err != nil {
-			merr.Errors = append(merr.Errors, err)
+	} else {
+		// there is containment available (e.g.
cgroups) so defer to that implementation + // for killing the processes + if cleanupErr := e.containment.Cleanup(); cleanupErr != nil { + e.logger.Warn("containment cleanup failed", "error", cleanupErr) + merr.Errors = append(merr.Errors, cleanupErr) } } - if err := merr.ErrorOrNil(); err != nil { - e.logger.Warn("failed to shutdown", "error", err) + if err = merr.ErrorOrNil(); err != nil { + e.logger.Warn("failed to shutdown due to some error", "error", err.Error()) return err } diff --git a/drivers/shared/executor/executor_basic.go b/drivers/shared/executor/executor_basic.go index 083166bd37c..ad42792d1c0 100644 --- a/drivers/shared/executor/executor_basic.go +++ b/drivers/shared/executor/executor_basic.go @@ -1,5 +1,4 @@ //go:build !linux -// +build !linux package executor @@ -7,6 +6,7 @@ import ( "os/exec" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -18,7 +18,7 @@ func NewExecutorWithIsolation(logger hclog.Logger) Executor { func (e *UniversalExecutor) configureResourceContainer(_ int) error { return nil } -func (e *UniversalExecutor) getAllPids() (map[int]*nomadPid, error) { +func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) { return getAllPidsByScanning() } diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index cdb14f64dba..ca35bdee58d 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -20,6 +20,7 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/capabilities" @@ -200,21 +201,16 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro }, nil } -func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) { +func (l *LibcontainerExecutor) getAllPids() (resources.PIDs, error) { pids, err := l.container.Processes() if err != nil { return nil, err } - nPids := make(map[int]*nomadPid) + m := make(resources.PIDs, 1) for _, pid := range pids { - nPids[pid] = &nomadPid{ - pid: pid, - cpuStatsTotal: stats.NewCpuStats(), - cpuStatsUser: stats.NewCpuStats(), - cpuStatsSys: stats.NewCpuStats(), - } + m[pid] = resources.NewPID(pid) } - return nPids, nil + return m, nil } // Wait waits until a process has exited and returns it's exitcode and errors diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 57fbbb015d0..d90be12e472 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -15,7 +15,7 @@ import ( "testing" "time" - hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/lib/cgutil" diff --git a/drivers/shared/executor/executor_universal_linux.go b/drivers/shared/executor/executor_universal_linux.go index 7c97681df55..ac79b518b7e 100644 --- a/drivers/shared/executor/executor_universal_linux.go +++ b/drivers/shared/executor/executor_universal_linux.go @@ -2,19 +2,19 @@ package executor import ( "fmt" - "os" "os/exec" "os/user" + 
"path/filepath" "strconv" + "strings" "syscall" "github.com/containernetworking/plugins/pkg/ns" - multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/client/lib/resources" + "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/opencontainers/runc/libcontainer/cgroups" - cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" - lconfigs "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runc/libcontainer/specconv" ) @@ -23,20 +23,20 @@ import ( func setCmdUser(cmd *exec.Cmd, userid string) error { u, err := user.Lookup(userid) if err != nil { - return fmt.Errorf("Failed to identify user %v: %v", userid, err) + return fmt.Errorf("failed to identify user %v: %v", userid, err) } // Get the groups the user is a part of gidStrings, err := u.GroupIds() if err != nil { - return fmt.Errorf("Unable to lookup user's group membership: %v", err) + return fmt.Errorf("unable to lookup user's group membership: %v", err) } gids := make([]uint32, len(gidStrings)) for _, gidString := range gidStrings { u, err := strconv.ParseUint(gidString, 10, 32) if err != nil { - return fmt.Errorf("Unable to convert user's group to uint32 %s: %v", gidString, err) + return fmt.Errorf("unable to convert user's group to uint32 %s: %v", gidString, err) } gids = append(gids, uint32(u)) @@ -45,11 +45,11 @@ func setCmdUser(cmd *exec.Cmd, userid string) error { // Convert the uid and gid uid, err := strconv.ParseUint(u.Uid, 10, 32) if err != nil { - return fmt.Errorf("Unable to convert userid to uint32: %s", err) + return fmt.Errorf("unable to convert userid to uint32: %s", err) } gid, err := strconv.ParseUint(u.Gid, 10, 32) if err != nil { - return fmt.Errorf("Unable to convert groupid to uint32: %s", err) + return fmt.Errorf("unable to convert groupid to uint32: %s", err) } // Set the command to run as that user and group. @@ -69,125 +69,86 @@ func setCmdUser(cmd *exec.Cmd, userid string) error { // configureResourceContainer configured the cgroups to be used to track pids // created by the executor func (e *UniversalExecutor) configureResourceContainer(pid int) error { - cfg := &lconfigs.Config{ - Cgroups: &lconfigs.Cgroup{ - Resources: &lconfigs.Resources{}, + cfg := &configs.Config{ + Cgroups: &configs.Cgroup{ + Resources: &configs.Resources{}, }, } + + // note: this was always here, but not used until cgroups v2 support for _, device := range specconv.AllowedDevices { cfg.Cgroups.Resources.Devices = append(cfg.Cgroups.Resources.Devices, &device.Rule) } - if err := cgutil.ConfigureBasicCgroups(cfg); err != nil { - // Log this error to help diagnose cases where nomad is run with too few - // permissions, but don't return an error. There is no separate check for - // cgroup creation permissions, so this may be the happy path. 
- e.logger.Warn("failed to create cgroup", - "docs", "https://www.nomadproject.io/docs/drivers/raw_exec.html#no_cgroups", - "error", err) - return nil - } - e.resConCtx.groups = cfg.Cgroups - return cgroups.EnterPid(cfg.Cgroups.Paths, pid) -} - -func (e *UniversalExecutor) getAllPids() (map[int]*nomadPid, error) { - if e.resConCtx.isEmpty() { - return getAllPidsByScanning() - } else { - return e.resConCtx.getAllPidsByCgroup() - } -} - -// DestroyCgroup kills all processes in the cgroup and removes the cgroup -// configuration from the host. This function is idempotent. -func DestroyCgroup(groups *lconfigs.Cgroup, executorPid int) error { - mErrs := new(multierror.Error) - if groups == nil { - return fmt.Errorf("Can't destroy: cgroup configuration empty") - } - - // Move the executor into the global cgroup so that the task specific - // cgroup can be destroyed. - path, err := cgroups.GetInitCgroupPath("freezer") - if err != nil { - return err - } - - if err := cgroups.EnterPid(map[string]string{"freezer": path}, executorPid); err != nil { - return err - } - - // Freeze the Cgroup so that it can not continue to fork/exec. - groups.Resources.Freezer = lconfigs.Frozen - freezer := cgroupFs.FreezerGroup{} - if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil { - return err - } - - var procs []*os.Process - pids, err := cgroups.GetAllPids(groups.Paths[freezer.Name()]) - if err != nil { - multierror.Append(mErrs, fmt.Errorf("error getting pids: %v", err)) - - // Unfreeze the cgroup. - groups.Resources.Freezer = lconfigs.Thawed - freezer := cgroupFs.FreezerGroup{} - if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil { - multierror.Append(mErrs, fmt.Errorf("failed to unfreeze cgroup: %v", err)) - return mErrs.ErrorOrNil() + lookup := func(env []string, name string) (result string) { + for _, s := range env { + if strings.HasPrefix(s, name+"=") { + result = strings.TrimLeft(s, name+"=") + return + } } - } - - // Kill the processes in the cgroup - for _, pid := range pids { - proc, err := os.FindProcess(pid) - if err != nil { - multierror.Append(mErrs, fmt.Errorf("error finding process %v: %v", pid, err)) - continue + return + } + + if cgutil.UseV2 { + // in v2 we have the definitive cgroup; create and enter it + + // use the task environment variables for determining the cgroup path - + // not ideal but plumbing the values directly requires grpc protobuf changes + parent := lookup(e.commandCfg.Env, taskenv.CgroupParent) + allocID := lookup(e.commandCfg.Env, taskenv.AllocID) + task := lookup(e.commandCfg.Env, taskenv.TaskName) + if parent == "" || allocID == "" || task == "" { + return fmt.Errorf( + "environment variables %s must be set", + strings.Join([]string{taskenv.CgroupParent, taskenv.AllocID, taskenv.TaskName}, ","), + ) } + scope := cgutil.CgroupScope(allocID, task) + path := filepath.Join("/", cgutil.GetCgroupParent(parent), scope) + cfg.Cgroups.Path = path + e.containment = resources.Contain(e.logger, cfg.Cgroups) + return e.containment.Apply(pid) - procs = append(procs, proc) - if e := proc.Kill(); e != nil { - multierror.Append(mErrs, fmt.Errorf("error killing process %v: %v", pid, e)) + } else { + // in v1 create a freezer cgroup for use by containment + + if err := cgutil.ConfigureBasicCgroups(cfg); err != nil { + // Log this error to help diagnose cases where nomad is run with too few + // permissions, but don't return an error. There is no separate check for + // cgroup creation permissions, so this may be the happy path. 
+ e.logger.Warn("failed to create cgroup", + "docs", "https://www.nomadproject.io/docs/drivers/raw_exec.html#no_cgroups", + "error", err) + return nil } + path := cfg.Cgroups.Paths["freezer"] + e.logger.Trace("cgroup created, now need to apply", "path", path) + e.containment = resources.Contain(e.logger, cfg.Cgroups) + return e.containment.Apply(pid) } +} - // Unfreeze the cgroug so we can wait. - groups.Resources.Freezer = lconfigs.Thawed - if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil { - multierror.Append(mErrs, fmt.Errorf("failed to unfreeze cgroup: %v", err)) - return mErrs.ErrorOrNil() - } - - // Wait on the killed processes to ensure they are cleaned up. - for _, proc := range procs { - // Don't capture the error because we expect this to fail for - // processes we didn't fork. - proc.Wait() - } - - // Remove the cgroup. - if err := cgroups.RemovePaths(groups.Paths); err != nil { - multierror.Append(mErrs, fmt.Errorf("failed to delete the cgroup directories: %v", err)) +func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) { + if e.containment == nil { + return getAllPidsByScanning() } - return mErrs.ErrorOrNil() + return e.containment.GetPIDs(), nil } // withNetworkIsolation calls the passed function the network namespace `spec` func withNetworkIsolation(f func() error, spec *drivers.NetworkIsolationSpec) error { if spec != nil && spec.Path != "" { // Get a handle to the target network namespace - netns, err := ns.GetNS(spec.Path) + netNS, err := ns.GetNS(spec.Path) if err != nil { return err } // Start the container in the network namespace - return netns.Do(func(ns.NetNS) error { + return netNS.Do(func(ns.NetNS) error { return f() }) } - return f() } diff --git a/drivers/shared/executor/executor_unix.go b/drivers/shared/executor/executor_unix.go index da2d9ddcb75..d93c8fb68ab 100644 --- a/drivers/shared/executor/executor_unix.go +++ b/drivers/shared/executor/executor_unix.go @@ -1,5 +1,4 @@ //go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris -// +build darwin dragonfly freebsd linux netbsd openbsd solaris package executor @@ -18,17 +17,22 @@ func (e *UniversalExecutor) setNewProcessGroup() error { return nil } -// Cleanup any still hanging user processes -func (e *UniversalExecutor) cleanupChildProcesses(proc *os.Process) error { +// SIGKILL the process group starting at process.Pid +func (e *UniversalExecutor) killProcessTree(process *os.Process) error { + pid := process.Pid + negative := -pid // tells unix to kill entire process group + signal := syscall.SIGKILL + // If new process group was created upon command execution // we can kill the whole process group now to cleanup any leftovers. 
if e.childCmd.SysProcAttr != nil && e.childCmd.SysProcAttr.Setpgid { - if err := syscall.Kill(-proc.Pid, syscall.SIGKILL); err != nil && err.Error() != noSuchProcessErr { + e.logger.Trace("sending sigkill to process group", "pid", pid, "negative", negative, "signal", signal) + if err := syscall.Kill(negative, signal); err != nil && err.Error() != noSuchProcessErr { return err } return nil } - return proc.Kill() + return process.Kill() } // Only send the process a shutdown signal (default INT), doesn't diff --git a/drivers/shared/executor/executor_windows.go b/drivers/shared/executor/executor_windows.go index 9d0b1407011..f03e0230e4c 100644 --- a/drivers/shared/executor/executor_windows.go +++ b/drivers/shared/executor/executor_windows.go @@ -1,3 +1,5 @@ +//go:build windows + package executor import ( @@ -17,7 +19,7 @@ func (e *UniversalExecutor) setNewProcessGroup() error { } // Cleanup any still hanging user processes -func (e *UniversalExecutor) cleanupChildProcesses(proc *os.Process) error { +func (e *UniversalExecutor) killProcessTree(proc *os.Process) error { // We must first verify if the process is still running. // (Windows process often lingered around after being reported as killed). handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE|syscall.SYNCHRONIZE|syscall.PROCESS_QUERY_INFORMATION, false, uint32(proc.Pid)) diff --git a/drivers/shared/executor/pid_collector.go b/drivers/shared/executor/pid_collector.go index ed548b6e67d..2413f8ee5e0 100644 --- a/drivers/shared/executor/pid_collector.go +++ b/drivers/shared/executor/pid_collector.go @@ -7,6 +7,7 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/plugins/drivers" ps "github.com/mitchellh/go-ps" @@ -23,26 +24,18 @@ var ( // pidCollector is a utility that can be embedded in an executor to collect pid // stats type pidCollector struct { - pids map[int]*nomadPid + pids map[int]*resources.PID pidLock sync.RWMutex logger hclog.Logger } -// nomadPid holds a pid and it's cpu percentage calculator -type nomadPid struct { - pid int - cpuStatsTotal *stats.CpuStats - cpuStatsUser *stats.CpuStats - cpuStatsSys *stats.CpuStats -} - // allPidGetter is a func which is used by the pid collector to gather // stats on -type allPidGetter func() (map[int]*nomadPid, error) +type allPidGetter func() (resources.PIDs, error) func newPidCollector(logger hclog.Logger) *pidCollector { return &pidCollector{ - pids: make(map[int]*nomadPid), + pids: make(map[int]*resources.PID), logger: logger.Named("pid_collector"), } } @@ -85,7 +78,7 @@ func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGett // scanPids scans all the pids on the machine running the current executor and // returns the child processes of the executor. 
-func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) { +func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) { processFamily := make(map[int]struct{}) processFamily[parentPid] = struct{}{} @@ -117,15 +110,14 @@ func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) { } } - res := make(map[int]*nomadPid) + res := make(map[int]*resources.PID) for pid := range processFamily { - np := nomadPid{ - pid: pid, - cpuStatsTotal: stats.NewCpuStats(), - cpuStatsUser: stats.NewCpuStats(), - cpuStatsSys: stats.NewCpuStats(), + res[pid] = &resources.PID{ + PID: pid, + StatsTotalCPU: stats.NewCpuStats(), + StatsUserCPU: stats.NewCpuStats(), + StatsSysCPU: stats.NewCpuStats(), } - res[pid] = &np } return res, nil } @@ -134,7 +126,7 @@ func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) { func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) { stats := make(map[string]*drivers.ResourceUsage) c.pidLock.RLock() - pids := make(map[int]*nomadPid, len(c.pids)) + pids := make(map[int]*resources.PID, len(c.pids)) for k, v := range c.pids { pids[k] = v } @@ -154,12 +146,12 @@ func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) { cs := &drivers.CpuStats{} if cpuStats, err := p.Times(); err == nil { - cs.SystemMode = np.cpuStatsSys.Percent(cpuStats.System * float64(time.Second)) - cs.UserMode = np.cpuStatsUser.Percent(cpuStats.User * float64(time.Second)) + cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second)) + cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second)) cs.Measured = ExecutorBasicMeasuredCpuStats // calculate cpu usage percent - cs.Percent = np.cpuStatsTotal.Percent(cpuStats.Total() * float64(time.Second)) + cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second)) } stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs} } @@ -210,7 +202,7 @@ func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string } } -func getAllPidsByScanning() (map[int]*nomadPid, error) { +func getAllPidsByScanning() (resources.PIDs, error) { allProcesses, err := ps.Processes() if err != nil { return nil, err diff --git a/drivers/shared/executor/resource_container_default.go b/drivers/shared/executor/resource_container_default.go index ff4c8f9e808..0274e1b079d 100644 --- a/drivers/shared/executor/resource_container_default.go +++ b/drivers/shared/executor/resource_container_default.go @@ -1,5 +1,4 @@ -//go:build darwin || dragonfly || freebsd || netbsd || openbsd || solaris || windows -// +build darwin dragonfly freebsd netbsd openbsd solaris windows +//go:build !linux package executor diff --git a/drivers/shared/executor/resource_container_linux.go b/drivers/shared/executor/resource_container_linux.go deleted file mode 100644 index dbdf83f1628..00000000000 --- a/drivers/shared/executor/resource_container_linux.go +++ /dev/null @@ -1,63 +0,0 @@ -package executor - -import ( - "os" - "sync" - - "github.com/hashicorp/nomad/client/stats" - "github.com/opencontainers/runc/libcontainer/cgroups" - cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" -) - -// resourceContainerContext is a platform-specific struct for managing a -// resource container. In the case of Linux, this is used to control Cgroups. 
-type resourceContainerContext struct { - groups *cgroupConfig.Cgroup - cgLock sync.Mutex -} - -// cleanup removes this host's Cgroup from within an Executor's context -func (rc *resourceContainerContext) executorCleanup() error { - rc.cgLock.Lock() - defer rc.cgLock.Unlock() - if err := DestroyCgroup(rc.groups, os.Getpid()); err != nil { - return err - } - return nil -} - -func (rc *resourceContainerContext) isEmpty() bool { - return rc.groups == nil -} - -// todo(shoenig) cgroups.v2 #12351 -func (rc *resourceContainerContext) getAllPidsByCgroup() (map[int]*nomadPid, error) { - nPids := map[int]*nomadPid{} - - if rc.groups == nil { - return nPids, nil - } - - var path string - if p, ok := rc.groups.Paths["freezer"]; ok { - path = p - } else { - path = rc.groups.Path - } - - pids, err := cgroups.GetAllPids(path) - if err != nil { - return nPids, err - } - - for _, pid := range pids { - nPids[pid] = &nomadPid{ - pid: pid, - cpuStatsTotal: stats.NewCpuStats(), - cpuStatsUser: stats.NewCpuStats(), - cpuStatsSys: stats.NewCpuStats(), - } - } - - return nPids, nil -} diff --git a/main.go b/main.go index ee599656c0c..8fadcfa8f8d 100644 --- a/main.go +++ b/main.go @@ -14,10 +14,9 @@ import ( // processes along side of a task. By early importing them we can avoid // additional code being imported and thus reserving memory _ "github.com/hashicorp/nomad/client/logmon" + "github.com/hashicorp/nomad/command" _ "github.com/hashicorp/nomad/drivers/docker/docklog" _ "github.com/hashicorp/nomad/drivers/shared/executor" - - "github.com/hashicorp/nomad/command" "github.com/hashicorp/nomad/version" "github.com/mitchellh/cli" "github.com/sean-/seed" diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 7441ce466fa..a0569b5b15e 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2244,7 +2244,7 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) { } func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { - ci.Parallel(t) + ci.SkipSlow(t, "flaky on GHA; #12358") srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 7ee5e90fd8d..c6d92160d3c 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1186,6 +1186,7 @@ func leaderElectionTest(t *testing.T, raftProtocol raft.ProtocolVersion) { func TestLeader_RollRaftServer(t *testing.T) { ci.Parallel(t) + ci.SkipSlow(t, "flaky on GHA; #12358") s1, cleanupS1 := TestServer(t, func(c *Config) { c.RaftConfig.ProtocolVersion = 2 @@ -1389,7 +1390,7 @@ func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) { // (and unpaused) upon leader elections (and step downs). 
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index bbbfc45fe24..576fe94df5c 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1864,6 +1864,9 @@ type Node struct {
 	// Node name
 	Name string
 
+	// CgroupParent for this node (linux only)
+	CgroupParent string
+
 	// HTTPAddr is the address on which the Nomad client is listening for http
 	// requests
 	HTTPAddr string
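Editor's note: with `CgroupParent` exposed on the node, a consumer can work out where a task's cgroup will land. A hedged illustration combining the field with the `<allocID>.<task>.scope` naming used via `cgutil.CgroupScope` elsewhere in this patch; the v2 mount point `/sys/fs/cgroup` and the `nomad.slice` default are assumptions about the host and client configuration, and `scopePath` is a hypothetical helper:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// scopePath is a hypothetical helper: given a node's cgroup parent
// (e.g. "nomad.slice" on a cgroups v2 client), it returns where the
// task's scope directory would live under the v2 unified hierarchy.
func scopePath(cgroupParent, allocID, task string) string {
	scope := fmt.Sprintf("%s.%s.scope", allocID, task)
	return filepath.Join("/sys/fs/cgroup", cgroupParent, scope)
}

func main() {
	fmt.Println(scopePath("nomad.slice", "f2c8f5b9", "redis"))
	// Output: /sys/fs/cgroup/nomad.slice/f2c8f5b9.redis.scope
}
```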
diff --git a/plugins/drivers/testutils/testing.go b/plugins/drivers/testutils/testing.go
index f0f6c6713c2..c08220e3c72 100644
--- a/plugins/drivers/testutils/testing.go
+++ b/plugins/drivers/testutils/testing.go
@@ -43,7 +43,6 @@ func (h *DriverHarness) Impl() drivers.DriverPlugin {
 }
 
 func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness {
 	logger := testlog.HCLogger(t).Named("driver_harness")
-
 	pd := drivers.NewDriverPlugin(d, logger)
 
 	client, server := plugin.TestPluginGRPCConn(t,
@@ -68,18 +67,26 @@ func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness {
 	}
 }
 
-// setCgroup creates a v2 cgroup for the task, as if a Client were initialized
-// and managing the cgroup as it normally would.
+// setupCgroupV2 creates a v2 cgroup for the task, as if a Client were initialized
+// and managing the cgroup as it normally would via the cpuset manager.
+//
+// Note that we are being lazy and trying to avoid importing cgutil because
+// currently plugins/drivers/testutils is platform agnostic-ish.
 //
-// Uses testing.slice as a parent.
-func (h *DriverHarness) setCgroup(allocID, task string) {
+// Some drivers (raw_exec) set up their own cgroup, while others (exec, java, docker)
+// would otherwise depend on the Nomad cpuset manager (and docker daemon) to create
+// one, which isn't available here in testing, and so we create one via the harness.
+// Plumbing such metadata through to the harness is a mind bender, so we just always
+// create the cgroup, but at least put it under 'testing.slice'.
+//
+// tl;dr raw_exec tests should ignore this cgroup.
+func (h *DriverHarness) setupCgroupV2(allocID, task string) {
 	if cgutil.UseV2 {
 		h.cgroup = filepath.Join(cgutil.CgroupRoot, "testing.slice", cgutil.CgroupScope(allocID, task))
-		f, err := os.Create(h.cgroup)
-		if err != nil {
+		h.logger.Trace("create cgroup for test", "parent", "testing.slice", "id", allocID, "task", task, "path", h.cgroup)
+		if err := os.MkdirAll(h.cgroup, 0755); err != nil {
 			panic(err)
 		}
-		defer f.Close()
 	}
 }
 
@@ -89,11 +96,15 @@ func (h *DriverHarness) Kill() {
 	h.cleanupCgroup()
 }
 
+// cleanupCgroup might clean up a cgroup that may or may not have been created by the DriverHarness.
 func (h *DriverHarness) cleanupCgroup() {
-	if cgutil.UseV2 {
-		err := os.Remove(h.cgroup)
-		if err != nil {
-			panic(err)
+	// some [non-exec] tests don't bother with MkAllocDir, which is what would create
+	// the cgroup, but then do call Kill, so in that case skip the cgroup cleanup
+	if cgutil.UseV2 && h.cgroup != "" {
+		if err := os.Remove(h.cgroup); err != nil && !os.IsNotExist(err) {
+			// in some cases the driver will clean up the cgroup itself, in which
+			// case we do not care about the cgroup not existing at cleanup time
+			h.t.Fatalf("failed to cleanup cgroup: %v", err)
 		}
 	}
 }
@@ -153,6 +164,7 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
 	require.NoError(h.t, err)
 
 	fsi := caps.FSIsolation
+	h.logger.Trace("FS isolation", "fsi", fsi)
 	require.NoError(h.t, taskDir.Build(fsi == drivers.FSIsolationChroot, tinyChroot))
 
 	task := &structs.Task{
@@ -181,8 +193,8 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
 		}
 	}
 
-	// set cgroup
-	h.setCgroup(alloc.ID, task.Name)
+	// set up a v2 cgroup for test cases that assume one exists
+	h.setupCgroupV2(alloc.ID, task.Name)
 
 	//logmon
 	if enableLogs {
diff --git a/website/content/partials/envvars.mdx b/website/content/partials/envvars.mdx
index cf57f13a088..019603d741a 100644
--- a/website/content/partials/envvars.mdx
+++ b/website/content/partials/envvars.mdx
@@ -115,6 +115,12 @@
     <td>Datacenter in which the allocation is running</td>
   </tr>
+  <tr>
+    <td>
+      <code>NOMAD_PARENT_CGROUP</code>
+    </td>
+    <td>The parent cgroup used to contain task cgroups (Linux only)</td>
+  </tr>
   <tr>
     <td>
      <code>NOMAD_NAMESPACE</code>

From 1d2e2c0d3c1b135710a6918ef4ad044984d9d1ff Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Tue, 5 Apr 2022 15:21:15 -0500
Subject: [PATCH 2/2] raw_exec: fixup review comments

---
 api/fs_test.go                              |  1 +
 api/nodes.go                                |  1 +
 client/lib/resources/containment_default.go | 11 -----------
 nomad/core_sched_test.go                    |  2 --
 4 files changed, 2 insertions(+), 13 deletions(-)
 delete mode 100644 client/lib/resources/containment_default.go

diff --git a/api/fs_test.go b/api/fs_test.go
index 9204484cb03..449239f9654 100644
--- a/api/fs_test.go
+++ b/api/fs_test.go
@@ -20,6 +20,7 @@ func TestFS_Logs(t *testing.T) {
 	testutil.Parallel(t)
 	require := require.New(t)
 	rpcPort := 0
+
 	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
 		rpcPort = c.Ports.RPC
 		c.Client = &testutil.ClientConfig{
diff --git a/api/nodes.go b/api/nodes.go
index 0a9ed2833ab..66b25a592f0 100644
--- a/api/nodes.go
+++ b/api/nodes.go
@@ -549,6 +549,7 @@ type Node struct {
 	Links                 map[string]string
 	Meta                  map[string]string
 	NodeClass             string
+	CgroupParent          string
 	Drain                 bool
 	DrainStrategy         *DrainStrategy
 	SchedulingEligibility string
diff --git a/client/lib/resources/containment_default.go b/client/lib/resources/containment_default.go
deleted file mode 100644
index 1f3ec38fae0..00000000000
--- a/client/lib/resources/containment_default.go
+++ /dev/null
@@ -1,11 +0,0 @@
-//go:build !linux
-
-package resources
-
-type containment struct {
-	// non-linux executors currently do not create resources to be cleaned up
-}
-
-func (c *containment) Cleanup() error {
-	return nil
-}
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index a0569b5b15e..aa2d968fd1f 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -2244,8 +2244,6 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {
 }
 
 func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
-	ci.SkipSlow(t, "flaky on GHA; #12358")
-
 	srv, shutdown := TestServer(t, func(c *Config) {
 		c.NumSchedulers = 0 // Prevent automatic dequeue
 	})
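Editor's note: the `NOMAD_PARENT_CGROUP` variable documented in the envvars partial above is visible to tasks at runtime. A minimal sketch of a consumer, assuming it runs inside a Nomad-launched task:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Empty on non-Linux clients or on Nomad versions predating this patch.
	parent := os.Getenv("NOMAD_PARENT_CGROUP")
	if parent == "" {
		fmt.Println("no parent cgroup exposed")
		return
	}
	fmt.Printf("task cgroups are contained under %q\n", parent)
}
```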