diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 76d695bd527..b8c838d7695 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -21,6 +21,8 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" @@ -201,6 +203,9 @@ type allocRunner struct { // wranglers is an interface for managing unix/windows processes. wranglers cinterfaces.ProcessWranglers + // partitions is an interface for managing cpuset partitions + partitions cinterfaces.CPUPartitions + // widmgr fetches workload identities widmgr *widmgr.WIDMgr } @@ -244,6 +249,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e checkStore: config.CheckStore, getter: config.Getter, wranglers: config.Wranglers, + partitions: config.Partitions, hookResources: cstructs.NewAllocHookResources(), widmgr: config.WIDMgr, } @@ -455,6 +461,9 @@ func (ar *allocRunner) Restore() error { // restore process wrangler for task ar.wranglers.Setup(proclib.Task{AllocID: tr.Alloc().ID, Task: tr.Task().Name}) + + // restore cpuset partition state + ar.restoreCores(tr.Alloc().AllocatedResources) } ar.taskCoordinator.Restore(states) @@ -462,6 +471,15 @@ func (ar *allocRunner) Restore() error { return nil } +// restoreCores will restore the cpuset partitions with the reserved core +// data for each task in the alloc +func (ar *allocRunner) restoreCores(res *structs.AllocatedResources) { + for _, taskRes := range res.Tasks { + s := idset.From[hw.CoreID](taskRes.Cpu.ReservedCores) + 
ar.partitions.Restore(s) + } +} + // persistDeploymentStatus stores AllocDeploymentStatus. func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) { if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil { diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 82dfac520a5..76ae3710c4c 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newAllocDirHook(hookLogger, ar.allocDir), newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher), newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir), + newCPUPartsHook(hookLogger, ar.partitions, alloc), newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore), newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), newGroupServiceHook(groupServiceHookConfig{ diff --git a/client/allocrunner/cpuparts_hook.go b/client/allocrunner/cpuparts_hook.go new file mode 100644 index 00000000000..7cac0f0f438 --- /dev/null +++ b/client/allocrunner/cpuparts_hook.go @@ -0,0 +1,55 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package allocrunner + +import ( + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/cgroupslib" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + cpuPartsHookName = "cpuparts_hook" +) + +// cpuPartsHooks is responsible for managing cpuset partitioning on Linux +// nodes. This mechanism works by segregating tasks that make use of "cpu" vs. +// "cores" resources. Tasks that make use of "cpu" resource actually make use +// of shared cores that have not been reserved. 
The scheduler ensures enough +// cores on a node are not reserved such that all tasks have the minimum amount +// of cpu bandwidth they requested. +type cpuPartsHook struct { + logger hclog.Logger + allocID string + + reservations *idset.Set[hw.CoreID] + partitions cgroupslib.Partition +} + +func newCPUPartsHook( + logger hclog.Logger, + partitions cgroupslib.Partition, + alloc *structs.Allocation, +) *cpuPartsHook { + return &cpuPartsHook{ + logger: logger.Named(cpuPartsHookName), + allocID: alloc.ID, + partitions: partitions, + reservations: alloc.ReservedCores(), + } +} + +func (h *cpuPartsHook) Name() string { + return cpuPartsHookName +} + +func (h *cpuPartsHook) Prerun() error { + return h.partitions.Reserve(h.reservations) +} + +func (h *cpuPartsHook) Postrun() error { + return h.partitions.Release(h.reservations) +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 0d9d6e94d07..cf70f5ace0e 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -848,7 +848,8 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { } func (tr *TaskRunner) assignCgroup(taskConfig *drivers.TaskConfig) { - p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name) + reserveCores := len(tr.taskResources.Cpu.ReservedCores) > 0 + p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name, reserveCores) taskConfig.Resources.LinuxResources.CpusetCgroupPath = p } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 80107917688..4a54c3b2b50 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -72,7 +72,7 @@ func (tr *TaskRunner) initHooks() { newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), newDeviceHook(tr.devicemanager, hookLogger), newAPIHook(tr.shutdownCtx, 
tr.clientConfig.APIListenerRegistrar, hookLogger), - newWranglerHook(tr.wranglers, task.Name, alloc.ID, hookLogger), + newWranglerHook(tr.wranglers, task.Name, alloc.ID, task.UsesCores(), hookLogger), } // If the task has a CSI block, add the hook. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 70121907ee2..1023008be9b 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -135,7 +135,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri ShutdownDelayCancelFn: shutdownDelayCancelFn, ServiceRegWrapper: wrapperMock, Getter: getter.TestSandbox(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), WIDMgr: NewMockWIDMgr(nil), } diff --git a/client/allocrunner/taskrunner/wrangler_hook.go b/client/allocrunner/taskrunner/wrangler_hook.go index 975839709e8..d7cec795ad4 100644 --- a/client/allocrunner/taskrunner/wrangler_hook.go +++ b/client/allocrunner/taskrunner/wrangler_hook.go @@ -26,13 +26,19 @@ type wranglerHook struct { log hclog.Logger } -func newWranglerHook(wranglers cifs.ProcessWranglers, task, allocID string, log hclog.Logger) *wranglerHook { +func newWranglerHook( + wranglers cifs.ProcessWranglers, + task, allocID string, + cores bool, + log hclog.Logger, +) *wranglerHook { return &wranglerHook{ log: log.Named(wranglerHookName), wranglers: wranglers, task: proclib.Task{ AllocID: allocID, Task: task, + Cores: cores, }, } } diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index bfd23a3a289..615d9bb2305 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -18,6 +18,7 @@ import ( clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" + 
"github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" @@ -25,7 +26,6 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -96,7 +96,8 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.All ServiceRegWrapper: wrapper.NewHandlerWrapper(clientConf.Logger, consulRegMock, nomadRegMock), CheckStore: checkstore.NewStore(clientConf.Logger, stateDB), Getter: getter.TestSandbox(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), + Partitions: cgroupslib.NoopPartition(), } return conf, cleanup diff --git a/client/client.go b/client/client.go index c6e1ae5b991..950b092c1b1 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/hoststats" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/numalib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager" @@ -331,6 +332,9 @@ type Client struct { // fingerprinting topology *numalib.Topology + // partitions is used for managing cpuset partitioning on linux systems + partitions cgroupslib.Partition + // widmgr retrieves workload identities widmgr *widmgr.WIDMgr } @@ -465,9 +469,15 @@ func 
NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.topology = numalib.NoImpl(ir.Topology) } + // Create the cpu core partition manager + c.partitions = cgroupslib.GetPartition( + c.topology.UsableCores(), + ) + // Create the process wranglers wranglers := proclib.New(&proclib.Configs{ - Logger: c.logger.Named("proclib"), + UsableCores: c.topology.UsableCores(), + Logger: c.logger.Named("proclib"), }) c.wranglers = wranglers @@ -2754,6 +2764,7 @@ func (c *Client) newAllocRunnerConfig( Vault: c.vaultClient, WIDMgr: c.widmgr, Wranglers: c.wranglers, + Partitions: c.partitions, } } diff --git a/client/config/arconfig.go b/client/config/arconfig.go index 3ef5fbc120e..6e882809195 100644 --- a/client/config/arconfig.go +++ b/client/config/arconfig.go @@ -110,6 +110,9 @@ type AllocRunnerConfig struct { // Wranglers is an interface for managing unix/windows processes. Wranglers interfaces.ProcessWranglers + // Partitions is an interface for managing cpuset partitions. + Partitions interfaces.CPUPartitions + // WIDMgr fetches workload identities WIDMgr *widmgr.WIDMgr } diff --git a/client/config/config.go b/client/config/config.go index 05604c148c5..5b48c9cfd96 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/consul-template/config" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" - "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/helper" @@ -309,7 +309,7 @@ type Config struct { CgroupParent string // ReservableCores if set overrides the set of reservable cores reported in fingerprinting. 
- ReservableCores []numalib.CoreID + ReservableCores []hw.CoreID // NomadServiceDiscovery determines whether the Nomad native service // discovery client functionality is enabled. diff --git a/client/fingerprint/cpu.go b/client/fingerprint/cpu.go index ffc482841ec..5c9d9df3b13 100644 --- a/client/fingerprint/cpu.go +++ b/client/fingerprint/cpu.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/klauspost/cpuid/v2" @@ -77,14 +78,14 @@ func (*CPUFingerprint) reservedCompute(request *FingerprintRequest) structs.Node func (f *CPUFingerprint) initialize(request *FingerprintRequest) { var ( - reservableCores *idset.Set[numalib.CoreID] + reservableCores *idset.Set[hw.CoreID] totalCompute = request.Config.CpuCompute reservedCompute = f.reservedCompute(request) - reservedCores = idset.From[numalib.CoreID](reservedCompute.ReservedCpuCores) + reservedCores = idset.From[hw.CoreID](reservedCompute.ReservedCpuCores) ) if rc := request.Config.ReservableCores; rc != nil { - reservableCores = idset.From[numalib.CoreID](rc) + reservableCores = idset.From[hw.CoreID](rc) } f.top = numalib.Scan(append( @@ -157,7 +158,7 @@ func (f *CPUFingerprint) setReservableCores(response *FingerprintResponse) { usable := f.top.UsableCores() response.AddAttribute("cpu.reservablecores", f.cores(usable.Size())) f.nodeResources.Cpu.ReservableCpuCores = helper.ConvertSlice( - usable.Slice(), func(id numalib.CoreID) uint16 { + usable.Slice(), func(id hw.CoreID) uint16 { return uint16(id) }) default: @@ -189,7 +190,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) { nodes := f.top.Nodes() response.AddAttribute("numa.node.count", f.nodes(nodes.Size())) - nodes.ForEach(func(id 
numalib.NodeID) error { + nodes.ForEach(func(id hw.NodeID) error { key := fmt.Sprintf("numa.node%d.cores", id) cores := f.top.NodeCores(id) response.AddAttribute(key, cores.String()) diff --git a/client/fingerprint/cpu_default_test.go b/client/fingerprint/cpu_default_test.go index 301a111d2f7..e3c873814fe 100644 --- a/client/fingerprint/cpu_default_test.go +++ b/client/fingerprint/cpu_default_test.go @@ -11,7 +11,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" @@ -63,7 +63,7 @@ func TestCPUFingerprint_OverrideCompute(t *testing.T) { Attributes: make(map[string]string), } cfg := &config.Config{ - ReservableCores: []numalib.CoreID{0, 1, 2}, + ReservableCores: []hw.CoreID{0, 1, 2}, } var originalCPU int diff --git a/client/interfaces/client.go b/client/interfaces/client.go index 3373cd265ad..796c52250fa 100644 --- a/client/interfaces/client.go +++ b/client/interfaces/client.go @@ -4,6 +4,8 @@ package interfaces import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" @@ -47,3 +49,10 @@ type ProcessWranglers interface { Setup(proclib.Task) error Destroy(proclib.Task) error } + +// CPUPartitions is an interface satisfied by the cgroupslib package. 
+type CPUPartitions interface { + Restore(*idset.Set[hw.CoreID]) + Reserve(*idset.Set[hw.CoreID]) error + Release(*idset.Set[hw.CoreID]) error +} diff --git a/client/lib/cgroupslib/default.go b/client/lib/cgroupslib/default.go index 12376bc9592..d8685a560ea 100644 --- a/client/lib/cgroupslib/default.go +++ b/client/lib/cgroupslib/default.go @@ -6,7 +6,7 @@ package cgroupslib // LinuxResourcesPath does nothing on non-Linux systems -func LinuxResourcesPath(string, string) string { +func LinuxResourcesPath(string, string, bool) string { return "" } diff --git a/client/lib/cgroupslib/editor.go b/client/lib/cgroupslib/editor.go index 0da8f49b439..2ebaf616b8d 100644 --- a/client/lib/cgroupslib/editor.go +++ b/client/lib/cgroupslib/editor.go @@ -32,16 +32,14 @@ func OpenPath(dir string) Interface { } } -// OpenFromCpusetCG1 creates a handle for modifying cgroup interface files of -// the given interface, given a path to the cpuset interface. -// -// This is useful because a Nomad task resources struct only keeps track of -// the cpuset cgroup directory in the cgroups v1 regime, but nowadays we want -// to modify more than the cpuset in some cases. -func OpenFromCpusetCG1(dir, iface string) Interface { - return &editor{ - dpath: strings.Replace(dir, "cpuset", iface, 1), +// OpenFromFreezerCG1 creates a handle for modifying cgroup interface files +// of the given interface, given a path to the freezer cgroup. +func OpenFromFreezerCG1(orig, iface string) Interface { + if iface == "cpuset" { + panic("cannot open cpuset") } + p := strings.Replace(orig, "/freezer/", "/"+iface+"/", 1) + return OpenPath(p) } // An Interface can be used to read and write the interface files of a cgroup. @@ -89,16 +87,17 @@ func (e *editor) Write(filename, content string) error { // A Factory creates a Lifecycle which is an abstraction over the setup and // teardown routines used for creating and destroying cgroups used for // constraining Nomad tasks. 
-func Factory(allocID, task string) Lifecycle { +func Factory(allocID, task string, cores bool) Lifecycle { switch GetMode() { case CG1: return &lifeCG1{ - allocID: allocID, - task: task, + allocID: allocID, + task: task, + reservedCores: cores, } default: return &lifeCG2{ - dpath: pathCG2(allocID, task), + dpath: pathCG2(allocID, task, cores), } } } @@ -116,8 +115,9 @@ type Lifecycle interface { // -------- cgroups v1 --------- type lifeCG1 struct { - allocID string - task string + allocID string + task string + reservedCores bool // uses core reservation } func (l *lifeCG1) Setup() error { @@ -127,13 +127,32 @@ func (l *lifeCG1) Setup() error { if err != nil { return err } + if strings.Contains(p, "/reserve/") { + if err = l.inheritMems(p); err != nil { + return err + } + } } return nil } +func (l *lifeCG1) inheritMems(destination string) error { + parent := filepath.Join(filepath.Dir(destination), "cpuset.mems") + b, err := os.ReadFile(parent) + if err != nil { + return err + } + destination = filepath.Join(destination, "cpuset.mems") + return os.WriteFile(destination, b, 0644) +} + func (l *lifeCG1) Teardown() error { paths := l.paths() for _, p := range paths { + if filepath.Base(p) == "share" { + // avoid removing the share cgroup + continue + } err := os.RemoveAll(p) if err != nil { return err @@ -162,7 +181,7 @@ func (l *lifeCG1) Kill() error { } func (l *lifeCG1) edit(iface string) *editor { - scope := scopeCG1(l.allocID, l.task) + scope := ScopeCG1(l.allocID, l.task) return &editor{ dpath: filepath.Join(root, iface, NomadCgroupParent, scope), } @@ -184,14 +203,22 @@ func (l *lifeCG1) thaw() error { } func (l *lifeCG1) paths() []string { - scope := scopeCG1(l.allocID, l.task) - ifaces := []string{"freezer", "cpu", "memory", "cpuset"} - paths := make([]string, 0, len(ifaces)) + scope := ScopeCG1(l.allocID, l.task) + ifaces := []string{"freezer", "cpu", "memory"} + paths := make([]string, 0, len(ifaces)+1) for _, iface := range ifaces { paths = append(paths, 
filepath.Join( root, iface, NomadCgroupParent, scope, )) } + + switch partition := GetPartitionFromBool(l.reservedCores); partition { + case "reserve": + paths = append(paths, filepath.Join(root, "cpuset", NomadCgroupParent, partition, scope)) + case "share": + paths = append(paths, filepath.Join(root, "cpuset", NomadCgroupParent, partition)) + } + return paths } @@ -235,7 +262,7 @@ func getPIDs(file string) (*set.Set[int], error) { return result, nil } -func scopeCG1(allocID, task string) string { +func ScopeCG1(allocID, task string) string { return fmt.Sprintf("%s.%s", allocID, task) } @@ -243,6 +270,7 @@ func scopeCG2(allocID, task string) string { return fmt.Sprintf("%s.%s.scope", allocID, task) } -func pathCG2(allocID, task string) string { - return filepath.Join(root, NomadCgroupParent, scopeCG2(allocID, task)) +func pathCG2(allocID, task string, cores bool) string { + partition := GetPartitionFromBool(cores) + return filepath.Join(root, NomadCgroupParent, partition, scopeCG2(allocID, task)) } diff --git a/client/lib/cgroupslib/init.go b/client/lib/cgroupslib/init.go index 8e95f36d1f0..0da6f6f7443 100644 --- a/client/lib/cgroupslib/init.go +++ b/client/lib/cgroupslib/init.go @@ -9,15 +9,31 @@ import ( "bytes" "os" "path/filepath" - "strings" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-set" ) -func Init(log hclog.Logger) { +// Init will initialize the cgroup tree that the Nomad client will use for +// isolating resources of tasks. cores is the cpuset granted for use by Nomad. 
+func Init(log hclog.Logger, cores string) { + log.Info("initializing nomad cgroups", "cores", cores) + switch GetMode() { case CG1: + // the name of the cpuset interface file + const cpusetFile = "cpuset.cpus" + + // the name of the cpuset mems interface file + const memsFile = "cpuset.mems" + + const memsSet = "0" // TODO(shoenig) get from topology + + // the value to disable inheriting values from parent cgroup + const noClone = "0" + + // the name of the clone_children interface file + const cloneFile = "cgroup.clone_children" + // create the /nomad cgroup (or whatever the name is configured to be) // for each cgroup controller we are going to use controllers := []string{"freezer", "memory", "cpu", "cpuset"} @@ -25,43 +41,151 @@ func Init(log hclog.Logger) { p := filepath.Join(root, ctrl, NomadCgroupParent) if err := os.MkdirAll(p, 0755); err != nil { log.Error("failed to create nomad cgroup", "controller", ctrl, "error", err) + return } } - case CG2: - // minimum controllers must be set first - s, err := readRootCG2("cgroup.subtree_control") - if err != nil { - log.Error("failed to create nomad cgroup", "error", err) + + // + // configure cpuset partitioning + // + // the tree is lopsided - tasks making use of reserved cpu cores get + // their own cgroup with a static cpuset.cpus value. 
other tasks are + // placed in the single share cgroup and share its dynamic cpuset.cpus + // value + // + // e.g., + // root/cpuset/nomad/ + // share/{cgroup.procs, cpuset.cpus, cpuset.mems} + // reserve/ + // abc123.task/{cgroup.procs, cpuset.cpus, cpuset.mems} + // def456.task/{cgroup.procs, cpuset.cpus, cpuset.mems} + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) return } - required := set.From([]string{"cpuset", "cpu", "io", "memory", "pids"}) - enabled := set.From(strings.Fields(s)) - needed := required.Difference(enabled) + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, memsFile); err != nil { + log.Error("failed to set cpuset.mems on nomad cpuset cgroup", "error", err) + return + } - if needed.Size() == 0 { - log.Debug("top level nomad.slice cgroup already exists") - return // already setup + if err := writeCG(cores, "cpuset", NomadCgroupParent, cpusetFile); err != nil { + log.Error("failed to write cores to nomad cpuset cgroup", "error", err) + return } - sb := new(strings.Builder) - for _, controller := range needed.List() { - sb.WriteString("+" + controller + " ") + // + // share partition + // + + if err := mkCG("cpuset", NomadCgroupParent, SharePartition()); err != nil { + log.Error("failed to create share cpuset partition", "error", err) + return + } + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, SharePartition(), cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) + return } - activation := strings.TrimSpace(sb.String()) - if err = writeRootCG2("cgroup.subtree_control", activation); err != nil { + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, SharePartition(), memsFile); err != nil { + log.Error("failed to set cpuset.mems on share cpuset partition", "error", err) + return + } + + // + // reserve partition + // + + if err := mkCG("cpuset", 
NomadCgroupParent, ReservePartition()); err != nil { + log.Error("failed to create reserve cpuset partition", "error", err) + return + } + + if err := writeCG(noClone, "cpuset", NomadCgroupParent, ReservePartition(), cloneFile); err != nil { + log.Error("failed to set clone_children on nomad cpuset cgroup", "error", err) + return + } + + if err := writeCG(memsSet, "cpuset", NomadCgroupParent, ReservePartition(), memsFile); err != nil { + log.Error("failed to set cpuset.mems on reserve cpuset partition", "error", err) + return + } + + log.Debug("nomad cpuset partitions initialized", "cores", cores) + + case CG2: + // the cgroup controllers we need to activate at the root and on the nomad slice + const activation = "+cpuset +cpu +io +memory +pids" + + // the name of the cgroup subtree interface file + const subtreeFile = "cgroup.subtree_control" + + // the name of the cpuset interface file + const cpusetFile = "cpuset.cpus" + + // + // configuring root cgroup (/sys/fs/cgroup) + // + + if err := writeCG(activation, subtreeFile); err != nil { log.Error("failed to create nomad cgroup", "error", err) return } - nomadSlice := filepath.Join("/sys/fs/cgroup", NomadCgroupParent) - if err := os.MkdirAll(nomadSlice, 0755); err != nil { + // + // configuring nomad.slice + // + + if err := mkCG(NomadCgroupParent); err != nil { log.Error("failed to create nomad cgroup", "error", err) return } - log.Debug("top level nomad.slice cgroup initialized", "controllers", needed) + if err := writeCG(activation, NomadCgroupParent, subtreeFile); err != nil { + log.Error("failed to set subtree control on nomad cgroup", "error", err) + return + } + + if err := writeCG(cores, NomadCgroupParent, cpusetFile); err != nil { + log.Error("failed to write root partition cpuset", "error", err) + return + } + + log.Debug("top level partition root nomad.slice cgroup initialized") + + // + // configuring nomad.slice/share (member) + // + + if err := mkCG(NomadCgroupParent, SharePartition()); err != nil { 
+ log.Error("failed to create share cgroup", "error", err) + return + } + + if err := writeCG(activation, NomadCgroupParent, SharePartition(), subtreeFile); err != nil { + log.Error("failed to set subtree control on cpuset share partition", "error", err) + return + } + + log.Debug("partition member nomad.slice/share cgroup initialized") + + // + // configuring nomad.slice/reserve (member) + // + + if err := mkCG(NomadCgroupParent, ReservePartition()); err != nil { + log.Error("failed to create share cgroup", "error", err) + return + } + + if err := writeCG(activation, NomadCgroupParent, ReservePartition(), subtreeFile); err != nil { + log.Error("failed to set subtree control on cpuset reserve partition", "error", err) + return + } + + log.Debug("partition member nomad.slice/reserve cgroup initialized") } } @@ -71,11 +195,26 @@ func readRootCG2(filename string) (string, error) { return string(bytes.TrimSpace(b)), err } -func writeRootCG2(filename, content string) error { - p := filepath.Join(root, filename) +// filepathCG will return the given paths based on the cgroup root +func filepathCG(paths ...string) string { + base := []string{root} + base = append(base, paths...) + p := filepath.Join(base...) + return p +} + +// writeCG will write content to the cgroup interface file given by paths +func writeCG(content string, paths ...string) error { + p := filepathCG(paths...) return os.WriteFile(p, []byte(content), 0644) } +// mkCG will create a cgroup at the given path +func mkCG(paths ...string) error { + p := filepathCG(paths...) 
+ return os.MkdirAll(p, 0755) +} + // ReadNomadCG2 reads an interface file under the nomad.slice parent cgroup // (or whatever its name is configured to be) func ReadNomadCG2(filename string) (string, error) { @@ -97,13 +236,23 @@ func WriteNomadCG1(iface, filename, content string) error { return os.WriteFile(p, []byte(content), 0644) } +// PathCG1 returns the filepath to the cgroup directory of the given interface +// and allocID / taskName. +func PathCG1(allocID, taskName, iface string) string { + return filepath.Join(root, iface, NomadCgroupParent, ScopeCG1(allocID, taskName)) +} + // LinuxResourcesPath returns the filepath to the directory that the field // x.Resources.LinuxResources.CpusetCgroupPath is expected to hold on to -func LinuxResourcesPath(allocID, task string) string { - switch GetMode() { - case CG1: - return filepath.Join(root, "cpuset", NomadCgroupParent, scopeCG1(allocID, task)) +func LinuxResourcesPath(allocID, task string, reserveCores bool) string { + partition := GetPartitionFromBool(reserveCores) + mode := GetMode() + switch { + case mode == CG1 && reserveCores: + return filepath.Join(root, "cpuset", NomadCgroupParent, partition, ScopeCG1(allocID, task)) + case mode == CG1 && !reserveCores: + return filepath.Join(root, "cpuset", NomadCgroupParent, partition) default: - return filepath.Join(root, NomadCgroupParent, scopeCG2(allocID, task)) + return filepath.Join(root, NomadCgroupParent, partition, scopeCG2(allocID, task)) } } diff --git a/client/lib/cgroupslib/init_default.go b/client/lib/cgroupslib/init_default.go new file mode 100644 index 00000000000..d08d6b11367 --- /dev/null +++ b/client/lib/cgroupslib/init_default.go @@ -0,0 +1,11 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +//go:build !linux + +package cgroupslib + +// PathCG1 returns empty string on non-Linux systems +func PathCG1(allocID, taskName, iface string) string { + return "" +} diff --git a/client/lib/cgroupslib/mount.go b/client/lib/cgroupslib/mount.go index b000084fce8..62c2f39193b 100644 --- a/client/lib/cgroupslib/mount.go +++ b/client/lib/cgroupslib/mount.go @@ -15,6 +15,10 @@ import ( ) func detect() Mode { + if os.Geteuid() > 0 { + return OFF + } + f, err := os.Open("/proc/self/mountinfo") if err != nil { return OFF @@ -22,6 +26,7 @@ func detect() Mode { defer func() { _ = f.Close() }() + return scan(f) } diff --git a/client/lib/cgroupslib/partition.go b/client/lib/cgroupslib/partition.go new file mode 100644 index 00000000000..6ada0425336 --- /dev/null +++ b/client/lib/cgroupslib/partition.go @@ -0,0 +1,63 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// A Partition is used to track reserved vs. shared cpu cores. +type Partition interface { + Restore(*idset.Set[hw.CoreID]) + Reserve(*idset.Set[hw.CoreID]) error + Release(*idset.Set[hw.CoreID]) error +} + +// SharePartition is the name of the cgroup containing cgroups for tasks +// making use of the 'cpu' resource, which pools and shares cpu cores. +func SharePartition() string { + switch GetMode() { + case CG1: + return "share" + case CG2: + return "share.slice" + default: + return "" + } +} + +// ReservePartition is the name of the cgroup containing cgroups for tasks +// making use of the 'cores' resource, which reserves specific cpu cores. 
+func ReservePartition() string { + switch GetMode() { + case CG1: + return "reserve" + case CG2: + return "reserve.slice" + default: + return "" + } +} + +// GetPartitionFromCores returns the name of the cgroup that should contain +// the cgroup of the task, which is determined by inspecting the cores +// parameter, which is a non-empty string if the task is using reserved +// cores. +func GetPartitionFromCores(cores string) string { + if cores == "" { + return SharePartition() + } + return ReservePartition() +} + +// GetPartitionFromBool returns the name of the cgroup that should contain +// the cgroup of the task, which is determined from the cores parameter, +// which indicates whether the task is making use of reserved cores. +func GetPartitionFromBool(cores bool) string { + if cores { + return ReservePartition() + } + return SharePartition() +} diff --git a/client/lib/cgroupslib/partition_default.go b/client/lib/cgroupslib/partition_default.go new file mode 100644 index 00000000000..f685c671d54 --- /dev/null +++ b/client/lib/cgroupslib/partition_default.go @@ -0,0 +1,16 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !linux + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// GetPartition creates a no-op Partition that does not do anything. +func GetPartition(*idset.Set[hw.CoreID]) Partition { + return NoopPartition() +} diff --git a/client/lib/cgroupslib/partition_linux.go b/client/lib/cgroupslib/partition_linux.go new file mode 100644 index 00000000000..ee0891fa106 --- /dev/null +++ b/client/lib/cgroupslib/partition_linux.go @@ -0,0 +1,98 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +//go:build linux + +package cgroupslib + +import ( + "os" + "path/filepath" + "sync" + + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// GetPartition creates a Partition suitable for managing cores on this +// Linux system. +func GetPartition(cores *idset.Set[hw.CoreID]) Partition { + return NewPartition(cores) +} + +// NewPartition creates a cpuset partition manager for managing the books +// when allocations are created and destroyed. The initial set of cores is +// the usable set of cores by Nomad. +func NewPartition(cores *idset.Set[hw.CoreID]) Partition { + var ( + sharePath string + reservePath string + ) + + switch GetMode() { + case OFF: + return NoopPartition() + case CG1: + sharePath = filepath.Join(root, "cpuset", NomadCgroupParent, SharePartition(), "cpuset.cpus") + reservePath = filepath.Join(root, "cpuset", NomadCgroupParent, ReservePartition(), "cpuset.cpus") + case CG2: + sharePath = filepath.Join(root, NomadCgroupParent, SharePartition(), "cpuset.cpus") + reservePath = filepath.Join(root, NomadCgroupParent, ReservePartition(), "cpuset.cpus") + } + + return &partition{ + sharePath: sharePath, + reservePath: reservePath, + share: cores.Copy(), + reserve: idset.Empty[hw.CoreID](), + } +} + +type partition struct { + sharePath string + reservePath string + + lock sync.Mutex + share *idset.Set[hw.CoreID] + reserve *idset.Set[hw.CoreID] +} + +func (p *partition) Restore(cores *idset.Set[hw.CoreID]) { + p.lock.Lock() + defer p.lock.Unlock() + + p.share.RemoveSet(cores) + p.reserve.InsertSet(cores) +} + +func (p *partition) Reserve(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.share.RemoveSet(cores) + p.reserve.InsertSet(cores) + + return p.write() +} + +func (p *partition) Release(cores *idset.Set[hw.CoreID]) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.reserve.RemoveSet(cores) + 
p.share.InsertSet(cores) + + return p.write() +} + +func (p *partition) write() error { + shareStr := p.share.String() + if err := os.WriteFile(p.sharePath, []byte(shareStr), 0644); err != nil { + return err + } + reserveStr := p.reserve.String() + if err := os.WriteFile(p.reservePath, []byte(reserveStr), 0644); err != nil { + return err + } + return nil +} diff --git a/client/lib/cgroupslib/partition_noop.go b/client/lib/cgroupslib/partition_noop.go new file mode 100644 index 00000000000..6f0dced98f9 --- /dev/null +++ b/client/lib/cgroupslib/partition_noop.go @@ -0,0 +1,25 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package cgroupslib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +func NoopPartition() Partition { + return new(noop) +} + +type noop struct{} + +func (p *noop) Reserve(*idset.Set[hw.CoreID]) error { + return nil +} + +func (p *noop) Release(*idset.Set[hw.CoreID]) error { + return nil +} + +func (p *noop) Restore(*idset.Set[hw.CoreID]) {} diff --git a/client/lib/cgroupslib/partition_test.go b/client/lib/cgroupslib/partition_test.go new file mode 100644 index 00000000000..601ff6a2190 --- /dev/null +++ b/client/lib/cgroupslib/partition_test.go @@ -0,0 +1,96 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build linux + +package cgroupslib + +import ( + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/shoenig/test/must" +) + +// testPartition creates a fresh partition configured with cores 10-20. 
+func testPartition(t *testing.T) *partition { + dir := t.TempDir() + shareFile := filepath.Join(dir, "share.cpus") + reserveFile := filepath.Join(dir, "reserve.cpus") + return &partition{ + sharePath: shareFile, + reservePath: reserveFile, + share: idset.From[hw.CoreID]([]hw.CoreID{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), + reserve: idset.Empty[hw.CoreID](), + } +} + +func coreset(ids ...hw.CoreID) *idset.Set[hw.CoreID] { + return idset.From[hw.CoreID](ids) +} + +func TestPartition_Restore(t *testing.T) { + p := testPartition(t) + + must.NotEmpty(t, p.share) + must.Empty(t, p.reserve) + + p.Restore(coreset(11, 13)) + p.Restore(coreset(15, 16, 17)) + p.Restore(coreset(10, 19)) + + expShare := idset.From[hw.CoreID]([]hw.CoreID{12, 14, 18}) + expReserve := idset.From[hw.CoreID]([]hw.CoreID{11, 13, 15, 16, 17, 10, 19}) + + must.Eq(t, expShare, p.share) + must.Eq(t, expReserve, p.reserve) + + // restore does not write to the cgroup interface + must.FileNotExists(t, p.sharePath) + must.FileNotExists(t, p.reservePath) +} + +func TestPartition_Reserve(t *testing.T) { + p := testPartition(t) + + p.Reserve(coreset(10, 15, 19)) + p.Reserve(coreset(12, 13)) + + expShare := idset.From[hw.CoreID]([]hw.CoreID{11, 14, 16, 17, 18}) + expReserve := idset.From[hw.CoreID]([]hw.CoreID{10, 12, 13, 15, 19}) + + must.Eq(t, expShare, p.share) + must.Eq(t, expReserve, p.reserve) + + must.FileContains(t, p.sharePath, "11,14,16-18") + must.FileContains(t, p.reservePath, "10,12-13,15,19") +} + +func TestPartition_Release(t *testing.T) { + p := testPartition(t) + + // some reservations + p.Reserve(coreset(10, 15, 19)) + p.Reserve(coreset(12, 13)) + p.Reserve(coreset(11, 18)) + + must.FileContains(t, p.sharePath, "14,16-17") + must.FileContains(t, p.reservePath, "10-13,15,18-19") + + // release 1 + p.Release(coreset(12, 13)) + must.FileContains(t, p.sharePath, "12-14,16-17") + must.FileContains(t, p.reservePath, "10-11,15,18-19") + + // release 2 + p.Release(coreset(10, 15, 19)) + 
must.FileContains(t, p.sharePath, "10,12-17,19") + must.FileContains(t, p.reservePath, "11,18") + + // release 3 + p.Release(coreset(11, 18)) + must.FileContains(t, p.sharePath, "10-19") + must.FileContains(t, p.reservePath, "") +} diff --git a/client/lib/idset/idset.go b/client/lib/idset/idset.go index 73bf3da26ef..6a99b46e35e 100644 --- a/client/lib/idset/idset.go +++ b/client/lib/idset/idset.go @@ -17,6 +17,8 @@ import ( // An ID is representative of a non-negative identifier of something like // a CPU core ID, a NUMA node ID, etc. +// +// See the hwids package for typical use cases. type ID interface { ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uint } @@ -37,6 +39,11 @@ func Empty[T ID]() *Set[T] { } } +// Copy creates a deep copy of s. +func (s *Set[T]) Copy() *Set[T] { + return &Set[T]{items: s.items.Copy()} +} + var ( numberRe = regexp.MustCompile(`^\d+$`) spanRe = regexp.MustCompile(`^(\d+)-(\d+)$`) @@ -108,10 +115,24 @@ func (s *Set[T]) Slice() []T { return items } +// InsertSet inserts all items of other into s. +func (s *Set[T]) InsertSet(other *Set[T]) { + s.items.InsertSet(other.items) +} + +// RemoveSet removes all items of other from s. +func (s *Set[T]) RemoveSet(other *Set[T]) { + s.items.RemoveSet(other.items) +} + // String creates a well-formed cpuset string representation of the Set. 
func (s *Set[T]) String() string { if s.items.Empty() { - return "" + // cgroups notation uses a space (or newline) to indicate + // "empty"; and this value is written to cgroups interface + // files + const empty = " " + return empty } var parts []string diff --git a/client/lib/numalib/detect.go b/client/lib/numalib/detect.go index cc87de72d6e..ed99f157a12 100644 --- a/client/lib/numalib/detect.go +++ b/client/lib/numalib/detect.go @@ -5,6 +5,7 @@ package numalib import ( "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // A SystemScanner represents one methodology of detecting CPU hardware on a @@ -35,7 +36,7 @@ type ConfigScanner struct { // Only meaningful on Linux, this value can be used to override the set of // CPU core IDs we may make use of. Normally these are detected by reading // Nomad parent cgroup cpuset interface file. - ReservableCores *idset.Set[CoreID] + ReservableCores *idset.Set[hw.CoreID] // TotalCompute comes from client.cpu_total_compute. // @@ -49,7 +50,7 @@ type ConfigScanner struct { // ReservedCores comes from client.reserved.cores. // // Used to withhold a set of cores from being used by Nomad for scheduling. - ReservedCores *idset.Set[CoreID] + ReservedCores *idset.Set[hw.CoreID] // ReservedCompute comes from client.reserved.cpu. 
// diff --git a/client/lib/numalib/detect_darwin.go b/client/lib/numalib/detect_darwin.go index 0977b45a6ec..40c1976b15d 100644 --- a/client/lib/numalib/detect_darwin.go +++ b/client/lib/numalib/detect_darwin.go @@ -7,6 +7,7 @@ package numalib import ( "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/shoenig/go-m1cpu" "golang.org/x/sys/unix" ) @@ -19,8 +20,8 @@ func PlatformScanners() []SystemScanner { } const ( - nodeID = NodeID(0) - socketID = SocketID(0) + nodeID = hw.NodeID(0) + socketID = hw.SocketID(0) maxSpeed = KHz(0) ) @@ -29,7 +30,7 @@ type MacOS struct{} func (m *MacOS) ScanSystem(top *Topology) { // all apple hardware is non-numa; just assume as much - top.NodeIDs = idset.Empty[NodeID]() + top.NodeIDs = idset.Empty[hw.NodeID]() top.NodeIDs.Insert(nodeID) // arch specific detection @@ -49,7 +50,7 @@ func (m *MacOS) scanAppleSilicon(top *Topology) { eCoreSpeed := KHz(m1cpu.ECoreHz() / 1000) top.Cores = make([]Core, pCoreCount+eCoreCount) - nthCore := CoreID(0) + nthCore := hw.CoreID(0) for i := 0; i < pCoreCount; i++ { top.insert(nodeID, socketID, nthCore, performance, maxSpeed, pCoreSpeed) @@ -69,6 +70,6 @@ func (m *MacOS) scanLegacyX86(top *Topology) { top.Cores = make([]Core, coreCount) for i := 0; i < int(coreCount); i++ { - top.insert(nodeID, socketID, CoreID(i), performance, maxSpeed, coreSpeed) + top.insert(nodeID, socketID, hw.CoreID(i), performance, maxSpeed, coreSpeed) } } diff --git a/client/lib/numalib/detect_default.go b/client/lib/numalib/detect_default.go index b603a64bf43..dc4fca8cbd1 100644 --- a/client/lib/numalib/detect_default.go +++ b/client/lib/numalib/detect_default.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/shirou/gopsutil/v3/cpu" ) @@ -22,8 +23,8 @@ func PlatformScanners() []SystemScanner { } const ( - nodeID = 
NodeID(0) - socketID = SocketID(0) + nodeID = hw.NodeID(0) + socketID = hw.SocketID(0) maxSpeed = KHz(0) ) @@ -34,7 +35,7 @@ type Generic struct{} func (g *Generic) ScanSystem(top *Topology) { // hardware may or may not be NUMA, but for now we only // detect such topology on linux systems - top.NodeIDs = idset.Empty[NodeID]() + top.NodeIDs = idset.Empty[hw.NodeID]() top.NodeIDs.Insert(nodeID) // cores @@ -55,6 +56,6 @@ func (g *Generic) ScanSystem(top *Topology) { for i := 0; i < count; i++ { info := infos[0] speed := KHz(MHz(info.Mhz) * 1000) - top.insert(nodeID, socketID, CoreID(i), performance, maxSpeed, speed) + top.insert(nodeID, socketID, hw.CoreID(i), performance, maxSpeed, speed) } } diff --git a/client/lib/numalib/detect_linux.go b/client/lib/numalib/detect_linux.go index 8b3e8371211..fbb966a789d 100644 --- a/client/lib/numalib/detect_linux.go +++ b/client/lib/numalib/detect_linux.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // PlatformScanners returns the set of SystemScanner for Linux. 
@@ -59,7 +60,7 @@ func (*Sysfs) available() bool { } func (*Sysfs) discoverOnline(st *Topology) { - ids, err := getIDSet[NodeID](nodeOnline) + ids, err := getIDSet[hw.NodeID](nodeOnline) if err == nil { st.NodeIDs = ids } @@ -72,7 +73,7 @@ func (*Sysfs) discoverCosts(st *Topology) { st.Distances[i] = make([]Cost, dimension) } - _ = st.NodeIDs.ForEach(func(id NodeID) error { + _ = st.NodeIDs.ForEach(func(id hw.NodeID) error { s, err := getString(distanceFile, id) if err != nil { return err @@ -87,25 +88,25 @@ func (*Sysfs) discoverCosts(st *Topology) { } func (*Sysfs) discoverCores(st *Topology) { - onlineCores, err := getIDSet[CoreID](cpuOnline) + onlineCores, err := getIDSet[hw.CoreID](cpuOnline) if err != nil { return } st.Cores = make([]Core, onlineCores.Size()) - _ = st.NodeIDs.ForEach(func(node NodeID) error { + _ = st.NodeIDs.ForEach(func(node hw.NodeID) error { s, err := os.ReadFile(fmt.Sprintf(cpulistFile, node)) if err != nil { return err } - cores := idset.Parse[CoreID](string(s)) - _ = cores.ForEach(func(core CoreID) error { + cores := idset.Parse[hw.CoreID](string(s)) + _ = cores.ForEach(func(core hw.CoreID) error { // best effort, zero values are defaults - socket, _ := getNumeric[SocketID](cpuSocketFile, core) + socket, _ := getNumeric[hw.SocketID](cpuSocketFile, core) max, _ := getNumeric[KHz](cpuMaxFile, core) base, _ := getNumeric[KHz](cpuBaseFile, core) - siblings, _ := getIDSet[CoreID](cpuSiblingFile, core) + siblings, _ := getIDSet[hw.CoreID](cpuSiblingFile, core) st.insert(node, socket, core, gradeOf(siblings), max, base) return nil }) @@ -182,7 +183,7 @@ func (s *Cgroups2) ScanSystem(top *Topology) { // combine scanCgroups func scanIDs(top *Topology, content string) { - ids := idset.Parse[CoreID](content) + ids := idset.Parse[hw.CoreID](content) for _, cpu := range top.Cores { if !ids.Contains(cpu.ID) { cpu.Disable = true diff --git a/client/lib/numalib/hw/ids.go b/client/lib/numalib/hw/ids.go new file mode 100644 index 
00000000000..811df0e4b28 --- /dev/null +++ b/client/lib/numalib/hw/ids.go @@ -0,0 +1,20 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +// Package hw provides types for identifying hardware. +// +// This is a separate "leaf" package that is easy to import from many other +// packages without creating circular imports. +package hw + +type ( + // A NodeID represents a NUMA node. There could be more than + // one NUMA node per socket. + NodeID uint8 + + // A SocketID represents a physical CPU socket. + SocketID uint8 + + // A CoreID represents one logical (vCPU) core. + CoreID uint16 +) diff --git a/client/lib/numalib/topology.go b/client/lib/numalib/topology.go index 2196ef326bf..99085817a29 100644 --- a/client/lib/numalib/topology.go +++ b/client/lib/numalib/topology.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // CoreGrade describes whether a specific core is a performance or efficiency @@ -26,7 +27,7 @@ const ( efficiency CoreGrade = false ) -func gradeOf(siblings *idset.Set[CoreID]) CoreGrade { +func gradeOf(siblings *idset.Set[hw.CoreID]) CoreGrade { switch siblings.Size() { case 0, 1: return efficiency @@ -45,13 +46,10 @@ func (g CoreGrade) String() string { } type ( - NodeID uint8 - SocketID uint8 - CoreID uint16 - KHz uint64 - MHz uint64 - GHz float64 - Cost uint8 + KHz uint64 + MHz uint64 + GHz float64 + Cost uint8 ) func (khz KHz) MHz() MHz { @@ -67,9 +65,9 @@ func (khz KHz) String() string { // The JSON encoding is not used yet but my be part of the gRPC plumbing // in the future. 
type Topology struct { - NodeIDs *idset.Set[NodeID] `json:"node_ids"` - Distances SLIT `json:"distances"` - Cores []Core `json:"cores"` + NodeIDs *idset.Set[hw.NodeID] `json:"node_ids"` + Distances SLIT `json:"distances"` + Cores []Core `json:"cores"` // explicit overrides from client configuration OverrideTotalCompute MHz `json:"override_total_compute"` @@ -79,14 +77,14 @@ type Topology struct { // A Core represents one logical (vCPU) core on a processor. Basically the slice // of cores detected should match up with the vCPU description in cloud providers. type Core struct { - NodeID NodeID `json:"node_id"` - SocketID SocketID `json:"socket_id"` - ID CoreID `json:"id"` - Grade CoreGrade `json:"grade"` - Disable bool `json:"disable"` // indicates whether Nomad must not use this core - BaseSpeed MHz `json:"base_speed"` // cpuinfo_base_freq (primary choice) - MaxSpeed MHz `json:"max_speed"` // cpuinfo_max_freq (second choice) - GuessSpeed MHz `json:"guess_speed"` // best effort (fallback) + NodeID hw.NodeID `json:"node_id"` + SocketID hw.SocketID `json:"socket_id"` + ID hw.CoreID `json:"id"` + Grade CoreGrade `json:"grade"` + Disable bool `json:"disable"` // indicates whether Nomad must not use this core + BaseSpeed MHz `json:"base_speed"` // cpuinfo_base_freq (primary choice) + MaxSpeed MHz `json:"max_speed"` // cpuinfo_max_freq (second choice) + GuessSpeed MHz `json:"guess_speed"` // best effort (fallback) } func (c Core) String() string { @@ -110,7 +108,7 @@ func (c Core) MHz() MHz { // accessing memory across each combination of NUMA boundary. type SLIT [][]Cost -func (d SLIT) cost(a, b NodeID) Cost { +func (d SLIT) cost(a, b hw.NodeID) Cost { return d[a][b] } @@ -126,7 +124,7 @@ func (st *Topology) SupportsNUMA() bool { } // Nodes returns the set of NUMA Node IDs. 
-func (st *Topology) Nodes() *idset.Set[NodeID] { +func (st *Topology) Nodes() *idset.Set[hw.NodeID] { if !st.SupportsNUMA() { return nil } @@ -134,8 +132,8 @@ func (st *Topology) Nodes() *idset.Set[NodeID] { } // NodeCores returns the set of Core IDs for the given NUMA Node ID. -func (st *Topology) NodeCores(node NodeID) *idset.Set[CoreID] { - result := idset.Empty[CoreID]() +func (st *Topology) NodeCores(node hw.NodeID) *idset.Set[hw.CoreID] { + result := idset.Empty[hw.CoreID]() for _, cpu := range st.Cores { if cpu.NodeID == node { result.Insert(cpu.ID) @@ -144,7 +142,7 @@ func (st *Topology) NodeCores(node NodeID) *idset.Set[CoreID] { return result } -func (st *Topology) insert(node NodeID, socket SocketID, core CoreID, grade CoreGrade, max, base KHz) { +func (st *Topology) insert(node hw.NodeID, socket hw.SocketID, core hw.CoreID, grade CoreGrade, max, base KHz) { st.Cores[core] = Core{ NodeID: node, SocketID: socket, @@ -226,8 +224,8 @@ func (st *Topology) NumECores() int { // UsableCores returns the number of logical cores usable by the Nomad client // for running tasks. Nomad must subtract off any reserved cores (reserved.cores) // and/or must mask the cpuset to the one set in config (config.reservable_cores). 
-func (st *Topology) UsableCores() *idset.Set[CoreID] { - result := idset.Empty[CoreID]() +func (st *Topology) UsableCores() *idset.Set[hw.CoreID] { + result := idset.Empty[hw.CoreID]() for _, cpu := range st.Cores { if !cpu.Disable { result.Insert(cpu.ID) diff --git a/client/lib/proclib/config.go b/client/lib/proclib/config.go index fe1accb8ad6..2d9ce83bca7 100644 --- a/client/lib/proclib/config.go +++ b/client/lib/proclib/config.go @@ -5,6 +5,8 @@ package proclib import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" ) // Configs is used to pass along values from client configuration that are @@ -12,4 +14,8 @@ import ( // was set in agent configuration. type Configs struct { Logger hclog.Logger + + // UsableCores is the actual set of cpu cores Nomad is able and + // allowed to use. + UsableCores *idset.Set[hw.CoreID] } diff --git a/client/lib/proclib/testing.go b/client/lib/proclib/testing.go new file mode 100644 index 00000000000..77114546495 --- /dev/null +++ b/client/lib/proclib/testing.go @@ -0,0 +1,38 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package proclib + +import ( + "github.com/hashicorp/nomad/helper/testlog" + testing "github.com/mitchellh/go-testing-interface" +) + +func MockWranglers(t testing.T) *Wranglers { + return &Wranglers{ + configs: &Configs{ + Logger: testlog.HCLogger(t), + }, + m: make(map[Task]ProcessWrangler), + create: mocks, + } +} + +func mocks(Task) ProcessWrangler { + return new(mock) +} + +type mock struct { +} + +func (m *mock) Initialize() error { + return nil +} + +func (m *mock) Kill() error { + return nil +} + +func (m *mock) Cleanup() error { + return nil +} diff --git a/client/lib/proclib/wrangler.go b/client/lib/proclib/wrangler.go index b59b3ce849c..c73a03d9cc6 100644 --- a/client/lib/proclib/wrangler.go +++ b/client/lib/proclib/wrangler.go @@ -9,10 +9,12 @@ import ( ) // Task records the unique coordinates of a task from the perspective of a Nomad -// client running the task, that is to say (alloc_id, task_name). +// client running the task, that is to say (alloc_id, task_name). Also indicates +// whether the task is making use of reserved cpu cores. 
type Task struct { AllocID string Task string + Cores bool } func (task Task) String() string { diff --git a/client/lib/proclib/wrangler_cg1_linux.go b/client/lib/proclib/wrangler_cg1_linux.go index cb8fe9f45ce..a8654a6337d 100644 --- a/client/lib/proclib/wrangler_cg1_linux.go +++ b/client/lib/proclib/wrangler_cg1_linux.go @@ -25,12 +25,12 @@ type LinuxWranglerCG1 struct { func newCG1(c *Configs) create { logger := c.Logger.Named("cg1") - cgroupslib.Init(logger) + cgroupslib.Init(logger, c.UsableCores.String()) return func(task Task) ProcessWrangler { return &LinuxWranglerCG1{ task: task, log: logger, - cg: cgroupslib.Factory(task.AllocID, task.Task), + cg: cgroupslib.Factory(task.AllocID, task.Task, task.Cores), } } } diff --git a/client/lib/proclib/wrangler_cg2_linux.go b/client/lib/proclib/wrangler_cg2_linux.go index 9c2946f2b25..8dcbf3dfe5c 100644 --- a/client/lib/proclib/wrangler_cg2_linux.go +++ b/client/lib/proclib/wrangler_cg2_linux.go @@ -22,12 +22,12 @@ type LinuxWranglerCG2 struct { func newCG2(c *Configs) create { logger := c.Logger.Named("cg2") - cgroupslib.Init(logger) + cgroupslib.Init(logger, c.UsableCores.String()) return func(task Task) ProcessWrangler { return &LinuxWranglerCG2{ task: task, log: c.Logger, - cg: cgroupslib.Factory(task.AllocID, task.Task), + cg: cgroupslib.Factory(task.AllocID, task.Task, task.Cores), } } } diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index f055ca06bbf..b2f1f41856e 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -20,6 +20,7 @@ import ( clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/devicemanager" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/proclib" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" regMock 
"github.com/hashicorp/nomad/client/serviceregistration/mock" @@ -214,7 +215,8 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al PrevAllocMigrator: allocwatcher.NoopPrevAlloc{}, DeviceManager: devicemanager.NoopMockManager(), DriverManager: drivermanager.TestDriverManager(t), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), + Partitions: cgroupslib.NoopPartition(), } ar, err := allocrunner.NewAllocRunner(conf) require.NoError(t, err) diff --git a/command/agent/agent.go b/command/agent/agent.go index 18c0c3092b2..53195f6ca64 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -23,15 +23,17 @@ import ( uuidparse "github.com/hashicorp/go-uuid" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/command/agent/event" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/escapingfs" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/lib/cpuset" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/structs" @@ -791,12 +793,26 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { res.Memory.MemoryMB = int64(agentConfig.Client.Reserved.MemoryMB) res.Disk.DiskMB = int64(agentConfig.Client.Reserved.DiskMB) res.Networks.ReservedHostPorts = agentConfig.Client.Reserved.ReservedPorts - 
if agentConfig.Client.Reserved.Cores != "" { - cores, err := cpuset.Parse(agentConfig.Client.Reserved.Cores) - if err != nil { - return nil, fmt.Errorf("failed to parse client > reserved > cores value %q: %v", agentConfig.Client.Reserved.Cores, err) - } - res.Cpu.ReservedCpuCores = cores.ToSlice() + + // Operators may set one of + // + // - config.reservable_cores (highest precedence) for specifying which cpu cores + // nomad tasks may run on + // + // - config.reserved.cores (lowest precedence) for specifying which cpu cores + // nomad tasks may NOT run on + // + // In either case we will compute the partitioning and have it enforced by + // cgroups (on linux). In -dev mode we let nomad use 2 cores. + if agentConfig.Client.ReservableCores != "" { + cores := idset.Parse[hw.CoreID](agentConfig.Client.ReservableCores) + conf.ReservableCores = cores.Slice() + } else if agentConfig.Client.Reserved.Cores != "" { + cores := idset.Parse[hw.CoreID](agentConfig.Client.Reserved.Cores) + res.Cpu.ReservedCpuCores = helper.ConvertSlice( + cores.Slice(), + func(id hw.CoreID) uint16 { return uint16(id) }, + ) } conf.Version = agentConfig.Version diff --git a/command/agent/config.go b/command/agent/config.go index ec7863edcd5..d22d732a362 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -261,7 +261,7 @@ type ClientConfig struct { DiskFreeMB int `hcl:"disk_free_mb"` // ReservableCores is used to override detected reservable cpu cores. - ReserveableCores string `hcl:"reservable_cores"` + ReservableCores string `hcl:"reservable_cores"` // MaxKillTimeout allows capping the user-specifiable KillTimeout. 
MaxKillTimeout string `hcl:"max_kill_timeout"` @@ -1271,10 +1271,10 @@ func DevConfig(mode *devModeConfig) *Config { conf.Client.Options[fingerprint.TightenNetworkTimeoutsConfig] = "true" conf.Client.BindWildcardDefaultHostNetwork = true conf.Client.NomadServiceDiscovery = pointer.Of(true) + conf.Client.ReservableCores = "" // inherit all the cores conf.Telemetry.PrometheusMetrics = true conf.Telemetry.PublishAllocationMetrics = true conf.Telemetry.PublishNodeMetrics = true - return conf } @@ -2161,8 +2161,8 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { } else if b.Reserved != nil { result.Reserved = result.Reserved.Merge(b.Reserved) } - if b.ReserveableCores != "" { - result.ReserveableCores = b.ReserveableCores + if b.ReservableCores != "" { + result.ReservableCores = b.ReservableCores } if b.GCInterval != 0 { result.GCInterval = b.GCInterval diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 10a984fe784..27e5d945f88 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -170,7 +170,7 @@ func TestConsul_Integration(t *testing.T) { DriverManager: drivermanager.TestDriverManager(t), StartConditionMetCh: closedCh, ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), - Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}), + Wranglers: proclib.MockWranglers(t), } tr, err := taskrunner.NewTaskRunner(config) diff --git a/drivers/docker/cpuset.go b/drivers/docker/cpuset.go new file mode 100644 index 00000000000..7534c3364ef --- /dev/null +++ b/drivers/docker/cpuset.go @@ -0,0 +1,77 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package docker + +import ( + "os" + "path/filepath" + "time" + + "github.com/hashicorp/nomad/helper" +) + +const ( + // cpusetSyncPeriod is how often we check to see if the cpuset of a task + // needs to be updated - if there is no work to do, no action is taken + cpusetSyncPeriod = 3 * time.Second +) + +// cpuset is used to manage the cpuset.cpus interface file in the cgroup that +// docker daemon creates for the container being run by the task driver. we +// must do this hack because docker does not allow specifying a pre-existing +// cgroup in which to run the container (i.e. one that we control). +type cpuset struct { + doneCh <-chan bool + source string + destination string + previous string + sync func(string, string) +} + +func (c *cpuset) watch() { + if c.sync == nil { + // use the real thing if we are not doing tests + c.sync = c.copyCpuset + } + + ticks, cancel := helper.NewSafeTimer(cpusetSyncPeriod) + defer cancel() + + for { + select { + case <-c.doneCh: + return + case <-ticks.C: + c.sync(c.source, c.destination) + ticks.Reset(cpusetSyncPeriod) + } + } +} + +func (c *cpuset) copyCpuset(source, destination string) { + source = filepath.Join(source, "cpuset.cpus.effective") + destination = filepath.Join(destination, "cpuset.cpus") + + // read the current value of usable cores + b, err := os.ReadFile(source) + if err != nil { + return + } + + // if the current value is the same as the value we wrote last, + // there is nothing to do + current := string(b) + if current == c.previous { + return + } + + // otherwise write the new value + err = os.WriteFile(destination, b, 0644) + if err != nil { + return + } + + // we wrote a new value; store that value so we do not write it again + c.previous = current +} diff --git a/drivers/docker/cpuset_test.go b/drivers/docker/cpuset_test.go new file mode 100644 index 00000000000..19cd32e09b6 --- /dev/null +++ b/drivers/docker/cpuset_test.go @@ -0,0 +1,42 @@ +// 
Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package docker + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func Test_cpuset_watch(t *testing.T) { + ci.Parallel(t) + + doneCh := make(chan bool) + + source := "/source" + destination := "/destination" + hits := 0 + + callback := func(s, d string) { + must.Eq(t, source, s) + must.Eq(t, destination, d) + hits++ + } + + c := &cpuset{ + doneCh: doneCh, + source: "/source", + destination: "/destination", + previous: "", + sync: callback, + } + go c.watch() + + time.Sleep(3*time.Second + 10*time.Millisecond) + doneCh <- true + + must.Eq(t, 1, hits) +} diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index d2ac7015090..26bf62566e2 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -250,6 +250,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { logger: d.logger.With("container_id", container.ID), task: handle.Config, containerID: container.ID, + containerCgroup: container.HostConfig.Cgroup, containerImage: container.Image, doneCh: make(chan bool), waitCh: make(chan struct{}), @@ -977,12 +978,13 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T } hostConfig := &docker.HostConfig{ - // TODO(shoenig) set cgroup parent when we do partitioning + // do not set cgroup parent anymore Memory: memory, // hard limit MemoryReservation: memoryReservation, // soft limit - CPUShares: task.Resources.LinuxResources.CPUShares, + CPUShares: task.Resources.LinuxResources.CPUShares, + CPUSetCPUs: task.Resources.LinuxResources.CpusetCpus, // Binds are used to mount a host volume into the container. 
We mount a // local directory for storage and a shared alloc directory that can be @@ -999,12 +1001,10 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T GroupAdd: driverConfig.GroupAdd, } - // This translates to docker create/run --cpuset-cpus option. - // --cpuset-cpus limit the specific CPUs or cores a container can use. - // Nomad natively manages cpusets, setting this option will override - // Nomad managed cpusets. + // Setting cpuset_cpus in driver config is no longer supported (it has + // not worked correctly since Nomad 0.12) if driverConfig.CPUSetCPUs != "" { - hostConfig.CPUSetCPUs = driverConfig.CPUSetCPUs + d.logger.Warn("cpuset_cpus is no longer supported") } // Enable tini (docker-init) init system. diff --git a/drivers/docker/driver_test.go b/drivers/docker/driver_test.go index 289503e0931..6c853ab6fb7 100644 --- a/drivers/docker/driver_test.go +++ b/drivers/docker/driver_test.go @@ -1560,6 +1560,8 @@ func TestDockerDriver_Init(t *testing.T) { } func TestDockerDriver_CPUSetCPUs(t *testing.T) { + // The cpuset_cpus config option is ignored starting in Nomad 1.7 + ci.Parallel(t) testutil.DockerCompatible(t) testutil.CgroupsCompatible(t) @@ -1570,15 +1572,15 @@ func TestDockerDriver_CPUSetCPUs(t *testing.T) { }{ { Name: "Single CPU", - CPUSetCPUs: "0", + CPUSetCPUs: "", }, { Name: "Comma separated list of CPUs", - CPUSetCPUs: "0,1", + CPUSetCPUs: "", }, { Name: "Range of CPUs", - CPUSetCPUs: "0-1", + CPUSetCPUs: "", }, } @@ -1587,16 +1589,16 @@ func TestDockerDriver_CPUSetCPUs(t *testing.T) { task, cfg, _ := dockerTask(t) cfg.CPUSetCPUs = testCase.CPUSetCPUs - require.NoError(t, task.EncodeConcreteDriverConfig(cfg)) + must.NoError(t, task.EncodeConcreteDriverConfig(cfg)) client, d, handle, cleanup := dockerSetup(t, task, nil) defer cleanup() - require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) container, err := 
client.InspectContainer(handle.containerID) - require.NoError(t, err) + must.NoError(t, err) - require.Equal(t, cfg.CPUSetCPUs, container.HostConfig.CPUSetCPUs) + must.Eq(t, cfg.CPUSetCPUs, container.HostConfig.CPUSetCPUs) }) } } diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 6a2b8318258..3096796c7dd 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -18,7 +18,7 @@ import ( "github.com/hashicorp/consul-template/signals" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/drivers/docker/docklog" "github.com/hashicorp/nomad/plugins/drivers" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" @@ -41,6 +41,7 @@ type taskHandle struct { dloggerPluginClient *plugin.Client task *drivers.TaskConfig containerID string + containerCgroup string containerImage string doneCh chan bool waitCh chan struct{} @@ -243,9 +244,42 @@ func (h *taskHandle) shutdownLogger() { h.dloggerPluginClient.Kill() } +func (h *taskHandle) startCpusetFixer() { + if cgroupslib.GetMode() == cgroupslib.OFF { + return + } + + if h.task.Resources.LinuxResources.CpusetCpus != "" { + // nothing to fixup if the task is given static cores + return + } + + cgroup := h.containerCgroup + if cgroup == "" { + // The api does not actually set this value, so we are left to compute it ourselves. 
+ // Luckily this is documented, + // https://docs.docker.com/config/containers/runmetrics/#find-the-cgroup-for-a-given-container + switch cgroupslib.GetMode() { + case cgroupslib.CG1: + cgroup = "/sys/fs/cgroup/cpuset/docker/" + h.containerID + default: + // systemd driver; not sure if we need to consider cgroupfs driver + cgroup = "/sys/fs/cgroup/system.slice/docker-" + h.containerID + ".scope" + } + } + + go (&cpuset{ + doneCh: h.doneCh, + source: h.task.Resources.LinuxResources.CpusetCgroupPath, + destination: cgroup, + }).watch() +} + func (h *taskHandle) run() { defer h.shutdownLogger() + h.startCpusetFixer() + exitCode, werr := h.infinityClient.WaitContainer(h.containerID) if werr != nil { h.logger.Error("failed to wait for container; already terminated") diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index a5d0a8b8385..49fc754c90d 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -57,7 +57,7 @@ func testResources(allocID, task string) *drivers.Resources { LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task, false), }, } diff --git a/drivers/java/driver_test.go b/drivers/java/driver_test.go index 766875184ca..b2770b10d10 100644 --- a/drivers/java/driver_test.go +++ b/drivers/java/driver_test.go @@ -299,7 +299,7 @@ func basicTask(t *testing.T, name string, taskConfig *TaskConfig) *drivers.TaskC LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, name, false), }, }, } diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index 9130dee08e4..142125abe36 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -58,7 +58,7 @@ func 
testResources(allocID, task string) *drivers.Resources { LinuxResources: &drivers.LinuxResources{ MemoryLimitBytes: 134217728, CPUShares: 100, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task, false), }, } diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index f138308c14d..59098e19d69 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -21,6 +21,7 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/lib/cgroupslib" "github.com/hashicorp/nomad/client/lib/cpustats" "github.com/hashicorp/nomad/client/lib/fifo" "github.com/hashicorp/nomad/client/lib/numalib" @@ -158,21 +159,44 @@ type ExecCommand struct { Capabilities []string } -// Cgroup returns the path to the cgroup the Nomad client is managing for the -// task that is about to be run. +// CpusetCgroup returns the path to the cgroup in which the Nomad client will +// write the PID of the task process for managing cpu core usage. // // On cgroups v1 systems this returns the path to the cpuset cgroup specifically. +// Critical: is "/reserve/" or "/share"; do not try to parse this! // -// On cgroups v2 systems this returns the patah to the task's scope. +// On cgroups v2 systems this just returns the unified cgroup. // // On non-Linux systems this returns the empty string and has no meaning. -func (c *ExecCommand) Cgroup() string { +func (c *ExecCommand) CpusetCgroup() string { if c == nil || c.Resources == nil || c.Resources.LinuxResources == nil { return "" } return c.Resources.LinuxResources.CpusetCgroupPath } +// StatsCgroup returns the path to the cgroup Nomad client will use to inspect +// for spawned process IDs. 
+// +// On cgroups v1 systems this returns the path to the freezer cgroup. +// +// On cgroups v2 systems this just returns the unified cgroup. +// +// On non-Linux systems this returns the empty string and has no meaning. +func (c *ExecCommand) StatsCgroup() string { + if c == nil || c.Resources == nil || c.Resources.LinuxResources == nil { + return "" + } + switch cgroupslib.GetMode() { + case cgroupslib.CG1: + taskName := filepath.Base(c.TaskDir) + allocID := filepath.Base(filepath.Dir(c.TaskDir)) + return cgroupslib.PathCG1(allocID, taskName, "freezer") + default: + return c.CpusetCgroup() + } +} + // SetWriters sets the writer for the process stdout and stderr. This should // not be used if writing to a file path such as a fifo file. SetStdoutWriter // is mainly used for unit testing purposes. @@ -364,7 +388,7 @@ func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string) ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() - if cleanup, err := e.setSubCmdCgroup(&e.childCmd, e.command.Cgroup()); err != nil { + if cleanup, err := e.setSubCmdCgroup(&e.childCmd, e.command.StatsCgroup()); err != nil { return nil, 0, err } else { defer cleanup() @@ -455,7 +479,7 @@ func (e *UniversalExecutor) ExecStreaming(ctx context.Context, command []string, return err } } - cgroup := e.command.Cgroup() + cgroup := e.command.StatsCgroup() if cleanup, err := e.setSubCmdCgroup(cmd, cgroup); err != nil { return err } else { diff --git a/drivers/shared/executor/executor_linux.go b/drivers/shared/executor/executor_linux.go index a509426ac53..b4e84e662b2 100644 --- a/drivers/shared/executor/executor_linux.go +++ b/drivers/shared/executor/executor_linux.go @@ -34,7 +34,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" "github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer/cgroups" - lconfigs "github.com/opencontainers/runc/libcontainer/configs" + runc 
"github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runc/libcontainer/devices" ldevices "github.com/opencontainers/runc/libcontainer/devices" "github.com/opencontainers/runc/libcontainer/specconv" @@ -510,13 +510,13 @@ func (l *LibcontainerExecutor) handleExecWait(ch chan *waitResult, process *libc ch <- &waitResult{ps, err} } -func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { +func configureCapabilities(cfg *runc.Config, command *ExecCommand) { switch command.User { case "root": // when running as root, use the legacy set of system capabilities, so // that we do not break existing nomad clusters using this "feature" legacyCaps := capabilities.LegacySupported().Slice(true) - cfg.Capabilities = &lconfigs.Capabilities{ + cfg.Capabilities = &runc.Capabilities{ Bounding: legacyCaps, Permitted: legacyCaps, Effective: legacyCaps, @@ -531,7 +531,7 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { // that capabilities are Permitted and Inheritable. Setting Effective // is unnecessary, because we only need the capabilities to become // effective _after_ execve, not before. 
- cfg.Capabilities = &lconfigs.Capabilities{ + cfg.Capabilities = &runc.Capabilities{ Bounding: command.Capabilities, Permitted: command.Capabilities, Inheritable: command.Capabilities, @@ -540,13 +540,13 @@ func configureCapabilities(cfg *lconfigs.Config, command *ExecCommand) { } } -func configureNamespaces(pidMode, ipcMode string) lconfigs.Namespaces { - namespaces := lconfigs.Namespaces{{Type: lconfigs.NEWNS}} +func configureNamespaces(pidMode, ipcMode string) runc.Namespaces { + namespaces := runc.Namespaces{{Type: runc.NEWNS}} if pidMode == IsolationModePrivate { - namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWPID}) + namespaces = append(namespaces, runc.Namespace{Type: runc.NEWPID}) } if ipcMode == IsolationModePrivate { - namespaces = append(namespaces, lconfigs.Namespace{Type: lconfigs.NEWIPC}) + namespaces = append(namespaces, runc.Namespace{Type: runc.NEWIPC}) } return namespaces } @@ -558,7 +558,7 @@ func configureNamespaces(pidMode, ipcMode string) lconfigs.Namespaces { // * dedicated mount points namespace, but shares the PID, User, domain, network namespaces with host // * small subset of devices (e.g. stdout/stderr/stdin, tty, shm, pts); default to using the same set of devices as Docker // * some special filesystems: `/proc`, `/sys`. Some case is given to avoid exec escaping or setting malicious values through them. 
-func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { +func configureIsolation(cfg *runc.Config, command *ExecCommand) error { defaultMountFlags := syscall.MS_NOEXEC | syscall.MS_NOSUID | syscall.MS_NODEV // set the new root directory for the container @@ -571,8 +571,8 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { cfg.Namespaces = configureNamespaces(command.ModePID, command.ModeIPC) if command.NetworkIsolation != nil { - cfg.Namespaces = append(cfg.Namespaces, lconfigs.Namespace{ - Type: lconfigs.NEWNET, + cfg.Namespaces = append(cfg.Namespaces, runc.Namespace{ + Type: runc.NEWNET, Path: command.NetworkIsolation.Path, }) } @@ -597,7 +597,7 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { cfg.Devices = append(cfg.Devices, devs...) } - cfg.Mounts = []*lconfigs.Mount{ + cfg.Mounts = []*runc.Mount{ { Source: "tmpfs", Destination: "/dev", @@ -646,20 +646,21 @@ func configureIsolation(cfg *lconfigs.Config, command *ExecCommand) error { return nil } -func (l *LibcontainerExecutor) configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error { +func (l *LibcontainerExecutor) configureCgroups(cfg *runc.Config, command *ExecCommand) error { // note: an alloc TR hook pre-creates the cgroup(s) in both v1 and v2 if !command.ResourceLimits { return nil } - cg := command.Cgroup() + cg := command.StatsCgroup() if cg == "" { return errors.New("cgroup must be set") } - // set the libcontainer hook for writing the PID to cgroup.procs file - l.configureCgroupHook(cfg, command) + // // set the libcontainer hook for writing the PID to cgroup.procs file + // TODO: this can be cg1 only, right? 
+ // l.configureCgroupHook(cfg, command) // set the libcontainer memory limits l.configureCgroupMemory(cfg, command) @@ -673,15 +674,15 @@ func (l *LibcontainerExecutor) configureCgroups(cfg *lconfigs.Config, command *E } } -func (*LibcontainerExecutor) configureCgroupHook(cfg *lconfigs.Config, command *ExecCommand) { - cfg.Hooks = lconfigs.Hooks{ - lconfigs.CreateRuntime: lconfigs.HookList{ +func (*LibcontainerExecutor) configureCgroupHook(cfg *runc.Config, command *ExecCommand) { + cfg.Hooks = runc.Hooks{ + runc.CreateRuntime: runc.HookList{ newSetCPUSetCgroupHook(command.Resources.LinuxResources.CpusetCgroupPath), }, } } -func (l *LibcontainerExecutor) configureCgroupMemory(cfg *lconfigs.Config, command *ExecCommand) { +func (l *LibcontainerExecutor) configureCgroupMemory(cfg *runc.Config, command *ExecCommand) { // Total amount of memory allowed to consume res := command.Resources.NomadResources memHard, memSoft := res.Memory.MemoryMaxMB, res.Memory.MemoryMB @@ -697,37 +698,65 @@ func (l *LibcontainerExecutor) configureCgroupMemory(cfg *lconfigs.Config, comma cfg.Cgroups.Resources.MemorySwappiness = cgroupslib.MaybeDisableMemorySwappiness() } -func (*LibcontainerExecutor) configureCG1(cfg *lconfigs.Config, command *ExecCommand, cg string) error { - // Set the v1 parent relative path (i.e. /nomad/) - scope := filepath.Base(cg) +func (l *LibcontainerExecutor) configureCG1(cfg *runc.Config, command *ExecCommand, cgroup string) error { + + cpuShares := command.Resources.LinuxResources.CPUShares + cpusetPath := command.Resources.LinuxResources.CpusetCgroupPath + cpuCores := command.Resources.LinuxResources.CpusetCpus + + // Set the v1 parent relative path (i.e. 
/nomad/) for the NON-cpuset cgroups + scope := filepath.Base(cgroup) cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, scope) - // set cpu.shares - res := command.Resources.NomadResources - cfg.Cgroups.CpuShares = uint64(res.Cpu.CpuShares) + // set cpu resources + cfg.Cgroups.Resources.CpuShares = uint64(cpuShares) + + // we need to manually set the cpuset, because libcontainer will not set + // it for our special cpuset cgroup + if err := l.cpusetCG1(cpusetPath, cpuCores); err != nil { + return fmt.Errorf("failed to set cpuset: %w", err) + } + + // tell libcontainer to write the pid to our special cpuset cgroup + l.configureCgroupHook(cfg, command) + return nil } -func (l *LibcontainerExecutor) configureCG2(cfg *lconfigs.Config, command *ExecCommand, cg string) error { +func (l *LibcontainerExecutor) cpusetCG1(cpusetCgroupPath, cores string) error { + if cores == "" { + return nil + } + ed := cgroupslib.OpenPath(cpusetCgroupPath) + return ed.Write("cpuset.cpus", cores) +} + +func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecCommand, cg string) error { + cpuShares := command.Resources.LinuxResources.CPUShares + cpuCores := command.Resources.LinuxResources.CpusetCpus + // Set the v2 specific unified path - scope := filepath.Base(cg) - cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, scope) + cfg.Cgroups.Resources.CpusetCpus = cpuCores + partition := cgroupslib.GetPartitionFromCores(cpuCores) - res := command.Resources.NomadResources - cpuShares := res.Cpu.CpuShares // a cgroups v1 concept - cpuWeight := cgroups.ConvertCPUSharesToCgroupV2Value(uint64(cpuShares)) // sets cpu.weight, which the kernel also translates to cpu.weight.nice // despite what the libcontainer docs say, this sets priority not bandwidth + cpuWeight := cgroups.ConvertCPUSharesToCgroupV2Value(uint64(cpuShares)) cfg.Cgroups.Resources.CpuWeight = cpuWeight - // todo: we will also want to set cpu bandwidth (i.e. 
cpu_hard_limit) + // finally set the path of the cgroup in which to run the task + scope := filepath.Base(cg) + cfg.Cgroups.Path = filepath.Join("/", cgroupslib.NomadCgroupParent, partition, scope) + + // todo(shoenig): we will also want to set cpu bandwidth (i.e. cpu_hard_limit) + // hopefully for 1.7 return nil } -func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*lconfigs.Config, error) { - cfg := &lconfigs.Config{ - Cgroups: &lconfigs.Cgroup{ - Resources: &lconfigs.Resources{ +func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) { + cfg := &runc.Config{ + Cgroups: &runc.Cgroup{ + Resources: &runc.Resources{ MemorySwappiness: nil, }, }, @@ -785,12 +814,12 @@ var userMountToUnixMount = map[string]int{ } // cmdMounts converts a list of driver.MountConfigs into excutor.Mounts. -func cmdMounts(mounts []*drivers.MountConfig) []*lconfigs.Mount { +func cmdMounts(mounts []*drivers.MountConfig) []*runc.Mount { if len(mounts) == 0 { return nil } - r := make([]*lconfigs.Mount, len(mounts)) + r := make([]*runc.Mount, len(mounts)) for i, m := range mounts { flags := unix.MS_BIND @@ -798,7 +827,7 @@ func cmdMounts(mounts []*drivers.MountConfig) []*lconfigs.Mount { flags |= unix.MS_RDONLY } - r[i] = &lconfigs.Mount{ + r[i] = &runc.Mount{ Source: m.HostPath, Destination: m.TaskPath, Device: "bind", @@ -941,8 +970,8 @@ func filepathIsRegular(path string) error { return nil } -func newSetCPUSetCgroupHook(cgroupPath string) lconfigs.Hook { - return lconfigs.NewFunctionHook(func(state *specs.State) error { +func newSetCPUSetCgroupHook(cgroupPath string) runc.Hook { + return runc.NewFunctionHook(func(state *specs.State) error { return cgroups.WriteCgroupProc(cgroupPath, state.Pid) }) } diff --git a/drivers/shared/executor/executor_linux_test.go b/drivers/shared/executor/executor_linux_test.go index 8c02c1d19ad..be6f49a2529 100644 --- a/drivers/shared/executor/executor_linux_test.go +++ 
b/drivers/shared/executor/executor_linux_test.go @@ -78,13 +78,18 @@ func testExecutorCommandWithChroot(t *testing.T) *testExecCmd { t.Fatalf("allocDir.NewTaskDir(%q) failed: %v", task.Name, err) } td := allocDir.TaskDirs[task.Name] + cmd := &ExecCommand{ Env: taskEnv.List(), TaskDir: td.Dir, Resources: &drivers.Resources{ NomadResources: alloc.AllocatedResources.Tasks[task.Name], LinuxResources: &drivers.LinuxResources{ - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath( + alloc.ID, + task.Name, + alloc.AllocatedResources.UsesCores(), + ), }, }, } diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index 36891569083..f49e856e82c 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -91,13 +91,13 @@ func testExecutorCommand(t *testing.T) *testExecCmd { LinuxResources: &drivers.LinuxResources{ CPUShares: 500, MemoryLimitBytes: 256 * 1024 * 1024, - CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name), + CpusetCgroupPath: cgroupslib.LinuxResourcesPath(alloc.ID, task.Name, false), }, }, } // create cgroup for our task (because we aren't using task runners) - f := cgroupslib.Factory(alloc.ID, task.Name) + f := cgroupslib.Factory(alloc.ID, task.Name, false) must.NoError(t, f.Setup()) // cleanup cgroup once test is done (because no task runners) diff --git a/drivers/shared/executor/executor_universal_linux.go b/drivers/shared/executor/executor_universal_linux.go index 9fc72bbed12..fcd43d28954 100644 --- a/drivers/shared/executor/executor_universal_linux.go +++ b/drivers/shared/executor/executor_universal_linux.go @@ -107,10 +107,10 @@ func (e *UniversalExecutor) statCG(cgroup string) (int, func(), error) { // configureResourceContainer on Linux configures the cgroups to be used to track // pids created by the executor +// +// pid: pid of the executor (i.e. 
ourself) func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid int) (func(), error) { - - // get our cgroup reference (cpuset in v1) - cgroup := command.Cgroup() + cgroup := command.StatsCgroup() // cgCleanup will be called after the task has been launched // v1: remove the executor process from the task's cgroups @@ -121,7 +121,7 @@ func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid switch cgroupslib.GetMode() { case cgroupslib.CG1: e.configureCG1(cgroup, command) - cgCleanup = e.enterCG1(cgroup) + cgCleanup = e.enterCG1(cgroup, command.CpusetCgroup()) default: e.configureCG2(cgroup, command) // configure child process to spawn in the cgroup @@ -144,19 +144,26 @@ func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid // created for the task - so that the task and its children will spawn in // those cgroups. The cleanup function moves the executor out of the task's // cgroups and into the nomad/ parent cgroups. 
-func (e *UniversalExecutor) enterCG1(cgroup string) func() { +func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() { pid := strconv.Itoa(unix.Getpid()) - // write pid to all the groups - ifaces := []string{"freezer", "cpu", "memory"} // todo: cpuset + // write pid to all the normal interfaces + ifaces := []string{"freezer", "cpu", "memory"} for _, iface := range ifaces { - ed := cgroupslib.OpenFromCpusetCG1(cgroup, iface) + ed := cgroupslib.OpenFromFreezerCG1(statsCgroup, iface) err := ed.Write("cgroup.procs", pid) if err != nil { e.logger.Warn("failed to write cgroup", "interface", iface, "error", err) } } + // write pid to the cpuset interface, which varies between reserve/share + ed := cgroupslib.OpenPath(cpusetCgroup) + err := ed.Write("cgroup.procs", pid) + if err != nil { + e.logger.Warn("failed to write cpuset cgroup", "error", err) + } + // cleanup func that moves executor back up to nomad cgroup return func() { for _, iface := range ifaces { @@ -169,29 +176,32 @@ func (e *UniversalExecutor) enterCG1(cgroup string) func() { } func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) { + // write memory limits memHard, memSoft := e.computeMemory(command) - ed := cgroupslib.OpenFromCpusetCG1(cgroup, "memory") + ed := cgroupslib.OpenFromFreezerCG1(cgroup, "memory") _ = ed.Write("memory.limit_in_bytes", strconv.FormatInt(memHard, 10)) if memSoft > 0 { - ed = cgroupslib.OpenFromCpusetCG1(cgroup, "memory") _ = ed.Write("memory.soft_limit_in_bytes", strconv.FormatInt(memSoft, 10)) } - // set memory swappiness + // write memory swappiness swappiness := cgroupslib.MaybeDisableMemorySwappiness() if swappiness != nil { - ed := cgroupslib.OpenFromCpusetCG1(cgroup, "memory") value := int64(*swappiness) _ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10)) } - // write cpu shares file + // write cpu shares cpuShares := strconv.FormatInt(command.Resources.LinuxResources.CPUShares, 10) - ed = 
cgroupslib.OpenFromCpusetCG1(cgroup, "cpu") + ed = cgroupslib.OpenFromFreezerCG1(cgroup, "cpu") _ = ed.Write("cpu.shares", cpuShares) - // TODO(shoenig) manage cpuset - e.logger.Info("TODO CORES", "cpuset", command.Resources.LinuxResources.CpusetCpus) + // write cpuset, if set + if cpuSet := command.Resources.LinuxResources.CpusetCpus; cpuSet != "" { + cpusetPath := command.Resources.LinuxResources.CpusetCgroupPath + ed = cgroupslib.OpenPath(cpusetPath) + _ = ed.Write("cpuset.cpus", cpuSet) + } } func (e *UniversalExecutor) configureCG2(cgroup string, command *ExecCommand) { @@ -212,13 +222,14 @@ func (e *UniversalExecutor) configureCG2(cgroup string, command *ExecCommand) { _ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10)) } - // write cpu cgroup files + // write cpu weight cgroup file cpuWeight := e.computeCPU(command) ed = cgroupslib.OpenPath(cgroup) _ = ed.Write("cpu.weight", strconv.FormatUint(cpuWeight, 10)) - // TODO(shoenig) manage cpuset - e.logger.Info("TODO CORES", "cpuset", command.Resources.LinuxResources.CpusetCpus) + // write cpuset cgroup file, if set + cpusetCpus := command.Resources.LinuxResources.CpusetCpus + _ = ed.Write("cpuset.cpus", cpusetCpus) } func (*UniversalExecutor) computeCPU(command *ExecCommand) uint64 { diff --git a/drivers/shared/executor/procstats/list_linux.go b/drivers/shared/executor/procstats/list_linux.go index 5570ecd1d3b..ca62529b41e 100644 --- a/drivers/shared/executor/procstats/list_linux.go +++ b/drivers/shared/executor/procstats/list_linux.go @@ -11,19 +11,12 @@ import ( ) type Cgrouper interface { - Cgroup() string + StatsCgroup() string } func List(cg Cgrouper) *set.Set[ProcessID] { - cgroup := cg.Cgroup() - var ed cgroupslib.Interface - switch cgroupslib.GetMode() { - case cgroupslib.CG1: - ed = cgroupslib.OpenFromCpusetCG1(cgroup, "freezer") - default: - ed = cgroupslib.OpenPath(cgroup) - } - + cgroup := cg.StatsCgroup() + ed := cgroupslib.OpenPath(cgroup) s, err := ed.PIDs() if err != nil { return 
set.New[ProcessID](0) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b78af764a8d..aa4dbb9eb3e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -36,6 +36,8 @@ import ( "github.com/hashicorp/go-set" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/helper" @@ -3782,6 +3784,17 @@ type AllocatedResources struct { Shared AllocatedSharedResources } +// UsesCores returns true if any of the tasks in the allocation make use +// of reserved cpu cores. +func (a *AllocatedResources) UsesCores() bool { + for _, taskRes := range a.Tasks { + if len(taskRes.Cpu.ReservedCores) > 0 { + return true + } + } + return false +} + func (a *AllocatedResources) Copy() *AllocatedResources { if a == nil { return nil @@ -7597,6 +7610,10 @@ type Task struct { Identities []*WorkloadIdentity } +func (t *Task) UsesCores() bool { + return t.Resources.Cores > 0 +} + // UsesConnect is for conveniently detecting if the Task is able to make use // of Consul Connect features. This will be indicated in the TaskKind of the // Task, which exports known types of Tasks. UsesConnect will be true if the @@ -10652,6 +10669,22 @@ func (a *Allocation) GetCreateIndex() uint64 { return a.CreateIndex } +// ReservedCores returns the union of reserved cores across tasks in this alloc. 
+func (a *Allocation) ReservedCores() *idset.Set[hw.CoreID] { + s := idset.Empty[hw.CoreID]() + if a == nil || a.AllocatedResources == nil { + return s + } + for _, taskResources := range a.AllocatedResources.Tasks { + if len(taskResources.Cpu.ReservedCores) > 0 { + for _, core := range taskResources.Cpu.ReservedCores { + s.Insert(hw.CoreID(core)) + } + } + } + return s +} + // ConsulNamespace returns the Consul namespace of the task group associated // with this allocation. func (a *Allocation) ConsulNamespace() string { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index a0636a9cb0c..15f0ce8ace4 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -4591,6 +4591,41 @@ func TestReschedulePolicy_Validate(t *testing.T) { } } +func TestAllocation_ReservedCores(t *testing.T) { + ci.Parallel(t) + + a := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "nil": { + Cpu: AllocatedCpuResources{ + ReservedCores: nil, + }, + }, + "empty": { + Cpu: AllocatedCpuResources{ + ReservedCores: make([]uint16, 0), + }, + }, + "one": { + Cpu: AllocatedCpuResources{ + ReservedCores: []uint16{7}, + }, + }, + "three": { + Cpu: AllocatedCpuResources{ + ReservedCores: []uint16{1, 3, 4}, + }, + }, + }, + }, + } + + result := a.ReservedCores() + must.Eq(t, "1,3-4,7", result.String()) + +} + func TestAllocation_Index(t *testing.T) { ci.Parallel(t) diff --git a/plugins/drivers/testutils/testing_linux.go b/plugins/drivers/testutils/testing_linux.go index 7ed2808583c..3ab097db74a 100644 --- a/plugins/drivers/testutils/testing_linux.go +++ b/plugins/drivers/testutils/testing_linux.go @@ -14,7 +14,7 @@ import ( // exists, since Nomad client creates them. Why do we write tests that directly // invoke task drivers without any context of the Nomad client? Who knows. 
func (h *DriverHarness) MakeTaskCgroup(allocID, taskName string) { - f := cgroupslib.Factory(allocID, taskName) + f := cgroupslib.Factory(allocID, taskName, false) must.NoError(h.t, f.Setup()) // ensure child procs are dead and remove the cgroup when the test is done