From 3a488fcd3bfe80e3160065007fe706e02fde9547 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Wed, 16 Aug 2023 21:40:52 +0000 Subject: [PATCH 1/2] client: refactor cpuset partitioning This PR updates the way Nomad client manages the split between tasks that make use of resources.cpus vs. resources.cores. Previously, each task was explicitly assigned which CPU cores they were able to run on. Every time a task was started or destroyed, all other tasks' cpusets would need to be updated. This was inefficient and would crush the Linux kernel when a client would try to run ~400 or so tasks. Now, we make use of cgroup hierarchy and cpuset inheritance to efficiently manage cpusets. --- client/allocrunner/alloc_runner.go | 18 ++ client/allocrunner/alloc_runner_hooks.go | 1 + client/allocrunner/cpuparts_hook.go | 56 +++++ client/allocrunner/taskrunner/task_runner.go | 3 +- .../taskrunner/task_runner_hooks.go | 2 +- .../taskrunner/task_runner_test.go | 2 +- .../allocrunner/taskrunner/wrangler_hook.go | 8 +- client/allocrunner/testing.go | 5 +- client/client.go | 13 +- client/config/arconfig.go | 3 + client/config/config.go | 4 +- client/fingerprint/cpu.go | 11 +- client/fingerprint/cpu_default_test.go | 4 +- client/interfaces/client.go | 9 + client/lib/cgroupslib/default.go | 2 +- client/lib/cgroupslib/editor.go | 64 ++++-- client/lib/cgroupslib/init.go | 207 +++++++++++++++--- client/lib/cgroupslib/init_default.go | 11 + client/lib/cgroupslib/mount.go | 5 + client/lib/cgroupslib/partition.go | 63 ++++++ client/lib/cgroupslib/partition_default.go | 16 ++ client/lib/cgroupslib/partition_linux.go | 98 +++++++++ client/lib/cgroupslib/partition_noop.go | 25 +++ client/lib/cgroupslib/partition_test.go | 96 ++++++++ client/lib/cgroupslib/testing.go | 53 +++++ client/lib/idset/idset.go | 23 +- client/lib/numalib/detect.go | 5 +- client/lib/numalib/detect_darwin.go | 11 +- client/lib/numalib/detect_default.go | 9 +- client/lib/numalib/detect_linux.go | 19 +- 
client/lib/numalib/hw/ids.go | 20 ++ client/lib/numalib/topology.go | 50 ++--- client/lib/proclib/config.go | 6 + client/lib/proclib/testing.go | 38 ++++ client/lib/proclib/wrangler.go | 4 +- client/lib/proclib/wrangler_cg1_linux.go | 4 +- client/lib/proclib/wrangler_cg2_linux.go | 4 +- client/state/upgrade_int_test.go | 4 +- command/agent/agent.go | 30 ++- command/agent/config.go | 8 +- command/agent/consul/int_test.go | 2 +- drivers/docker/cpuset.go | 76 +++++++ drivers/docker/cpuset_test.go | 42 ++++ drivers/docker/driver.go | 14 +- drivers/docker/driver_test.go | 16 +- drivers/docker/handle.go | 36 ++- drivers/exec/driver_test.go | 2 +- drivers/java/driver_test.go | 2 +- drivers/rawexec/driver_test.go | 2 +- drivers/shared/executor/executor.go | 36 ++- drivers/shared/executor/executor_linux.go | 113 ++++++---- .../shared/executor/executor_linux_test.go | 7 +- drivers/shared/executor/executor_test.go | 4 +- .../executor/executor_universal_linux.go | 49 +++-- .../shared/executor/procstats/list_linux.go | 13 +- nomad/structs/structs.go | 33 +++ nomad/structs/structs_test.go | 35 +++ plugins/drivers/testutils/testing_linux.go | 2 +- 58 files changed, 1270 insertions(+), 228 deletions(-) create mode 100644 client/allocrunner/cpuparts_hook.go create mode 100644 client/lib/cgroupslib/init_default.go create mode 100644 client/lib/cgroupslib/partition.go create mode 100644 client/lib/cgroupslib/partition_default.go create mode 100644 client/lib/cgroupslib/partition_linux.go create mode 100644 client/lib/cgroupslib/partition_noop.go create mode 100644 client/lib/cgroupslib/partition_test.go create mode 100644 client/lib/cgroupslib/testing.go create mode 100644 client/lib/numalib/hw/ids.go create mode 100644 client/lib/proclib/testing.go create mode 100644 drivers/docker/cpuset.go create mode 100644 drivers/docker/cpuset_test.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 76d695bd527..b8c838d7695 100644 --- 
a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -21,6 +21,8 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" @@ -201,6 +203,9 @@ type allocRunner struct { // wranglers is an interface for managing unix/windows processes. wranglers cinterfaces.ProcessWranglers + // partitions is an interface for managing cpuset partitions + partitions cinterfaces.CPUPartitions + // widmgr fetches workload identities widmgr *widmgr.WIDMgr } @@ -244,6 +249,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e checkStore: config.CheckStore, getter: config.Getter, wranglers: config.Wranglers, + partitions: config.Partitions, hookResources: cstructs.NewAllocHookResources(), widmgr: config.WIDMgr, } @@ -455,6 +461,9 @@ func (ar *allocRunner) Restore() error { // restore process wrangler for task ar.wranglers.Setup(proclib.Task{AllocID: tr.Alloc().ID, Task: tr.Task().Name}) + + // restore cpuset partition state + ar.restoreCores(tr.Alloc().AllocatedResources) } ar.taskCoordinator.Restore(states) @@ -462,6 +471,15 @@ func (ar *allocRunner) Restore() error { return nil } +// restoreCores will restore the cpuset partitions with the reserved core +// data for each task in the alloc +func (ar *allocRunner) restoreCores(res *structs.AllocatedResources) { + for _, taskRes := range res.Tasks { + s := idset.From[hw.CoreID](taskRes.Cpu.ReservedCores) + ar.partitions.Restore(s) + } +} + // persistDeploymentStatus stores AllocDeploymentStatus. 
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) { if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil { diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 82dfac520a5..76ae3710c4c 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newAllocDirHook(hookLogger, ar.allocDir), newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), + newCPUPartsHook(hookLogger, ar.partitions, alloc), newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore), newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), newGroupServiceHook(groupServiceHookConfig{ diff --git a/client/allocrunner/cpuparts_hook.go b/client/allocrunner/cpuparts_hook.go new file mode 100644 index 00000000000..d0252ffaed0 --- /dev/null +++ b/client/allocrunner/cpuparts_hook.go @@ -0,0 +1,56 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package allocrunner + +import ( + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgroupslib" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + cpuPartsHookName = "cpuparts_hook" +) + +// cpuPartsHooks is responsible for managing cpuset partitioning on Linux +// nodes. This mechanism works by segregating tasks that make use of "cpu" vs. +// "cores" resources. Tasks that make use of "cpu" resource actually make use +// of shared cores that have not been reserved. The scheduler ensures enough +// cores on a node are not reserved such that all tasks have the minimum amount +// of cpu bandwidth they requested. 
+type cpuPartsHook struct { + logger hclog.Logger + allocID string + + reservations *idset.Set[hw.CoreID] + partitions cgroupslib.Partition +} + +func newCPUPartsHook( + logger hclog.Logger, + partitions cgroupslib.Partition, + alloc *structs.Allocation, +) *cpuPartsHook { + + return &cpuPartsHook{ + logger: logger, + allocID: alloc.ID, + partitions: partitions, + reservations: alloc.ReservedCores(), + } +} + +func (h *cpuPartsHook) Name() string { + return cpuPartsHookName +} + +func (h *cpuPartsHook) Prerun() error { + return h.partitions.Reserve(h.reservations) +} + +func (h *cpuPartsHook) Postrun() error { + return h.partitions.Release(h.reservations) +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 0d9d6e94d07..cf70f5ace0e 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -848,7 +848,8 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { } func (tr *TaskRunner) assignCgroup(taskConfig *drivers.TaskConfig) { - p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name) + reserveCores := len(tr.taskResources.Cpu.ReservedCores) > 0 + p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name, reserveCores) taskConfig.Resources.LinuxResources.CpusetCgroupPath = p } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 80107917688..4a54c3b2b50 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -72,7 +72,7 @@ func (tr *TaskRunner) initHooks() { newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger), - newWranglerHook(tr.wranglers, task.Name, alloc.ID, hookLogger), + newWranglerHook(tr.wranglers, task.Name, alloc.ID, 
task.UsesCores(), hookLogger), } // If the task has a CSI block, add the hook. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 70121907ee2..1023008be9b 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -135,7 +135,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri ShutdownDelayCancelFn: shutdownDelayCancelFn, ServiceRegWrapper: wrapperMock, Getter: getter.TestSandbox(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), WIDMgr: NewMockWIDMgr(nil), } diff --git a/client/allocrunner/taskrunner/wrangler_hook.go b/client/allocrunner/taskrunner/wrangler_hook.go index 975839709e8..d7cec795ad4 100644 --- a/client/allocrunner/taskrunner/wrangler_hook.go +++ b/client/allocrunner/taskrunner/wrangler_hook.go @@ -26,13 +26,19 @@ type wranglerHook struct { log hclog.Logger } -func newWranglerHook(wranglers cifs.ProcessWranglers, task, allocID string, log hclog.Logger) *wranglerHook { +func newWranglerHook( + wranglers cifs.ProcessWranglers, + task, allocID string, + cores bool, + log hclog.Logger, +) *wranglerHook { return &wranglerHook{ log: log.Named(wranglerHookName), wranglers: wranglers, task: proclib.Task{ AllocID: allocID, Task: task, + Cores: cores, }, } } diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index bfd23a3a289..69a817a8733 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -18,6 +18,7 @@ import ( clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" 
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" @@ -25,7 +26,6 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -96,7 +96,8 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.All ServiceRegWrapper: wrapper.NewHandlerWrapper(clientConf.Logger, consulRegMock, nomadRegMock), CheckStore: checkstore.NewStore(clientConf.Logger, stateDB), Getter: getter.TestSandbox(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), + Partitions: cgroupslib.MockPartition(), } return conf, cleanup diff --git a/client/client.go b/client/client.go index c6e1ae5b991..950b092c1b1 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/hoststats" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/numalib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager" @@ -331,6 +332,9 @@ type Client struct { // fingerprinting topology *numalib.Topology + // partitions is used for managing cpuset partitioning on linux systems + partitions cgroupslib.Partition + // widmgr retrieves workload identities widmgr *widmgr.WIDMgr } @@ -465,9 +469,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.topology = numalib.NoImpl(ir.Topology) } + // Create the cpu core partition manager + c.partitions = 
cgroupslib.GetPartition( + c.topology.UsableCores(), + ) + // Create the process wranglers wranglers := proclib.New(&proclib.Configs{ - Logger: c.logger.Named("proclib"), + UsableCores: c.topology.UsableCores(), + Logger: c.logger.Named("proclib"), }) c.wranglers = wranglers @@ -2754,6 +2764,7 @@ func (c *Client) newAllocRunnerConfig( Vault: c.vaultClient, WIDMgr: c.widmgr, Wranglers: c.wranglers, + Partitions: c.partitions, } } diff --git a/client/config/arconfig.go b/client/config/arconfig.go index 3ef5fbc120e..6e882809195 100644 --- a/client/config/arconfig.go +++ b/client/config/arconfig.go @@ -110,6 +110,9 @@ type AllocRunnerConfig struct { // Wranglers is an interface for managing unix/windows processes. Wranglers interfaces.ProcessWranglers + // Partitions is an interface for managing cpuset partitions. + Partitions interfaces.CPUPartitions + // WIDMgr fetches workload identities WIDMgr *widmgr.WIDMgr } diff --git a/client/config/config.go b/client/config/config.go index 05604c148c5..5b48c9cfd96 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/consul-template/config" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/helper" @@ -309,7 +309,7 @@ type Config struct { CgroupParent string // ReservableCores if set overrides the set of reservable cores reported in fingerprinting. - ReservableCores []numalib.CoreID + ReservableCores []hw.CoreID // NomadServiceDiscovery determines whether the Nomad native service // discovery client functionality is enabled. 
diff --git a/client/fingerprint/cpu.go b/client/fingerprint/cpu.go index ffc482841ec..5c9d9df3b13 100644 --- a/client/fingerprint/cpu.go +++ b/client/fingerprint/cpu.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/klauspost/cpuid/v2" @@ -77,14 +78,14 @@ func (*CPUFingerprint) reservedCompute(request *FingerprintRequest) structs.Node func (f *CPUFingerprint) initialize(request *FingerprintRequest) { var ( - reservableCores *idset.Set[numalib.CoreID] + reservableCores *idset.Set[hw.CoreID] totalCompute = request.Config.CpuCompute reservedCompute = f.reservedCompute(request) - reservedCores = idset.From[numalib.CoreID](reservedCompute.ReservedCpuCores) + reservedCores = idset.From[hw.CoreID](reservedCompute.ReservedCpuCores) ) if rc := request.Config.ReservableCores; rc != nil { - reservableCores = idset.From[numalib.CoreID](rc) + reservableCores = idset.From[hw.CoreID](rc) } f.top = numalib.Scan(append( @@ -157,7 +158,7 @@ func (f *CPUFingerprint) setReservableCores(response *FingerprintResponse) { usable := f.top.UsableCores() response.AddAttribute("cpu.reservablecores", f.cores(usable.Size())) f.nodeResources.Cpu.ReservableCpuCores = helper.ConvertSlice( - usable.Slice(), func(id numalib.CoreID) uint16 { + usable.Slice(), func(id hw.CoreID) uint16 { return uint16(id) }) default: @@ -189,7 +190,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) { nodes := f.top.Nodes() response.AddAttribute("numa.node.count", f.nodes(nodes.Size())) - nodes.ForEach(func(id numalib.NodeID) error { + nodes.ForEach(func(id hw.NodeID) error { key := fmt.Sprintf("numa.node%d.cores", id) cores := f.top.NodeCores(id) response.AddAttribute(key, cores.String()) diff --git 
a/client/fingerprint/cpu_default_test.go b/client/fingerprint/cpu_default_test.go index 301a111d2f7..e3c873814fe 100644 --- a/client/fingerprint/cpu_default_test.go +++ b/client/fingerprint/cpu_default_test.go @@ -11,7 +11,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" @@ -63,7 +63,7 @@ func TestCPUFingerprint_OverrideCompute(t *testing.T) { Attributes: make(map[string]string), } cfg := &config.Config{ - ReservableCores: []numalib.CoreID{0, 1, 2}, + ReservableCores: []hw.CoreID{0, 1, 2}, } var originalCPU int diff --git a/client/interfaces/client.go b/client/interfaces/client.go index 3373cd265ad..796c52250fa 100644 --- a/client/interfaces/client.go +++ b/client/interfaces/client.go @@ -4,6 +4,8 @@ package interfaces import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" @@ -47,3 +49,10 @@ type ProcessWranglers interface { Setup(proclib.Task) error Destroy(proclib.Task) error } + +// CPUPartitions is an interface satisfied by the cgroupslib package. 
+type CPUPartitions interface { + Restore(*idset.Set[hw.CoreID]) + Reserve(*idset.Set[hw.CoreID]) error + Release(*idset.Set[hw.CoreID]) error +} diff --git a/client/lib/cgroupslib/default.go b/client/lib/cgroupslib/default.go index 12376bc9592..d8685a560ea 100644 --- a/client/lib/cgroupslib/default.go +++ b/client/lib/cgroupslib/default.go @@ -6,7 +6,7 @@ package cgroupslib // LinuxResourcesPath does nothing on non-Linux systems -func LinuxResourcesPath(string, string) string { +func LinuxResourcesPath(string, string, bool) string { return "" } diff --git a/client/lib/cgroupslib/editor.go b/client/lib/cgroupslib/editor.go index 0da8f49b439..6d846078d83 100644 --- a/client/lib/cgroupslib/editor.go +++ b/client/lib/cgroupslib/editor.go @@ -32,16 +32,14 @@ func OpenPath(dir string) Interface { } } -// OpenFromCpusetCG1 creates a handle for modifying cgroup interface files of -// the given interface, given a path to the cpuset interface. -// -// This is useful because a Nomad task resources struct only keeps track of -// the cpuset cgroup directory in the cgroups v1 regime, but nowadays we want -// to modify more than the cpuset in some cases. -func OpenFromCpusetCG1(dir, iface string) Interface { - return &editor{ - dpath: strings.Replace(dir, "cpuset", iface, 1), +// OpenFromFreezerCG1 creates a handle for modifying cgroup interface files +// of the given interface, given a path to the freezer cgroup. +func OpenFromFreezerCG1(orig, iface string) Interface { + if iface == "cpuset" { + panic("cannot open cpuset") } + p := strings.Replace(orig, "/freezer/", "/"+iface+"/", 1) + return OpenPath(p) } // An Interface can be used to read and write the interface files of a cgroup. @@ -89,16 +87,17 @@ func (e *editor) Write(filename, content string) error { // A Factory creates a Lifecycle which is an abstraction over the setup and // teardown routines used for creating and destroying cgroups used for // constraining Nomad tasks. 
-func Factory(allocID, task string) Lifecycle { +func Factory(allocID, task string, cores bool) Lifecycle { switch GetMode() { case CG1: return &lifeCG1{ allocID: allocID, task: task, + cores: cores, } default: return &lifeCG2{ - dpath: pathCG2(allocID, task), + dpath: pathCG2(allocID, task, cores), } } } @@ -118,6 +117,7 @@ type Lifecycle interface { type lifeCG1 struct { allocID string task string + cores bool // uses core reservation } func (l *lifeCG1) Setup() error { @@ -127,13 +127,32 @@ func (l *lifeCG1) Setup() error { if err != nil { return err } + if strings.Contains(p, "/reserve/") { + if err = l.inheritMems(p); err != nil { + return err + } + } } return nil } +func (l *lifeCG1) inheritMems(destination string) error { + parent := filepath.Join(filepath.Dir(destination), "cpuset.mems") + b, err := os.ReadFile(parent) + if err != nil { + return err + } + destination = filepath.Join(destination, "cpuset.mems") + return os.WriteFile(destination, b, 0644) +} + func (l *lifeCG1) Teardown() error { paths := l.paths() for _, p := range paths { + if filepath.Base(p) == "share" { + // avoid removing the share cgroup + continue + } err := os.RemoveAll(p) if err != nil { return err @@ -162,7 +181,7 @@ func (l *lifeCG1) Kill() error { } func (l *lifeCG1) edit(iface string) *editor { - scope := scopeCG1(l.allocID, l.task) + scope := ScopeCG1(l.allocID, l.task) return &editor{ dpath: filepath.Join(root, iface, NomadCgroupParent, scope), } @@ -184,14 +203,22 @@ func (l *lifeCG1) thaw() error { } func (l *lifeCG1) paths() []string { - scope := scopeCG1(l.allocID, l.task) - ifaces := []string{"freezer", "cpu", "memory", "cpuset"} - paths := make([]string, 0, len(ifaces)) + scope := ScopeCG1(l.allocID, l.task) + ifaces := []string{"freezer", "cpu", "memory"} + paths := make([]string, 0, len(ifaces)+1) for _, iface := range ifaces { paths = append(paths, filepath.Join( root, iface, NomadCgroupParent, scope, )) } + + switch partition := GetPartitionFromBool(l.cores); 
partition { + case "reserve": + paths = append(paths, filepath.Join(root, "cpuset", NomadCgroupParent, partition, scope)) + case "share": + paths = append(paths, filepath.Join(root, "cpuset", NomadCgroupParent, partition)) + } + return paths } @@ -235,7 +262,7 @@ func getPIDs(file string) (*set.Set[int], error) { return result, nil } -func scopeCG1(allocID, task string) string { +func ScopeCG1(allocID, task string) string { return fmt.Sprintf("%s.%s", allocID, task) } @@ -243,6 +270,7 @@ func scopeCG2(allocID, task string) string { return fmt.Sprintf("%s.%s.scope", allocID, task) } -func pathCG2(allocID, task string) string { - return filepath.Join(root, NomadCgroupParent, scopeCG2(allocID, task)) +func pathCG2(allocID, task string, cores bool) string { + partition := GetPartitionFromBool(cores) + return filepath.Join(root, NomadCgroupParent, partition, scopeCG2(allocID, task)) } diff --git a/client/lib/cgroupslib/init.go b/client/lib/cgroupslib/init.go index 8e95f36d1f0..0da6f6f7443 100644 --- a/client/lib/cgroupslib/init.go +++ b/client/lib/cgroupslib/init.go @@ -9,15 +9,31 @@ import ( "bytes" "os" "path/filepath" - "strings" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-set" ) -func Init(log hclog.Logger) { +// Init will initialize the cgroup tree that the Nomad client will use for +// isolating resources of tasks. cores is the cpuset granted for use by Nomad. 
+func Init(log hclog.Logger, cores string) { + log.Info("initializing nomad cgroups", "cores", cores) + switch GetMode() { case CG1: + // the name of the cpuset interface file + const cpusetFile = "cpuset.cpus" + + // the name of the cpuset mems interface file + const memsFile = "cpuset.mems" + + const memsSet = "0" // TODO(shoenig) get from topology + + // the value to disable inheriting values from parent cgroup + const noClone = "0" + + // the name of the clone_children interface file + const cloneFile = "cgroup.clone_children" + // create the /nomad cgroup (or whatever the name is configured to be) // for each cgroup controller we are going to use controllers := []string{"freezer", "memory", "cpu", "cpuset"} @@ -25,43 +41,151 @@ func Init(log hclog.Logger) { p := filepath.Join(root, ctrl, NomadCgroupParent) if err := os.MkdirAll(p, 0755); err != nil { log.Error("failed to create nomad cgroup", "controller", ctrl, "error", err) + return } } - case CG2: - // minimum controllers must be set first - s, err := readRootCG2("cgroup.subtree_control") - if err != nil { - log.Error("failed to create nomad cgroup", "error", err) + + // + // configure cpuset partitioning + // + // the tree is lopsided - tasks making use of reserved cpu cores get + // their own cgroup with a static cpuset.cpus value. 
other tasks are + // placed in the single share cgroup and share its dynamic cpuset.cpus + // value + // + // e.g., + // root/cpuset/nomad/ + // share/{cgroup.procs, cpuset.cpus, cpuset.mems} + // reserve/ + // abc123.task/{cgroup.procs, cpuset.cpus, cpuset.mems} + // def456.task/{cgroup.procs, cpuset.cpus, cpuset.mems} + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) return } - required := set.From([]string{"cpuset", "cpu", "io", "memory", "pids"}) - enabled := set.From(strings.Fields(s)) - needed := required.Difference(enabled) + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, memsFile); err != nil { + log.Error("failed to set cpuset.mems on nomad cpuset cgroup", "error", err) + return + } - if needed.Size() == 0 { - log.Debug("top level nomad.slice cgroup already exists") - return // already setup + if err := writeCG(cores, "cpuset", NomadCgroupParent, cpusetFile); err != nil { + log.Error("failed to write cores to nomad cpuset cgroup", "error", err) + return } - sb := new(strings.Builder) - for _, controller := range needed.List() { - sb.WriteString("+" + controller + " ") + // + // share partition + // + + if err := mkCG("cpuset", NomadCgroupParent, SharePartition()); err != nil { + log.Error("failed to create share cpuset partition", "error", err) + return + } + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, SharePartition(), cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) + return } - activation := strings.TrimSpace(sb.String()) - if err = writeRootCG2("cgroup.subtree_control", activation); err != nil { + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, SharePartition(), memsFile); err != nil { + log.Error("failed to set cpuset.mems on share cpuset partition", "error", err) + return + } + + // + // reserve partition + // + + if err := mkCG("cpuset", 
NomadCgroupParent, ReservePartition()); err != nil { + log.Error("failed to create reserve cpuset partition", "error", err) + return + } + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, ReservePartition(), cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) + return + } + + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, ReservePartition(), memsFile); err != nil { + log.Error("failed to set cpuset.mems on reserve cpuset partition", "error", err) + return + } + + log.Debug("nomad cpuset partitions initialized", "cores", cores) + + case CG2: + // the cgroup controllers we need to activate at the root and on the nomad slice + const activation = "+cpuset +cpu +io +memory +pids" + + // the name of the cgroup subtree interface file + const subtreeFile = "cgroup.subtree_control" + + // the name of the cpuset interface file + const cpusetFile = "cpuset.cpus" + + // + // configuring root cgroup (/sys/fs/cgroup) + // + + if err := writeCG(activation, subtreeFile); err != nil { log.Error("failed to create nomad cgroup", "error", err) return } - nomadSlice := filepath.Join("/sys/fs/cgroup", NomadCgroupParent) - if err := os.MkdirAll(nomadSlice, 0755); err != nil { + // + // configuring nomad.slice + // + + if err := mkCG(NomadCgroupParent); err != nil { log.Error("failed to create nomad cgroup", "error", err) return } - log.Debug("top level nomad.slice cgroup initialized", "controllers", needed) + if err := writeCG(activation, NomadCgroupParent, subtreeFile); err != nil { + log.Error("failed to set subtree control on nomad cgroup", "error", err) + return + } + + if err := writeCG(cores, NomadCgroupParent, cpusetFile); err != nil { + log.Error("failed to write root partition cpuset", "error", err) + return + } + + log.Debug("top level partition root nomad.slice cgroup initialized") + + // + // configuring nomad.slice/share (member) + // + + if err := mkCG(NomadCgroupParent, SharePartition()); err != nil { 
+ log.Error("failed to create share cgroup", "error", err) + return + } + + if err := writeCG(activation, NomadCgroupParent, SharePartition(), subtreeFile); err != nil { + log.Error("failed to set subtree control on cpuset share partition", "error", err) + return + } + + log.Debug("partition member nomad.slice/share cgroup initialized") + + // + // configuring nomad.slice/reserve (member) + // + + if err := mkCG(NomadCgroupParent, ReservePartition()); err != nil { + log.Error("failed to create share cgroup", "error", err) + return + } + + if err := writeCG(activation, NomadCgroupParent, ReservePartition(), subtreeFile); err != nil { + log.Error("failed to set subtree control on cpuset reserve partition", "error", err) + return + } + + log.Debug("partition member nomad.slice/reserve cgroup initialized") } } @@ -71,11 +195,26 @@ func readRootCG2(filename string) (string, error) { return string(bytes.TrimSpace(b)), err } -func writeRootCG2(filename, content string) error { - p := filepath.Join(root, filename) +// filepathCG will return the given paths based on the cgroup root +func filepathCG(paths ...string) string { + base := []string{root} + base = append(base, paths...) + p := filepath.Join(base...) + return p +} + +// writeCG will write content to the cgroup interface file given by paths +func writeCG(content string, paths ...string) error { + p := filepathCG(paths...) return os.WriteFile(p, []byte(content), 0644) } +// mkCG will create a cgroup at the given path +func mkCG(paths ...string) error { + p := filepathCG(paths...) 
+ return os.MkdirAll(p, 0755) +} + // ReadNomadCG2 reads an interface file under the nomad.slice parent cgroup // (or whatever its name is configured to be) func ReadNomadCG2(filename string) (string, error) { @@ -97,13 +236,23 @@ func WriteNomadCG1(iface, filename, content string) error { return os.WriteFile(p, []byte(content), 0644) } +// PathCG1 returns the filepath to the cgroup directory of the given interface +// and allocID / taskName. +func PathCG1(allocID, taskName, iface string) string { + return filepath.Join(root, iface, NomadCgroupParent, ScopeCG1(allocID, taskName)) +} + // LinuxResourcesPath returns the filepath to the directory that the field // x.Resources.LinuxResources.CpusetCgroupPath is expected to hold on to -func LinuxResourcesPath(allocID, task string) string { - switch GetMode() { - case CG1: - return filepath.Join(root, "cpuset", NomadCgroupParent, scopeCG1(allocID, task)) +func LinuxResourcesPath(allocID, task string, reserveCores bool) string { + partition := GetPartitionFromBool(reserveCores) + mode := GetMode() + switch { + case mode == CG1 && reserveCores: + return filepath.Join(root, "cpuset", NomadCgroupParent, partition, ScopeCG1(allocID, task)) + case mode == CG1 && !reserveCores: + return filepath.Join(root, "cpuset", NomadCgroupParent, partition) default: - return filepath.Join(root, NomadCgroupParent, scopeCG2(allocID, task)) + return filepath.Join(root, NomadCgroupParent, partition, scopeCG2(allocID, task)) } } diff --git a/client/lib/cgroupslib/init_default.go b/client/lib/cgroupslib/init_default.go new file mode 100644 index 00000000000..d08d6b11367 --- /dev/null +++ b/client/lib/cgroupslib/init_default.go @@ -0,0 +1,11 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +//go:build !linux + +package cgroupslib + +// PathCG1 returns empty string on non-Linux systems +func PathCG1(allocID, taskName, iface string) string { + return "" +} diff --git a/client/lib/cgroupslib/mount.go b/client/lib/cgroupslib/mount.go index b000084fce8..62c2f39193b 100644 --- a/client/lib/cgroupslib/mount.go +++ b/client/lib/cgroupslib/mount.go @@ -15,6 +15,10 @@ import ( ) func detect() Mode { + if os.Geteuid() > 0 { + return OFF + } + f, err := os.Open("/proc/self/mountinfo") if err != nil { return OFF @@ -22,6 +26,7 @@ func detect() Mode { defer func() { _ = f.Close() }() + return scan(f) } diff --git a/client/lib/cgroupslib/partition.go b/client/lib/cgroupslib/partition.go new file mode 100644 index 00000000000..6ada0425336 --- /dev/null +++ b/client/lib/cgroupslib/partition.go @@ -0,0 +1,63 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// A Partition is used to track reserved vs. shared cpu cores. +type Partition interface { + Restore(*idset.Set[hw.CoreID]) + Reserve(*idset.Set[hw.CoreID]) error + Release(*idset.Set[hw.CoreID]) error +} + +// SharePartition is the name of the cgroup containing cgroups for tasks +// making use of the 'cpu' resource, which pools and shares cpu cores. +func SharePartition() string { + switch GetMode() { + case CG1: + return "share" + case CG2: + return "share.slice" + default: + return "" + } +} + +// ReservePartition is the name of the cgroup containing cgroups for tasks +// making use of the 'cores' resource, which reserves specific cpu cores. 
+func ReservePartition() string { + switch GetMode() { + case CG1: + return "reserve" + case CG2: + return "reserve.slice" + default: + return "" + } +} + +// GetPartitionFromCores returns the name of the cgroup that should contain +// the cgroup of the task, which is determined by inspecting the cores +// parameter, which is a non-empty string if the task is using reserved +// cores. +func GetPartitionFromCores(cores string) string { + if cores == "" { + return SharePartition() + } + return ReservePartition() +} + +// GetPartitionFromBool returns the name of the cgroup that should contain +// the cgroup of the task, which is determined from the cores parameter, +// which indicates whether the task is making use of reserved cores. +func GetPartitionFromBool(cores bool) string { + if cores { + return ReservePartition() + } + return SharePartition() +} diff --git a/client/lib/cgroupslib/partition_default.go b/client/lib/cgroupslib/partition_default.go new file mode 100644 index 00000000000..f685c671d54 --- /dev/null +++ b/client/lib/cgroupslib/partition_default.go @@ -0,0 +1,16 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !linux + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// GetPartition creates a no-op Partition that does not do anything. +func GetPartition(*idset.Set[hw.CoreID]) Partition { + return NoopPartition() +} diff --git a/client/lib/cgroupslib/partition_linux.go b/client/lib/cgroupslib/partition_linux.go new file mode 100644 index 00000000000..ee0891fa106 --- /dev/null +++ b/client/lib/cgroupslib/partition_linux.go @@ -0,0 +1,98 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +//go:build linux + +package cgroupslib + +import ( + "os" + "path/filepath" + "sync" + + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// GetPartition creates a Partition suitable for managing cores on this +// Linux system. +func GetPartition(cores *idset.Set[hw.CoreID]) Partition { + return NewPartition(cores) +} + +// NewPartition creates a cpuset partition manager for managing the books +// when allocations are created and destroyed. The initial set of cores is +// the usable set of cores by Nomad. +func NewPartition(cores *idset.Set[hw.CoreID]) Partition { + var ( + sharePath string + reservePath string + ) + + switch GetMode() { + case OFF: + return NoopPartition() + case CG1: + sharePath = filepath.Join(root, "cpuset", NomadCgroupParent, SharePartition(), "cpuset.cpus") + reservePath = filepath.Join(root, "cpuset", NomadCgroupParent, ReservePartition(), "cpuset.cpus") + case CG2: + sharePath = filepath.Join(root, NomadCgroupParent, SharePartition(), "cpuset.cpus") + reservePath = filepath.Join(root, NomadCgroupParent, ReservePartition(), "cpuset.cpus") + } + + return &partition{ + sharePath: sharePath, + reservePath: reservePath, + share: cores.Copy(), + reserve: idset.Empty[hw.CoreID](), + } +} + +type partition struct { + sharePath string + reservePath string + + lock sync.Mutex + share *idset.Set[hw.CoreID] + reserve *idset.Set[hw.CoreID] +} + +func (p *partition) Restore(cores *idset.Set[hw.CoreID]) { + p.lock.Lock() + defer p.lock.Unlock() + + p.share.RemoveSet(cores) + p.reserve.InsertSet(cores) +} + +func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.share.RemoveSet(cores) + p.reserve.InsertSet(cores) + + return p.write() +} + +func (p *partition) Release(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.reserve.RemoveSet(cores) + 
p.share.InsertSet(cores) + + return p.write() +} + +func (p *partition) write() error { + shareStr := p.share.String() + if err := os.WriteFile(p.sharePath, []byte(shareStr), 0644); err != nil { + return err + } + reserveStr := p.reserve.String() + if err := os.WriteFile(p.reservePath, []byte(reserveStr), 0644); err != nil { + return err + } + return nil +} diff --git a/client/lib/cgroupslib/partition_noop.go b/client/lib/cgroupslib/partition_noop.go new file mode 100644 index 00000000000..6f0dced98f9 --- /dev/null +++ b/client/lib/cgroupslib/partition_noop.go @@ -0,0 +1,25 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +func NoopPartition() Partition { + return new(noop) +} + +type noop struct{} + +func (p *noop) Reserve(*idset.Set[hw.CoreID]) error { + return nil +} + +func (p *noop) Release(*idset.Set[hw.CoreID]) error { + return nil +} + +func (p *noop) Restore(*idset.Set[hw.CoreID]) {} diff --git a/client/lib/cgroupslib/partition_test.go b/client/lib/cgroupslib/partition_test.go new file mode 100644 index 00000000000..601ff6a2190 --- /dev/null +++ b/client/lib/cgroupslib/partition_test.go @@ -0,0 +1,96 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build linux + +package cgroupslib + +import ( + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/shoenig/test/must" +) + +// testPartition creates a fresh partition configured with cores 10-20. 
+func testPartition(t *testing.T) *partition { + dir := t.TempDir() + shareFile := filepath.Join(dir, "share.cpus") + reserveFile := filepath.Join(dir, "reserve.cpus") + return &partition{ + sharePath: shareFile, + reservePath: reserveFile, + share: idset.From[hw.CoreID]([]hw.CoreID{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), + reserve: idset.Empty[hw.CoreID](), + } +} + +func coreset(ids ...hw.CoreID) *idset.Set[hw.CoreID] { + return idset.From[hw.CoreID](ids) +} + +func TestPartition_Restore(t *testing.T) { + p := testPartition(t) + + must.NotEmpty(t, p.share) + must.Empty(t, p.reserve) + + p.Restore(coreset(11, 13)) + p.Restore(coreset(15, 16, 17)) + p.Restore(coreset(10, 19)) + + expShare := idset.From[hw.CoreID]([]hw.CoreID{12, 14, 18}) + expReserve := idset.From[hw.CoreID]([]hw.CoreID{11, 13, 15, 16, 17, 10, 19}) + + must.Eq(t, expShare, p.share) + must.Eq(t, expReserve, p.reserve) + + // restore does not write to the cgroup interface + must.FileNotExists(t, p.sharePath) + must.FileNotExists(t, p.reservePath) +} + +func TestPartition_Reserve(t *testing.T) { + p := testPartition(t) + + p.Reserve(coreset(10, 15, 19)) + p.Reserve(coreset(12, 13)) + + expShare := idset.From[hw.CoreID]([]hw.CoreID{11, 14, 16, 17, 18}) + expReserve := idset.From[hw.CoreID]([]hw.CoreID{10, 12, 13, 15, 19}) + + must.Eq(t, expShare, p.share) + must.Eq(t, expReserve, p.reserve) + + must.FileContains(t, p.sharePath, "11,14,16-18") + must.FileContains(t, p.reservePath, "10,12-13,15,19") +} + +func TestPartition_Release(t *testing.T) { + p := testPartition(t) + + // some reservations + p.Reserve(coreset(10, 15, 19)) + p.Reserve(coreset(12, 13)) + p.Reserve(coreset(11, 18)) + + must.FileContains(t, p.sharePath, "14,16-17") + must.FileContains(t, p.reservePath, "10-13,15,18-19") + + // release 1 + p.Release(coreset(12, 13)) + must.FileContains(t, p.sharePath, "12-14,16-17") + must.FileContains(t, p.reservePath, "10-11,15,18-19") + + // release 2 + p.Release(coreset(10, 15, 19)) + 
must.FileContains(t, p.sharePath, "10,12-17,19") + must.FileContains(t, p.reservePath, "11,18") + + // release 3 + p.Release(coreset(11, 18)) + must.FileContains(t, p.sharePath, "10-19") + must.FileContains(t, p.reservePath, "") +} diff --git a/client/lib/cgroupslib/testing.go b/client/lib/cgroupslib/testing.go new file mode 100644 index 00000000000..fd347d6d88c --- /dev/null +++ b/client/lib/cgroupslib/testing.go @@ -0,0 +1,53 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package cgroupslib + +import ( + "sync" + + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// MockPartition creates an in-memory Partition manager backed by 8 fake cpu cores. +func MockPartition() Partition { + return &mock{ + share: idset.From[hw.CoreID]([]hw.CoreID{0, 1, 2, 3, 4, 5, 6, 7}), + reserve: idset.Empty[hw.CoreID](), + } +} + +type mock struct { + lock sync.Mutex + share *idset.Set[hw.CoreID] + reserve *idset.Set[hw.CoreID] +} + +func (m *mock) Restore(cores *idset.Set[hw.CoreID]) { + m.lock.Lock() + defer m.lock.Unlock() + + m.share.RemoveSet(cores) + m.reserve.InsertSet(cores) +} + +func (m *mock) Reserve(cores *idset.Set[hw.CoreID]) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.share.RemoveSet(cores) + m.reserve.InsertSet(cores) + + return nil +} + +func (m *mock) Release(cores *idset.Set[hw.CoreID]) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.reserve.RemoveSet(cores) + m.share.InsertSet(cores) + + return nil +} diff --git a/client/lib/idset/idset.go b/client/lib/idset/idset.go index 73bf3da26ef..6a99b46e35e 100644 --- a/client/lib/idset/idset.go +++ b/client/lib/idset/idset.go @@ -17,6 +17,8 @@ import ( // An ID is representative of a non-negative identifier of something like // a CPU core ID, a NUMA node ID, etc. +// +// See the hwids package for typical use cases. 
type ID interface { ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uint } @@ -37,6 +39,11 @@ func Empty[T ID]() *Set[T] { } } +// Copy creates a deep copy of s. +func (s *Set[T]) Copy() *Set[T] { + return &Set[T]{items: s.items.Copy()} +} + var ( numberRe = regexp.MustCompile(`^\d+$`) spanRe = regexp.MustCompile(`^(\d+)-(\d+)$`) @@ -108,10 +115,24 @@ func (s *Set[T]) Slice() []T { return items } +// InsertSet inserts all items of other into s. +func (s *Set[T]) InsertSet(other *Set[T]) { + s.items.InsertSet(other.items) +} + +// RemoveSet removes all items of other from s. +func (s *Set[T]) RemoveSet(other *Set[T]) { + s.items.RemoveSet(other.items) +} + // String creates a well-formed cpuset string representation of the Set. func (s *Set[T]) String() string { if s.items.Empty() { - return "" + // cgroups notation uses a space (or newline) to indicate + // "empty"; and this value is written to cgroups interface + // files + const empty = " " + return empty } var parts []string diff --git a/client/lib/numalib/detect.go b/client/lib/numalib/detect.go index cc87de72d6e..ed99f157a12 100644 --- a/client/lib/numalib/detect.go +++ b/client/lib/numalib/detect.go @@ -5,6 +5,7 @@ package numalib import ( "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // A SystemScanner represents one methodology of detecting CPU hardware on a @@ -35,7 +36,7 @@ type ConfigScanner struct { // Only meaningful on Linux, this value can be used to override the set of // CPU core IDs we may make use of. Normally these are detected by reading // Nomad parent cgroup cpuset interface file. - ReservableCores *idset.Set[CoreID] + ReservableCores *idset.Set[hw.CoreID] // TotalCompute comes from client.cpu_total_compute. // @@ -49,7 +50,7 @@ type ConfigScanner struct { // ReservedCores comes from client.reserved.cores. // // Used to withhold a set of cores from being used by Nomad for scheduling. 
- ReservedCores *idset.Set[CoreID] + ReservedCores *idset.Set[hw.CoreID] // ReservedCompute comes from client.reserved.cpu. // diff --git a/client/lib/numalib/detect_darwin.go b/client/lib/numalib/detect_darwin.go index 0977b45a6ec..40c1976b15d 100644 --- a/client/lib/numalib/detect_darwin.go +++ b/client/lib/numalib/detect_darwin.go @@ -7,6 +7,7 @@ package numalib import ( "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/shoenig/go-m1cpu" "golang.org/x/sys/unix" ) @@ -19,8 +20,8 @@ func PlatformScanners() []SystemScanner { } const ( - nodeID = NodeID(0) - socketID = SocketID(0) + nodeID = hw.NodeID(0) + socketID = hw.SocketID(0) maxSpeed = KHz(0) ) @@ -29,7 +30,7 @@ type MacOS struct{} func (m *MacOS) ScanSystem(top *Topology) { // all apple hardware is non-numa; just assume as much - top.NodeIDs = idset.Empty[NodeID]() + top.NodeIDs = idset.Empty[hw.NodeID]() top.NodeIDs.Insert(nodeID) // arch specific detection @@ -49,7 +50,7 @@ func (m *MacOS) scanAppleSilicon(top *Topology) { eCoreSpeed := KHz(m1cpu.ECoreHz() / 1000) top.Cores = make([]Core, pCoreCount+eCoreCount) - nthCore := CoreID(0) + nthCore := hw.CoreID(0) for i := 0; i < pCoreCount; i++ { top.insert(nodeID, socketID, nthCore, performance, maxSpeed, pCoreSpeed) @@ -69,6 +70,6 @@ func (m *MacOS) scanLegacyX86(top *Topology) { top.Cores = make([]Core, coreCount) for i := 0; i < int(coreCount); i++ { - top.insert(nodeID, socketID, CoreID(i), performance, maxSpeed, coreSpeed) + top.insert(nodeID, socketID, hw.CoreID(i), performance, maxSpeed, coreSpeed) } } diff --git a/client/lib/numalib/detect_default.go b/client/lib/numalib/detect_default.go index b603a64bf43..dc4fca8cbd1 100644 --- a/client/lib/numalib/detect_default.go +++ b/client/lib/numalib/detect_default.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" 
"github.com/shirou/gopsutil/v3/cpu" ) @@ -22,8 +23,8 @@ func PlatformScanners() []SystemScanner { } const ( - nodeID = NodeID(0) - socketID = SocketID(0) + nodeID = hw.NodeID(0) + socketID = hw.SocketID(0) maxSpeed = KHz(0) ) @@ -34,7 +35,7 @@ type Generic struct{} func (g *Generic) ScanSystem(top *Topology) { // hardware may or may not be NUMA, but for now we only // detect such topology on linux systems - top.NodeIDs = idset.Empty[NodeID]() + top.NodeIDs = idset.Empty[hw.NodeID]() top.NodeIDs.Insert(nodeID) // cores @@ -55,6 +56,6 @@ func (g *Generic) ScanSystem(top *Topology) { for i := 0; i < count; i++ { info := infos[0] speed := KHz(MHz(info.Mhz) * 1000) - top.insert(nodeID, socketID, CoreID(i), performance, maxSpeed, speed) + top.insert(nodeID, socketID, hw.CoreID(i), performance, maxSpeed, speed) } } diff --git a/client/lib/numalib/detect_linux.go b/client/lib/numalib/detect_linux.go index 8b3e8371211..fbb966a789d 100644 --- a/client/lib/numalib/detect_linux.go +++ b/client/lib/numalib/detect_linux.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // PlatformScanners returns the set of SystemScanner for Linux. 
@@ -59,7 +60,7 @@ func (*Sysfs) available() bool { } func (*Sysfs) discoverOnline(st *Topology) { - ids, err := getIDSet[NodeID](nodeOnline) + ids, err := getIDSet[hw.NodeID](nodeOnline) if err == nil { st.NodeIDs = ids } @@ -72,7 +73,7 @@ func (*Sysfs) discoverCosts(st *Topology) { st.Distances[i] = make([]Cost, dimension) } - _ = st.NodeIDs.ForEach(func(id NodeID) error { + _ = st.NodeIDs.ForEach(func(id hw.NodeID) error { s, err := getString(distanceFile, id) if err != nil { return err @@ -87,25 +88,25 @@ func (*Sysfs) discoverCosts(st *Topology) { } func (*Sysfs) discoverCores(st *Topology) { - onlineCores, err := getIDSet[CoreID](cpuOnline) + onlineCores, err := getIDSet[hw.CoreID](cpuOnline) if err != nil { return } st.Cores = make([]Core, onlineCores.Size()) - _ = st.NodeIDs.ForEach(func(node NodeID) error { + _ = st.NodeIDs.ForEach(func(node hw.NodeID) error { s, err := os.ReadFile(fmt.Sprintf(cpulistFile, node)) if err != nil { return err } - cores := idset.Parse[CoreID](string(s)) - _ = cores.ForEach(func(core CoreID) error { + cores := idset.Parse[hw.CoreID](string(s)) + _ = cores.ForEach(func(core hw.CoreID) error { // best effort, zero values are defaults - socket, _ := getNumeric[SocketID](cpuSocketFile, core) + socket, _ := getNumeric[hw.SocketID](cpuSocketFile, core) max, _ := getNumeric[KHz](cpuMaxFile, core) base, _ := getNumeric[KHz](cpuBaseFile, core) - siblings, _ := getIDSet[CoreID](cpuSiblingFile, core) + siblings, _ := getIDSet[hw.CoreID](cpuSiblingFile, core) st.insert(node, socket, core, gradeOf(siblings), max, base) return nil }) @@ -182,7 +183,7 @@ func (s *Cgroups2) ScanSystem(top *Topology) { // combine scanCgroups func scanIDs(top *Topology, content string) { - ids := idset.Parse[CoreID](content) + ids := idset.Parse[hw.CoreID](content) for _, cpu := range top.Cores { if !ids.Contains(cpu.ID) { cpu.Disable = true diff --git a/client/lib/numalib/hw/ids.go b/client/lib/numalib/hw/ids.go new file mode 100644 index 
00000000000..811df0e4b28 --- /dev/null +++ b/client/lib/numalib/hw/ids.go @@ -0,0 +1,20 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +// Package hw provides types for identifying hardware. +// +// This is a separate "leaf" package that is easy to import from many other +// packages without creating circular imports. +package hw + +type ( + // A NodeID represents a NUMA node. There could be more than + // one NUMA node per socket. + NodeID uint8 + + // A SocketID represents a physical CPU socket. + SocketID uint8 + + // A CoreID represents one logical (vCPU) core. + CoreID uint16 +) diff --git a/client/lib/numalib/topology.go b/client/lib/numalib/topology.go index 2196ef326bf..99085817a29 100644 --- a/client/lib/numalib/topology.go +++ b/client/lib/numalib/topology.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // CoreGrade describes whether a specific core is a performance or efficiency @@ -26,7 +27,7 @@ const ( efficiency CoreGrade = false ) -func gradeOf(siblings *idset.Set[CoreID]) CoreGrade { +func gradeOf(siblings *idset.Set[hw.CoreID]) CoreGrade { switch siblings.Size() { case 0, 1: return efficiency @@ -45,13 +46,10 @@ func (g CoreGrade) String() string { } type ( - NodeID uint8 - SocketID uint8 - CoreID uint16 - KHz uint64 - MHz uint64 - GHz float64 - Cost uint8 + KHz uint64 + MHz uint64 + GHz float64 + Cost uint8 ) func (khz KHz) MHz() MHz { @@ -67,9 +65,9 @@ func (khz KHz) String() string { // The JSON encoding is not used yet but my be part of the gRPC plumbing // in the future. 
type Topology struct { - NodeIDs *idset.Set[NodeID] `json:"node_ids"` - Distances SLIT `json:"distances"` - Cores []Core `json:"cores"` + NodeIDs *idset.Set[hw.NodeID] `json:"node_ids"` + Distances SLIT `json:"distances"` + Cores []Core `json:"cores"` // explicit overrides from client configuration OverrideTotalCompute MHz `json:"override_total_compute"` @@ -79,14 +77,14 @@ type Topology struct { // A Core represents one logical (vCPU) core on a processor. Basically the slice // of cores detected should match up with the vCPU description in cloud providers. type Core struct { - NodeID NodeID `json:"node_id"` - SocketID SocketID `json:"socket_id"` - ID CoreID `json:"id"` - Grade CoreGrade `json:"grade"` - Disable bool `json:"disable"` // indicates whether Nomad must not use this core - BaseSpeed MHz `json:"base_speed"` // cpuinfo_base_freq (primary choice) - MaxSpeed MHz `json:"max_speed"` // cpuinfo_max_freq (second choice) - GuessSpeed MHz `json:"guess_speed"` // best effort (fallback) + NodeID hw.NodeID `json:"node_id"` + SocketID hw.SocketID `json:"socket_id"` + ID hw.CoreID `json:"id"` + Grade CoreGrade `json:"grade"` + Disable bool `json:"disable"` // indicates whether Nomad must not use this core + BaseSpeed MHz `json:"base_speed"` // cpuinfo_base_freq (primary choice) + MaxSpeed MHz `json:"max_speed"` // cpuinfo_max_freq (second choice) + GuessSpeed MHz `json:"guess_speed"` // best effort (fallback) } func (c Core) String() string { @@ -110,7 +108,7 @@ func (c Core) MHz() MHz { // accessing memory across each combination of NUMA boundary. type SLIT [][]Cost -func (d SLIT) cost(a, b NodeID) Cost { +func (d SLIT) cost(a, b hw.NodeID) Cost { return d[a][b] } @@ -126,7 +124,7 @@ func (st *Topology) SupportsNUMA() bool { } // Nodes returns the set of NUMA Node IDs. 
-func (st *Topology) Nodes() *idset.Set[NodeID] { +func (st *Topology) Nodes() *idset.Set[hw.NodeID] { if !st.SupportsNUMA() { return nil } @@ -134,8 +132,8 @@ func (st *Topology) Nodes() *idset.Set[NodeID] { } // NodeCores returns the set of Core IDs for the given NUMA Node ID. -func (st *Topology) NodeCores(node NodeID) *idset.Set[CoreID] { - result := idset.Empty[CoreID]() +func (st *Topology) NodeCores(node hw.NodeID) *idset.Set[hw.CoreID] { + result := idset.Empty[hw.CoreID]() for _, cpu := range st.Cores { if cpu.NodeID == node { result.Insert(cpu.ID) @@ -144,7 +142,7 @@ func (st *Topology) NodeCores(node NodeID) *idset.Set[CoreID] { return result } -func (st *Topology) insert(node NodeID, socket SocketID, core CoreID, grade CoreGrade, max, base KHz) { +func (st *Topology) insert(node hw.NodeID, socket hw.SocketID, core hw.CoreID, grade CoreGrade, max, base KHz) { st.Cores[core] = Core{ NodeID: node, SocketID: socket, @@ -226,8 +224,8 @@ func (st *Topology) NumECores() int { // UsableCores returns the number of logical cores usable by the Nomad client // for running tasks. Nomad must subtract off any reserved cores (reserved.cores) // and/or must mask the cpuset to the one set in config (config.reservable_cores). 
-func (st *Topology) UsableCores() *idset.Set[CoreID] { - result := idset.Empty[CoreID]() +func (st *Topology) UsableCores() *idset.Set[hw.CoreID] { + result := idset.Empty[hw.CoreID]() for _, cpu := range st.Cores { if !cpu.Disable { result.Insert(cpu.ID) diff --git a/client/lib/proclib/config.go b/client/lib/proclib/config.go index fe1accb8ad6..2d9ce83bca7 100644 --- a/client/lib/proclib/config.go +++ b/client/lib/proclib/config.go @@ -5,6 +5,8 @@ package proclib import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // Configs is used to pass along values from client configuration that are @@ -12,4 +14,8 @@ import ( // was set in agent configuration. type Configs struct { Logger hclog.Logger + + // UsableCores is the actual set of cpu cores Nomad is able and + // allowed to use. + UsableCores *idset.Set[hw.CoreID] } diff --git a/client/lib/proclib/testing.go b/client/lib/proclib/testing.go new file mode 100644 index 00000000000..77114546495 --- /dev/null +++ b/client/lib/proclib/testing.go @@ -0,0 +1,38 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package proclib + +import ( + "github.com/hashicorp/nomad/helper/testlog" + testing "github.com/mitchellh/go-testing-interface" +) + +func MockWranglers(t testing.T) *Wranglers { + return &Wranglers{ + configs: &Configs{ + Logger: testlog.HCLogger(t), + }, + m: make(map[Task]ProcessWrangler), + create: mocks, + } +} + +func mocks(Task) ProcessWrangler { + return new(mock) +} + +type mock struct { +} + +func (m *mock) Initialize() error { + return nil +} + +func (m *mock) Kill() error { + return nil +} + +func (m *mock) Cleanup() error { + return nil +} diff --git a/client/lib/proclib/wrangler.go b/client/lib/proclib/wrangler.go index b59b3ce849c..c73a03d9cc6 100644 --- a/client/lib/proclib/wrangler.go +++ b/client/lib/proclib/wrangler.go @@ -9,10 +9,12 @@ import ( ) // Task records the unique coordinates of a task from the perspective of a Nomad -// client running the task, that is to say (alloc_id, task_name). +// client running the task, that is to say (alloc_id, task_name). Also indicates +// whether the task is making use of reserved cpu cores. 
type Task struct { AllocID string Task string + Cores bool } func (task Task) String() string { diff --git a/client/lib/proclib/wrangler_cg1_linux.go b/client/lib/proclib/wrangler_cg1_linux.go index cb8fe9f45ce..a8654a6337d 100644 --- a/client/lib/proclib/wrangler_cg1_linux.go +++ b/client/lib/proclib/wrangler_cg1_linux.go @@ -25,12 +25,12 @@ type LinuxWranglerCG1 struct { func newCG1(c *Configs) create { logger := c.Logger.Named("cg1") - cgroupslib.Init(logger) + cgroupslib.Init(logger, c.UsableCores.String()) return func(task Task) ProcessWrangler { return &LinuxWranglerCG1{ task: task, log: logger, - cg: cgroupslib.Factory(task.AllocID, task.Task), + cg: cgroupslib.Factory(task.AllocID, task.Task, task.Cores), } } } diff --git a/client/lib/proclib/wrangler_cg2_linux.go b/client/lib/proclib/wrangler_cg2_linux.go index 9c2946f2b25..8dcbf3dfe5c 100644 --- a/client/lib/proclib/wrangler_cg2_linux.go +++ b/client/lib/proclib/wrangler_cg2_linux.go @@ -22,12 +22,12 @@ type LinuxWranglerCG2 struct { func newCG2(c *Configs) create { logger := c.Logger.Named("cg2") - cgroupslib.Init(logger) + cgroupslib.Init(logger, c.UsableCores.String()) return func(task Task) ProcessWrangler { return &LinuxWranglerCG2{ task: task, log: c.Logger, - cg: cgroupslib.Factory(task.AllocID, task.Task), + cg: cgroupslib.Factory(task.AllocID, task.Task, task.Cores), } } } diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index f055ca06bbf..48f72ba0f53 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -20,6 +20,7 @@ import ( clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/devicemanager" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" regMock 
"github.com/hashicorp/nomad/client/serviceregistration/mock" @@ -214,7 +215,8 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, DeviceManager: devicemanager.NoopMockManager(), DriverManager: drivermanager.TestDriverManager(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), + Partitions: cgroupslib.MockPartition(), } ar, err := allocrunner.NewAllocRunner(conf) require.NoError(t, err) diff --git a/command/agent/agent.go b/command/agent/agent.go index 18c0c3092b2..53195f6ca64 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -23,15 +23,17 @@ import ( uuidparse "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/command/agent/event" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/escapingfs" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/lib/cpuset" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/structs" @@ -791,12 +793,26 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { res.Memory.MemoryMB = int64(agentConfig.Client.Reserved.MemoryMB) res.Disk.DiskMB = int64(agentConfig.Client.Reserved.DiskMB) res.Networks.ReservedHostPorts = agentConfig.Client.Reserved.ReservedPorts - 
if agentConfig.Client.Reserved.Cores != "" { - cores, err := cpuset.Parse(agentConfig.Client.Reserved.Cores) - if err != nil { - return nil, fmt.Errorf("failed to parse client > reserved > cores value %q: %v", agentConfig.Client.Reserved.Cores, err) - } - res.Cpu.ReservedCpuCores = cores.ToSlice() + + // Operators may set one of + // + // - config.reservable_cores (highest precedence) for specifying which cpu cores + // nomad tasks may run on + // + // - config.reserved.cores (lowest precedence) for specifying which cpu cores + // nomad tasks may NOT run on + // + // In either case we will compute the partitioning and have it enforced by + // cgroups (on linux). In -dev mode we let nomad use 2 cores. + if agentConfig.Client.ReservableCores != "" { + cores := idset.Parse[hw.CoreID](agentConfig.Client.ReservableCores) + conf.ReservableCores = cores.Slice() + } else if agentConfig.Client.Reserved.Cores != "" { + cores := idset.Parse[hw.CoreID](agentConfig.Client.Reserved.Cores) + res.Cpu.ReservedCpuCores = helper.ConvertSlice( + cores.Slice(), + func(id hw.CoreID) uint16 { return uint16(id) }, + ) } conf.Version = agentConfig.Version diff --git a/command/agent/config.go b/command/agent/config.go index ec7863edcd5..d22d732a362 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -261,7 +261,7 @@ type ClientConfig struct { DiskFreeMB int `hcl:"disk_free_mb"` // ReservableCores is used to override detected reservable cpu cores. - ReserveableCores string `hcl:"reservable_cores"` + ReservableCores string `hcl:"reservable_cores"` // MaxKillTimeout allows capping the user-specifiable KillTimeout. 
MaxKillTimeout string `hcl:"max_kill_timeout"` @@ -1271,10 +1271,10 @@ func DevConfig(mode *devModeConfig) *Config { conf.Client.Options[fingerprint.TightenNetworkTimeoutsConfig] = "true" conf.Client.BindWildcardDefaultHostNetwork = true conf.Client.NomadServiceDiscovery = pointer.Of(true) + conf.Client.ReservableCores = "" // inherit all the cores conf.Telemetry.PrometheusMetrics = true conf.Telemetry.PublishAllocationMetrics = true conf.Telemetry.PublishNodeMetrics = true - return conf } @@ -2161,8 +2161,8 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { } else if b.Reserved != nil { result.Reserved = result.Reserved.Merge(b.Reserved) } - if b.ReserveableCores != "" { - result.ReserveableCores = b.ReserveableCores + if b.ReservableCores != "" { + result.ReservableCores = b.ReservableCores } if b.GCInterval != 0 { result.GCInterval = b.GCInterval diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 10a984fe784..27e5d945f88 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -170,7 +170,7 @@ func TestConsul_Integration(t *testing.T) { DriverManager: drivermanager.TestDriverManager(t), StartConditionMetCh: closedCh, ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), } tr, err := taskrunner.NewTaskRunner(config) diff --git a/drivers/docker/cpuset.go b/drivers/docker/cpuset.go new file mode 100644 index 00000000000..3be2a1a9cff --- /dev/null +++ b/drivers/docker/cpuset.go @@ -0,0 +1,76 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package docker + +import ( + "os" + "path/filepath" + "time" + + "github.com/hashicorp/nomad/helper" +) + +const ( + // cpusetSyncPeriod is how often we check to see if the cpuset of a task + // needs to be updated - if there is no work to do, no action is taken + cpusetSyncPeriod = 3 * time.Second +) + +// cpuset is used to manage the cpuset.cpus interface file in the cgroup that +// docker daemon creates for the container being run by the task driver. we +// must do this hack because docker does not allow +type cpuset struct { + doneCh <-chan bool + source string + destination string + previous string + sync func(string, string) +} + +func (c *cpuset) watch() { + if c.sync == nil { + // use the real thing if we are not doing tests + c.sync = c.copyCpuset + } + + ticks, cancel := helper.NewSafeTimer(cpusetSyncPeriod) + defer cancel() + + for { + select { + case <-c.doneCh: + return + case <-ticks.C: + c.sync(c.source, c.destination) + ticks.Reset(cpusetSyncPeriod) + } + } +} + +func (c *cpuset) copyCpuset(source, destination string) { + source = filepath.Join(source, "cpuset.cpus.effective") + destination = filepath.Join(destination, "cpuset.cpus") + + // read the current value of usable cores + b, err := os.ReadFile(source) + if err != nil { + return + } + + // if the current value is the same as the value we wrote last, + // there is nothing to do + current := string(b) + if current == c.previous { + return + } + + // otherwise write the new value + err = os.WriteFile(destination, b, 0644) + if err != nil { + return + } + + // we wrote a new value; store that value so we do not write it again + c.previous = current +} diff --git a/drivers/docker/cpuset_test.go b/drivers/docker/cpuset_test.go new file mode 100644 index 00000000000..19cd32e09b6 --- /dev/null +++ b/drivers/docker/cpuset_test.go @@ -0,0 +1,42 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package docker + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func Test_cpuset_watch(t *testing.T) { + ci.Parallel(t) + + doneCh := make(chan bool) + + source := "/source" + destination := "/destination" + hits := 0 + + callback := func(s, d string) { + must.Eq(t, source, s) + must.Eq(t, destination, d) + hits++ + } + + c := &cpuset{ + doneCh: doneCh, + source: "/source", + destination: "/destination", + previous: "", + sync: callback, + } + go c.watch() + + time.Sleep(3*time.Second + 10*time.Millisecond) + doneCh <- true + + must.Eq(t, 1, hits) +} diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index d2ac7015090..26bf62566e2 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -250,6 +250,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { logger: d.logger.With("container_id", container.ID), task: handle.Config, containerID: container.ID, + containerCgroup: container.HostConfig.Cgroup, containerImage: container.Image, doneCh: make(chan bool), waitCh: make(chan struct{}), @@ -977,12 +978,13 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T } hostConfig := &docker.HostConfig{ - // TODO(shoenig) set cgroup parent when we do partitioning + // do not set cgroup parent anymore Memory: memory, // hard limit MemoryReservation: memoryReservation, // soft limit - CPUShares: task.Resources.LinuxResources.CPUShares, + CPUShares: task.Resources.LinuxResources.CPUShares, + CPUSetCPUs: task.Resources.LinuxResources.CpusetCpus, // Binds are used to mount a host volume into the container. We mount a // local directory for storage and a shared alloc directory that can be @@ -999,12 +1001,10 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T GroupAdd: driverConfig.GroupAdd, } - // This translates to docker create/run --cpuset-cpus option. 
- // --cpuset-cpus limit the specific CPUs or cores a container can use. - // Nomad natively manages cpusets, setting this option will override - // Nomad managed cpusets. + // Setting cpuset_cpus in driver config is no longer supported (it has + // not worked correctly since Nomad 0.12) if driverConfig.CPUSetCPUs != "" { - hostConfig.CPUSetCPUs = driverConfig.CPUSetCPUs + d.logger.Warn("cpuset_cpus is no longer supported") } // Enable tini (docker-init) init system. diff --git a/drivers/docker/driver_test.go b/drivers/docker/driver_test.go index 289503e0931..b018885f9f6 100644 --- a/drivers/docker/driver_test.go +++ b/drivers/docker/driver_test.go @@ -1560,6 +1560,8 @@ func TestDockerDriver_Init(t *testing.T) { } func TestDockerDriver_CPUSetCPUs(t *testing.T) { + // The cpuset_cpus config option is ignored starting in Nomad 1.6 + ci.Parallel(t) testutil.DockerCompatible(t) testutil.CgroupsCompatible(t) @@ -1570,15 +1572,15 @@ func TestDockerDriver_CPUSetCPUs(t *testing.T) { }{ { Name: "Single CPU", - CPUSetCPUs: "0", + CPUSetCPUs: "", }, { Name: "Comma separated list of CPUs", - CPUSetCPUs: "0,1", + CPUSetCPUs: "", }, { Name: "Range of CPUs", - CPUSetCPUs: "0-1", + CPUSetCPUs: "", }, } @@ -1587,16 +1589,16 @@ func TestDockerDriver_CPUSetCPUs(t *testing.T) { task, cfg, _ := dockerTask(t) cfg.CPUSetCPUs = testCase.CPUSetCPUs - require.NoError(t, task.EncodeConcreteDriverConfig(cfg)) + must.NoError(t, task.EncodeConcreteDriverConfig(cfg)) client, d, handle, cleanup := dockerSetup(t, task, nil) defer cleanup() - require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) container, err := client.InspectContainer(handle.containerID) - require.NoError(t, err) + must.NoError(t, err) - require.Equal(t, cfg.CPUSetCPUs, container.HostConfig.CPUSetCPUs) + must.Eq(t, cfg.CPUSetCPUs, container.HostConfig.CPUSetCPUs) }) } } diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 
6a2b8318258..3096796c7dd 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/consul-template/signals" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/drivers/docker/docklog" "github.com/hashicorp/nomad/plugins/drivers" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" @@ -41,6 +41,7 @@ type taskHandle struct { dloggerPluginClient *plugin.Client task *drivers.TaskConfig containerID string + containerCgroup string containerImage string doneCh chan bool waitCh chan struct{} @@ -243,9 +244,42 @@ func (h *taskHandle) shutdownLogger() { h.dloggerPluginClient.Kill() } +func (h *taskHandle) startCpusetFixer() { + if cgroupslib.GetMode() == cgroupslib.OFF { + return + } + + if h.task.Resources.LinuxResources.CpusetCpus != "" { + // nothing to fixup if the task is given static cores + return + } + + cgroup := h.containerCgroup + if cgroup == "" { + // The api does not actually set this value, so we are left to compute it ourselves. 
+ // Luckily this is documented, + // https://docs.docker.com/config/containers/runmetrics/#find-the-cgroup-for-a-given-container + switch cgroupslib.GetMode() { + case cgroupslib.CG1: + cgroup = "/sys/fs/cgroup/cpuset/docker/" + h.containerID + default: + // systemd driver; not sure if we need to consider cgroupfs driver + cgroup = "/sys/fs/cgroup/system.slice/docker-" + h.containerID + ".scope" + } + } + + go (&cpuset{ + doneCh: h.doneCh, + source: h.task.Resources.LinuxResources.CpusetCgroupPath, + destination: cgroup, + }).watch() +} + func (h *taskHandle) run() { defer h.shutdownLogger() + h.startCpusetFixer() + exitCode, werr := h.infinityClient.WaitContainer(h.containerID) if werr != nil { h.logger.Error("failed to wait for container; already terminated") diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index a5d0a8b8385..49fc754c90d 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -57,7 +57,7 @@ func testResources(allocID, task string) *drivers.Resources { LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task, false), }, } diff --git a/drivers/java/driver_test.go b/drivers/java/driver_test.go index 766875184ca..b2770b10d10 100644 --- a/drivers/java/driver_test.go +++ b/drivers/java/driver_test.go @@ -299,7 +299,7 @@ func basicTask(t *testing.T, name string, taskConfig *TaskConfig) *drivers.TaskC LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, name, false), }, }, } diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index 9130dee08e4..142125abe36 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -58,7 +58,7 @@ func 
testResources(allocID, task string) *drivers.Resources { LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task, false), }, } diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index f138308c14d..59098e19d69 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -21,6 +21,7 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/cpustats" "github.com/hashicorp/nomad/client/lib/fifo" "github.com/hashicorp/nomad/client/lib/numalib" @@ -158,21 +159,44 @@ type ExecCommand struct { Capabilities []string } -// Cgroup returns the path to the cgroup the Nomad client is managing for the -// task that is about to be run. +// CpusetCgroup returns the path to the cgroup in which the Nomad client will +// write the PID of the task process for managing cpu core usage. // // On cgroups v1 systems this returns the path to the cpuset cgroup specifically. +// Critical: is "/reserve/" or "/share"; do not try to parse this! // -// On cgroups v2 systems this returns the patah to the task's scope. +// On cgroups v2 systems this just returns the unified cgroup. // // On non-Linux systems this returns the empty string and has no meaning. -func (c *ExecCommand) Cgroup() string { +func (c *ExecCommand) CpusetCgroup() string { if c == nil || c.Resources == nil || c.Resources.LinuxResources == nil { return "" } return c.Resources.LinuxResources.CpusetCgroupPath } +// StatsCgroup returns the path to the cgroup Nomad client will use to inspect +// for spawned process IDs. 
+// +// On cgroups v1 systems this returns the path to the freezer cgroup. +// +// On cgroups v2 systems this just returns the unified cgroup. +// +// On non-Linux systems this returns the empty string and has no meaning. +func (c *ExecCommand) StatsCgroup() string { + if c == nil || c.Resources == nil || c.Resources.LinuxResources == nil { + return "" + } + switch cgroupslib.GetMode() { + case cgroupslib.CG1: + taskName := filepath.Base(c.TaskDir) + allocID := filepath.Base(filepath.Dir(c.TaskDir)) + return cgroupslib.PathCG1(allocID, taskName, "freezer") + default: + return c.CpusetCgroup() + } +} + // SetWriters sets the writer for the process stdout and stderr. This should // not be used if writing to a file path such as a fifo file. SetStdoutWriter // is mainly used for unit testing purposes. @@ -364,7 +388,7 @@ func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() - if cleanup, err := e.setSubCmdCgroup(&e.childCmd, e.command.Cgroup()); err != nil { + if cleanup, err := e.setSubCmdCgroup(&e.childCmd, e.command.StatsCgroup()); err != nil { return nil, 0, err } else { defer cleanup() @@ -455,7 +479,7 @@ func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, return err } } - cgroup := e.command.Cgroup() + cgroup := e.command.StatsCgroup() if cleanup, err := e.setSubCmdCgroup(cmd, cgroup); err != nil { return err } else { diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index a509426ac53..b4e84e662b2 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -34,7 +34,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" "github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer/cgroups" - lconfigs "github.com/opencontainers/runc/libcontainer/configs" + runc 
"github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runc/libcontainer/devices" ldevices "github.com/opencontainers/runc/libcontainer/devices" "github.com/opencontainers/runc/libcontainer/specconv" @@ -510,13 +510,13 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc ch <- &waitResult{ps, err} } -func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { +func configureCapabilities(cfg *runc.Config, command *ExecCommand) { switch command.User { case "root": // when running as root, use the legacy set of system capabilities, so // that we do not break existing nomad clusters using this "feature" legacyCaps := capabilities.LegacySupported().Slice(true) - cfg.Capabilities = &lconfigs.Capabilities{ + cfg.Capabilities = &runc.Capabilities{ Bounding: legacyCaps, Permitted: legacyCaps, Effective: legacyCaps, @@ -531,7 +531,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { // that capabilities are Permitted and Inheritable. Setting Effective // is unnecessary, because we only need the capabilities to become // effective _after_ execve, not before. 
- cfg.Capabilities = &lconfigs.Capabilities{ + cfg.Capabilities = &runc.Capabilities{ Bounding: command.Capabilities, Permitted: command.Capabilities, Inheritable: command.Capabilities, @@ -540,13 +540,13 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { } } -func configureNamespaces(pidMode, ipcMode string) lconfigs.Namespaces { - namespaces := lconfigs.Namespaces{{Type: lconfigs.NEWNS}} +func configureNamespaces(pidMode, ipcMode string) runc.Namespaces { + namespaces := runc.Namespaces{{Type: runc.NEWNS}} if pidMode == IsolationModePrivate { - namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWPID}) + namespaces = append(namespaces, runc.Namespace{Type: runc.NEWPID}) } if ipcMode == IsolationModePrivate { - namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWIPC}) + namespaces = append(namespaces, runc.Namespace{Type: runc.NEWIPC}) } return namespaces } @@ -558,7 +558,7 @@ func configureNamespaces(pidMode, ipcMode string) lconfigs.Namespaces { // * dedicated mount points namespace, but shares the PID, User, domain, network namespaces with host // * small subset of devices (e.g. stdout/stderr/stdin, tty, shm, pts); default to using the same set of devices as Docker // * some special filesystems: `/proc`, `/sys`. Some case is given to avoid exec escaping or setting malicious values through them. 
-func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { +func configureIsolation(cfg *runc.Config, command *ExecCommand) error { defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV // set the new root directory for the container @@ -571,8 +571,8 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { cfg.Namespaces = configureNamespaces(command.ModePID, command.ModeIPC) if command.NetworkIsolation != nil { - cfg.Namespaces = append(cfg.Namespaces, lconfigs.Namespace{ - Type: lconfigs.NEWNET, + cfg.Namespaces = append(cfg.Namespaces, runc.Namespace{ + Type: runc.NEWNET, Path: command.NetworkIsolation.Path, }) } @@ -597,7 +597,7 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { cfg.Devices = append(cfg.Devices, devs...) } - cfg.Mounts = []*lconfigs.Mount{ + cfg.Mounts = []*runc.Mount{ { Source: "tmpfs", Destination: "/dev", @@ -646,20 +646,21 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { return nil } -func (l *LibcontainerExecutor) configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error { +func (l *LibcontainerExecutor) configureCgroups(cfg *runc.Config, command *ExecCommand) error { // note: an alloc TR hook pre-creates the cgroup(s) in both v1 and v2 if !command.ResourceLimits { return nil } - cg := command.Cgroup() + cg := command.StatsCgroup() if cg == "" { return errors.New("cgroup must be set") } - // set the libcontainer hook for writing the PID to cgroup.procs file - l.configureCgroupHook(cfg, command) + // // set the libcontainer hook for writing the PID to cgroup.procs file + // TODO: this can be cg1 only, right? 
+ // l.configureCgroupHook(cfg, command) // set the libcontainer memory limits l.configureCgroupMemory(cfg, command) @@ -673,15 +674,15 @@ func (l *LibcontainerExecutor) configureCgroups(cfg *lconfigs.Config, command *E } } -func (*LibcontainerExecutor) configureCgroupHook(cfg *lconfigs.Config, command *ExecCommand) { - cfg.Hooks = lconfigs.Hooks{ - lconfigs.CreateRuntime: lconfigs.HookList{ +func (*LibcontainerExecutor) configureCgroupHook(cfg *runc.Config, command *ExecCommand) { + cfg.Hooks = runc.Hooks{ + runc.CreateRuntime: runc.HookList{ newSetCPUSetCgroupHook(command.Resources.LinuxResources.CpusetCgroupPath), }, } } -func (l *LibcontainerExecutor) configureCgroupMemory(cfg *lconfigs.Config, command *ExecCommand) { +func (l *LibcontainerExecutor) configureCgroupMemory(cfg *runc.Config, command *ExecCommand) { // Total amount of memory allowed to consume res := command.Resources.NomadResources memHard, memSoft := res.Memory.MemoryMaxMB, res.Memory.MemoryMB @@ -697,37 +698,65 @@ func (l *LibcontainerExecutor) configureCgroupMemory(cfg *lconfigs.Config, comma cfg.Cgroups.Resources.MemorySwappiness = cgroupslib.MaybeDisableMemorySwappiness() } -func (*LibcontainerExecutor) configureCG1(cfg *lconfigs.Config, command *ExecCommand, cg string) error { - // Set the v1 parent relative path (i.e. /nomad/) - scope := filepath.Base(cg) +func (l *LibcontainerExecutor) configureCG1(cfg *runc.Config, command *ExecCommand, cgroup string) error { + + cpuShares := command.Resources.LinuxResources.CPUShares + cpusetPath := command.Resources.LinuxResources.CpusetCgroupPath + cpuCores := command.Resources.LinuxResources.CpusetCpus + + // Set the v1 parent relative path (i.e. 
/nomad/) for the NON-cpuset cgroups + scope := filepath.Base(cgroup) cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, scope) - // set cpu.shares - res := command.Resources.NomadResources - cfg.Cgroups.CpuShares = uint64(res.Cpu.CpuShares) + // set cpu resources + cfg.Cgroups.Resources.CpuShares = uint64(cpuShares) + + // we need to manually set the cpuset, because libcontainer will not set + // it for our special cpuset cgroup + if err := l.cpusetCG1(cpusetPath, cpuCores); err != nil { + return fmt.Errorf("failed to set cpuset: %w", err) + } + + // tell libcontainer to write the pid to our special cpuset cgroup + l.configureCgroupHook(cfg, command) + return nil } -func (l *LibcontainerExecutor) configureCG2(cfg *lconfigs.Config, command *ExecCommand, cg string) error { +func (l *LibcontainerExecutor) cpusetCG1(cpusetCgroupPath, cores string) error { + if cores == "" { + return nil + } + ed := cgroupslib.OpenPath(cpusetCgroupPath) + return ed.Write("cpuset.cpus", cores) +} + +func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecCommand, cg string) error { + cpuShares := command.Resources.LinuxResources.CPUShares + cpuCores := command.Resources.LinuxResources.CpusetCpus + // Set the v2 specific unified path - scope := filepath.Base(cg) - cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, scope) + cfg.Cgroups.Resources.CpusetCpus = cpuCores + partition := cgroupslib.GetPartitionFromCores(cpuCores) - res := command.Resources.NomadResources - cpuShares := res.Cpu.CpuShares // a cgroups v1 concept - cpuWeight := cgroups.ConvertCPUSharesToCgroupV2Value(uint64(cpuShares)) // sets cpu.weight, which the kernel also translates to cpu.weight.nice // despite what the libcontainer docs say, this sets priority not bandwidth + cpuWeight := cgroups.ConvertCPUSharesToCgroupV2Value(uint64(cpuShares)) cfg.Cgroups.Resources.CpuWeight = cpuWeight - // todo: we will also want to set cpu bandwidth (i.e. 
cpu_hard_limit) + // finally set the path of the cgroup in which to run the task + scope := filepath.Base(cg) + cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, partition, scope) + + // todo(shoenig): we will also want to set cpu bandwidth (i.e. cpu_hard_limit) + // hopefully for 1.7 return nil } -func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*lconfigs.Config, error) { - cfg := &lconfigs.Config{ - Cgroups: &lconfigs.Cgroup{ - Resources: &lconfigs.Resources{ +func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) { + cfg := &runc.Config{ + Cgroups: &runc.Cgroup{ + Resources: &runc.Resources{ MemorySwappiness: nil, }, }, @@ -785,12 +814,12 @@ var userMountToUnixMount = map[string]int{ } // cmdMounts converts a list of driver.MountConfigs into excutor.Mounts. -func cmdMounts(mounts []*drivers.MountConfig) []*lconfigs.Mount { +func cmdMounts(mounts []*drivers.MountConfig) []*runc.Mount { if len(mounts) == 0 { return nil } - r := make([]*lconfigs.Mount, len(mounts)) + r := make([]*runc.Mount, len(mounts)) for i, m := range mounts { flags := unix.MS_BIND @@ -798,7 +827,7 @@ func cmdMounts(mounts []*drivers.MountConfig) []*lconfigs.Mount { flags |= unix.MS_RDONLY } - r[i] = &lconfigs.Mount{ + r[i] = &runc.Mount{ Source: m.HostPath, Destination: m.TaskPath, Device: "bind", @@ -941,8 +970,8 @@ func filepathIsRegular(path string) error { return nil } -func newSetCPUSetCgroupHook(cgroupPath string) lconfigs.Hook { - return lconfigs.NewFunctionHook(func(state *specs.State) error { +func newSetCPUSetCgroupHook(cgroupPath string) runc.Hook { + return runc.NewFunctionHook(func(state *specs.State) error { return cgroups.WriteCgroupProc(cgroupPath, state.Pid) }) } diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index 8c02c1d19ad..be6f49a2529 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ 
b/drivers/shared/executor/executor_linux_test.go @@ -78,13 +78,18 @@ func testExecutorCommandWithChroot(t *testing.T) *testExecCmd { t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] + cmd := &ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, Resources: &drivers.Resources{ NomadResources: alloc.AllocatedResources.Tasks[task.Name], LinuxResources: &drivers.LinuxResources{ - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath( + alloc.ID, + task.Name, + alloc.AllocatedResources.UsesCores(), + ), }, }, } diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 36891569083..f49e856e82c 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -91,13 +91,13 @@ func testExecutorCommand(t *testing.T) *testExecCmd { LinuxResources: &drivers.LinuxResources{ CPUShares: 500, MemoryLimitBytes: 256 * 1024 * 1024, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name, false), }, }, } // create cgroup for our task (because we aren't using task runners) - f := cgroupslib.Factory(alloc.ID, task.Name) + f := cgroupslib.Factory(alloc.ID, task.Name, false) must.NoError(t, f.Setup()) // cleanup cgroup once test is done (because no task runners) diff --git a/drivers/shared/executor/executor_universal_linux.go b/drivers/shared/executor/executor_universal_linux.go index 9fc72bbed12..fcd43d28954 100644 --- a/drivers/shared/executor/executor_universal_linux.go +++ b/drivers/shared/executor/executor_universal_linux.go @@ -107,10 +107,10 @@ func (e *UniversalExecutor) statCG(cgroup string) (int, func(), error) { // configureResourceContainer on Linux configures the cgroups to be used to track // pids created by the executor +// +// pid: pid of the executor (i.e. 
ourself) func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid int) (func(), error) { - - // get our cgroup reference (cpuset in v1) - cgroup := command.Cgroup() + cgroup := command.StatsCgroup() // cgCleanup will be called after the task has been launched // v1: remove the executor process from the task's cgroups @@ -121,7 +121,7 @@ func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid switch cgroupslib.GetMode() { case cgroupslib.CG1: e.configureCG1(cgroup, command) - cgCleanup = e.enterCG1(cgroup) + cgCleanup = e.enterCG1(cgroup, command.CpusetCgroup()) default: e.configureCG2(cgroup, command) // configure child process to spawn in the cgroup @@ -144,19 +144,26 @@ func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid // created for the task - so that the task and its children will spawn in // those cgroups. The cleanup function moves the executor out of the task's // cgroups and into the nomad/ parent cgroups. 
-func (e *UniversalExecutor) enterCG1(cgroup string) func() { +func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() { pid := strconv.Itoa(unix.Getpid()) - // write pid to all the groups - ifaces := []string{"freezer", "cpu", "memory"} // todo: cpuset + // write pid to all the normal interfaces + ifaces := []string{"freezer", "cpu", "memory"} for _, iface := range ifaces { - ed := cgroupslib.OpenFromCpusetCG1(cgroup, iface) + ed := cgroupslib.OpenFromFreezerCG1(statsCgroup, iface) err := ed.Write("cgroup.procs", pid) if err != nil { e.logger.Warn("failed to write cgroup", "interface", iface, "error", err) } } + // write pid to the cpuset interface, which varies between reserve/share + ed := cgroupslib.OpenPath(cpusetCgroup) + err := ed.Write("cgroup.procs", pid) + if err != nil { + e.logger.Warn("failed to write cpuset cgroup", "error", err) + } + // cleanup func that moves executor back up to nomad cgroup return func() { for _, iface := range ifaces { @@ -169,29 +176,32 @@ func (e *UniversalExecutor) enterCG1(cgroup string) func() { } func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) { + // write memory limits memHard, memSoft := e.computeMemory(command) - ed := cgroupslib.OpenFromCpusetCG1(cgroup, "memory") + ed := cgroupslib.OpenFromFreezerCG1(cgroup, "memory") _ = ed.Write("memory.limit_in_bytes", strconv.FormatInt(memHard, 10)) if memSoft > 0 { - ed = cgroupslib.OpenFromCpusetCG1(cgroup, "memory") _ = ed.Write("memory.soft_limit_in_bytes", strconv.FormatInt(memSoft, 10)) } - // set memory swappiness + // write memory swappiness swappiness := cgroupslib.MaybeDisableMemorySwappiness() if swappiness != nil { - ed := cgroupslib.OpenFromCpusetCG1(cgroup, "memory") value := int64(*swappiness) _ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10)) } - // write cpu shares file + // write cpu shares cpuShares := strconv.FormatInt(command.Resources.LinuxResources.CPUShares, 10) - ed = 
cgroupslib.OpenFromCpusetCG1(cgroup, "cpu") + ed = cgroupslib.OpenFromFreezerCG1(cgroup, "cpu") _ = ed.Write("cpu.shares", cpuShares) - // TODO(shoenig) manage cpuset - e.logger.Info("TODO CORES", "cpuset", command.Resources.LinuxResources.CpusetCpus) + // write cpuset, if set + if cpuSet := command.Resources.LinuxResources.CpusetCpus; cpuSet != "" { + cpusetPath := command.Resources.LinuxResources.CpusetCgroupPath + ed = cgroupslib.OpenPath(cpusetPath) + _ = ed.Write("cpuset.cpus", cpuSet) + } } func (e *UniversalExecutor) configureCG2(cgroup string, command *ExecCommand) { @@ -212,13 +222,14 @@ func (e *UniversalExecutor) configureCG2(cgroup string, command *ExecCommand) { _ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10)) } - // write cpu cgroup files + // write cpu weight cgroup file cpuWeight := e.computeCPU(command) ed = cgroupslib.OpenPath(cgroup) _ = ed.Write("cpu.weight", strconv.FormatUint(cpuWeight, 10)) - // TODO(shoenig) manage cpuset - e.logger.Info("TODO CORES", "cpuset", command.Resources.LinuxResources.CpusetCpus) + // write cpuset cgroup file, if set + cpusetCpus := command.Resources.LinuxResources.CpusetCpus + _ = ed.Write("cpuset.cpus", cpusetCpus) } func (*UniversalExecutor) computeCPU(command *ExecCommand) uint64 { diff --git a/drivers/shared/executor/procstats/list_linux.go b/drivers/shared/executor/procstats/list_linux.go index 5570ecd1d3b..ca62529b41e 100644 --- a/drivers/shared/executor/procstats/list_linux.go +++ b/drivers/shared/executor/procstats/list_linux.go @@ -11,19 +11,12 @@ import ( ) type Cgrouper interface { - Cgroup() string + StatsCgroup() string } func List(cg Cgrouper) *set.Set[ProcessID] { - cgroup := cg.Cgroup() - var ed cgroupslib.Interface - switch cgroupslib.GetMode() { - case cgroupslib.CG1: - ed = cgroupslib.OpenFromCpusetCG1(cgroup, "freezer") - default: - ed = cgroupslib.OpenPath(cgroup) - } - + cgroup := cg.StatsCgroup() + ed := cgroupslib.OpenPath(cgroup) s, err := ed.PIDs() if err != nil { return 
set.New[ProcessID](0) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b78af764a8d..aa4dbb9eb3e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -36,6 +36,8 @@ import ( "github.com/hashicorp/go-set" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" @@ -3782,6 +3784,17 @@ type AllocatedResources struct { Shared AllocatedSharedResources } +// UsesCores returns true if any of the tasks in the allocation make use +// of reserved cpu cores. +func (a *AllocatedResources) UsesCores() bool { + for _, taskRes := range a.Tasks { + if len(taskRes.Cpu.ReservedCores) > 0 { + return true + } + } + return false +} + func (a *AllocatedResources) Copy() *AllocatedResources { if a == nil { return nil @@ -7597,6 +7610,10 @@ type Task struct { Identities []*WorkloadIdentity } +func (t *Task) UsesCores() bool { + return t.Resources.Cores > 0 +} + // UsesConnect is for conveniently detecting if the Task is able to make use // of Consul Connect features. This will be indicated in the TaskKind of the // Task, which exports known types of Tasks. UsesConnect will be true if the @@ -10652,6 +10669,22 @@ func (a *Allocation) GetCreateIndex() uint64 { return a.CreateIndex } +// ReservedCores returns the union of reserved cores across tasks in this alloc. 
+func (a *Allocation) ReservedCores() *idset.Set[hw.CoreID] { + s := idset.Empty[hw.CoreID]() + if a == nil || a.AllocatedResources == nil { + return s + } + for _, taskResources := range a.AllocatedResources.Tasks { + if len(taskResources.Cpu.ReservedCores) > 0 { + for _, core := range taskResources.Cpu.ReservedCores { + s.Insert(hw.CoreID(core)) + } + } + } + return s +} + // ConsulNamespace returns the Consul namespace of the task group associated // with this allocation. func (a *Allocation) ConsulNamespace() string { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index a0636a9cb0c..15f0ce8ace4 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -4591,6 +4591,41 @@ func TestReschedulePolicy_Validate(t *testing.T) { } } +func TestAllocation_ReservedCores(t *testing.T) { + ci.Parallel(t) + + a := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "nil": { + Cpu: AllocatedCpuResources{ + ReservedCores: nil, + }, + }, + "empty": { + Cpu: AllocatedCpuResources{ + ReservedCores: make([]uint16, 0), + }, + }, + "one": { + Cpu: AllocatedCpuResources{ + ReservedCores: []uint16{7}, + }, + }, + "three": { + Cpu: AllocatedCpuResources{ + ReservedCores: []uint16{1, 3, 4}, + }, + }, + }, + }, + } + + result := a.ReservedCores() + must.Eq(t, "1,3-4,7", result.String()) + +} + func TestAllocation_Index(t *testing.T) { ci.Parallel(t) diff --git a/plugins/drivers/testutils/testing_linux.go b/plugins/drivers/testutils/testing_linux.go index 7ed2808583c..3ab097db74a 100644 --- a/plugins/drivers/testutils/testing_linux.go +++ b/plugins/drivers/testutils/testing_linux.go @@ -14,7 +14,7 @@ import ( // exists, since Nomad client creates them. Why do we write tests that directly // invoke task drivers without any context of the Nomad client? Who knows. 
func (h *DriverHarness) MakeTaskCgroup(allocID, taskName string) { - f := cgroupslib.Factory(allocID, taskName) + f := cgroupslib.Factory(allocID, taskName, false) must.NoError(h.t, f.Setup()) // ensure child procs are dead and remove the cgroup when the test is done From 5f1df275eb6c4ea21c4d31cb00b13c00a666f6ab Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Tue, 12 Sep 2023 13:41:52 +0000 Subject: [PATCH 2/2] cr: tweaks for feedback --- client/allocrunner/cpuparts_hook.go | 3 +- client/allocrunner/testing.go | 2 +- client/lib/cgroupslib/editor.go | 14 ++++---- client/lib/cgroupslib/testing.go | 53 ----------------------------- client/state/upgrade_int_test.go | 2 +- drivers/docker/cpuset.go | 3 +- drivers/docker/driver_test.go | 2 +- 7 files changed, 13 insertions(+), 66 deletions(-) delete mode 100644 client/lib/cgroupslib/testing.go diff --git a/client/allocrunner/cpuparts_hook.go b/client/allocrunner/cpuparts_hook.go index d0252ffaed0..7cac0f0f438 100644 --- a/client/allocrunner/cpuparts_hook.go +++ b/client/allocrunner/cpuparts_hook.go @@ -34,9 +34,8 @@ func newCPUPartsHook( partitions cgroupslib.Partition, alloc *structs.Allocation, ) *cpuPartsHook { - return &cpuPartsHook{ - logger: logger, + logger: logger.Named(cpuPartsHookName), allocID: alloc.ID, partitions: partitions, reservations: alloc.ReservedCores(), diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 69a817a8733..615d9bb2305 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -97,7 +97,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.All CheckStore: checkstore.NewStore(clientConf.Logger, stateDB), Getter: getter.TestSandbox(t), Wranglers: proclib.MockWranglers(t), - Partitions: cgroupslib.MockPartition(), + Partitions: cgroupslib.NoopPartition(), } return conf, cleanup diff --git a/client/lib/cgroupslib/editor.go b/client/lib/cgroupslib/editor.go index 6d846078d83..2ebaf616b8d 100644 --- 
a/client/lib/cgroupslib/editor.go +++ b/client/lib/cgroupslib/editor.go @@ -91,9 +91,9 @@ func Factory(allocID, task string, cores bool) Lifecycle { switch GetMode() { case CG1: return &lifeCG1{ - allocID: allocID, - task: task, - cores: cores, + allocID: allocID, + task: task, + reservedCores: cores, } default: return &lifeCG2{ @@ -115,9 +115,9 @@ type Lifecycle interface { // -------- cgroups v1 --------- type lifeCG1 struct { - allocID string - task string - cores bool // uses core reservation + allocID string + task string + reservedCores bool // uses core reservation } func (l *lifeCG1) Setup() error { @@ -212,7 +212,7 @@ func (l *lifeCG1) paths() []string { )) } - switch partition := GetPartitionFromBool(l.cores); partition { + switch partition := GetPartitionFromBool(l.reservedCores); partition { case "reserve": paths = append(paths, filepath.Join(root, "cpuset", NomadCgroupParent, partition, scope)) case "share": diff --git a/client/lib/cgroupslib/testing.go b/client/lib/cgroupslib/testing.go deleted file mode 100644 index fd347d6d88c..00000000000 --- a/client/lib/cgroupslib/testing.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cgroupslib - -import ( - "sync" - - "github.com/hashicorp/nomad/client/lib/idset" - "github.com/hashicorp/nomad/client/lib/numalib/hw" -) - -// MockPartition creates an in-memory Partition manager backed by 8 fake cpu cores. 
-func MockPartition() Partition { - return &mock{ - share: idset.From[hw.CoreID]([]hw.CoreID{0, 1, 2, 3, 4, 5, 6, 7}), - reserve: idset.Empty[hw.CoreID](), - } -} - -type mock struct { - lock sync.Mutex - share *idset.Set[hw.CoreID] - reserve *idset.Set[hw.CoreID] -} - -func (m *mock) Restore(cores *idset.Set[hw.CoreID]) { - m.lock.Lock() - defer m.lock.Unlock() - - m.share.RemoveSet(cores) - m.reserve.InsertSet(cores) -} - -func (m *mock) Reserve(cores *idset.Set[hw.CoreID]) error { - m.lock.Lock() - defer m.lock.Unlock() - - m.reserve.RemoveSet(cores) - m.share.InsertSet(cores) - - return nil -} - -func (m *mock) Release(cores *idset.Set[hw.CoreID]) error { - m.lock.Lock() - defer m.lock.Unlock() - - m.reserve.RemoveSet(cores) - m.share.InsertSet(cores) - - return nil -} diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index 48f72ba0f53..b2f1f41856e 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -216,7 +216,7 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al DeviceManager: devicemanager.NoopMockManager(), DriverManager: drivermanager.TestDriverManager(t), Wranglers: proclib.MockWranglers(t), - Partitions: cgroupslib.MockPartition(), + Partitions: cgroupslib.NoopPartition(), } ar, err := allocrunner.NewAllocRunner(conf) require.NoError(t, err) diff --git a/drivers/docker/cpuset.go b/drivers/docker/cpuset.go index 3be2a1a9cff..7534c3364ef 100644 --- a/drivers/docker/cpuset.go +++ b/drivers/docker/cpuset.go @@ -19,7 +19,8 @@ const ( // cpuset is used to manage the cpuset.cpus interface file in the cgroup that // docker daemon creates for the container being run by the task driver. we -// must do this hack because docker does not allow +// must do this hack because docker does not allow specifying a pre-existing +// cgroup in which to run the container (i.e. one that we control). 
type cpuset struct { doneCh <-chan bool source string diff --git a/drivers/docker/driver_test.go b/drivers/docker/driver_test.go index b018885f9f6..6c853ab6fb7 100644 --- a/drivers/docker/driver_test.go +++ b/drivers/docker/driver_test.go @@ -1560,7 +1560,7 @@ func TestDockerDriver_Init(t *testing.T) { } func TestDockerDriver_CPUSetCPUs(t *testing.T) { - // The cpuset_cpus config option is ignored starting in Nomad 1.6 + // The cpuset_cpus config option is ignored starting in Nomad 1.7 ci.Parallel(t) testutil.DockerCompatible(t)