Skip to content

Commit

Permalink
client: refactor cpuset partitioning
Browse files Browse the repository at this point in the history
This PR updates the way Nomad client manages the split between tasks
that make use of resources.cpus vs. resources.cores.

Previously, each task was explicitly assigned which CPU cores they were
able to run on. Every time a task was started or destroyed, all other
tasks' cpusets would need to be updated. This was inefficient and would
crush the Linux kernel when a client would try to run ~400 or so tasks.

Now, we make use of cgroup hierarchy and cpuset inheritance to efficiently
manage cpusets.
  • Loading branch information
shoenig committed Sep 11, 2023
1 parent 668dc5f commit 3a488fc
Show file tree
Hide file tree
Showing 58 changed files with 1,270 additions and 228 deletions.
18 changes: 18 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
Expand Down Expand Up @@ -201,6 +203,9 @@ type allocRunner struct {
// wranglers is an interface for managing unix/windows processes.
wranglers cinterfaces.ProcessWranglers

// partitions is an interface for managing cpuset partitions
partitions cinterfaces.CPUPartitions

// widmgr fetches workload identities
widmgr *widmgr.WIDMgr
}
Expand Down Expand Up @@ -244,6 +249,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
checkStore: config.CheckStore,
getter: config.Getter,
wranglers: config.Wranglers,
partitions: config.Partitions,
hookResources: cstructs.NewAllocHookResources(),
widmgr: config.WIDMgr,
}
Expand Down Expand Up @@ -455,13 +461,25 @@ func (ar *allocRunner) Restore() error {

// restore process wrangler for task
ar.wranglers.Setup(proclib.Task{AllocID: tr.Alloc().ID, Task: tr.Task().Name})

// restore cpuset partition state
ar.restoreCores(tr.Alloc().AllocatedResources)
}

ar.taskCoordinator.Restore(states)

return nil
}

// restoreCores walks every task recorded in res and hands that task's
// reserved cores to the cpuset partition manager via Restore, one call per
// task.
func (ar *allocRunner) restoreCores(res *structs.AllocatedResources) {
	for _, task := range res.Tasks {
		reserved := task.Cpu.ReservedCores
		ar.partitions.Restore(idset.From[hw.CoreID](reserved))
	}
}

// persistDeploymentStatus stores AllocDeploymentStatus.
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newAllocDirHook(hookLogger, ar.allocDir),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newCPUPartsHook(hookLogger, ar.partitions, alloc),
newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
Expand Down
56 changes: 56 additions & 0 deletions client/allocrunner/cpuparts_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package allocrunner

import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/nomad/structs"
)

// cpuPartsHookName is the name reported by cpuPartsHook.Name.
const cpuPartsHookName = "cpuparts_hook"

// cpuPartsHook is responsible for managing cpuset partitioning on Linux
// nodes. Tasks are segregated by whether they use the "cpu" or the "cores"
// resource: tasks using "cpu" actually run on the shared cores that have not
// been reserved, and the scheduler ensures enough cores on a node stay
// unreserved such that all tasks have the minimum amount of cpu bandwidth
// they requested.
type cpuPartsHook struct {
	logger  hclog.Logger
	allocID string

	// reservations is the set of cores passed to partitions.Reserve in
	// Prerun and to partitions.Release in Postrun.
	reservations *idset.Set[hw.CoreID]

	// partitions is the cpuset partition manager the hook reserves cores
	// from and releases cores to.
	partitions cgroupslib.Partition
}

// newCPUPartsHook builds a cpuPartsHook for alloc. The allocation ID and the
// result of alloc.ReservedCores() are captured once, at construction time.
func newCPUPartsHook(
	logger hclog.Logger,
	partitions cgroupslib.Partition,
	alloc *structs.Allocation,
) *cpuPartsHook {
	hook := new(cpuPartsHook)
	hook.logger = logger
	hook.allocID = alloc.ID
	hook.partitions = partitions
	hook.reservations = alloc.ReservedCores()
	return hook
}

// Name returns the constant name of this hook.
func (h *cpuPartsHook) Name() string { return cpuPartsHookName }

// Prerun asks the partition manager to reserve the cores recorded for this
// allocation and returns whatever error Reserve reports.
func (h *cpuPartsHook) Prerun() error {
	cores := h.reservations
	return h.partitions.Reserve(cores)
}

// Postrun hands the cores recorded for this allocation back to the partition
// manager and returns whatever error Release reports.
func (h *cpuPartsHook) Postrun() error {
	cores := h.reservations
	return h.partitions.Release(cores)
}
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,8 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
}

func (tr *TaskRunner) assignCgroup(taskConfig *drivers.TaskConfig) {
p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name)
reserveCores := len(tr.taskResources.Cpu.ReservedCores) > 0
p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name, reserveCores)
taskConfig.Resources.LinuxResources.CpusetCgroupPath = p
}

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (tr *TaskRunner) initHooks() {
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger),
newWranglerHook(tr.wranglers, task.Name, alloc.ID, hookLogger),
newWranglerHook(tr.wranglers, task.Name, alloc.ID, task.UsesCores(), hookLogger),
}

// If the task has a CSI block, add the hook.
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
ShutdownDelayCancelFn: shutdownDelayCancelFn,
ServiceRegWrapper: wrapperMock,
Getter: getter.TestSandbox(t),
Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}),
Wranglers: proclib.MockWranglers(t),
WIDMgr: NewMockWIDMgr(nil),
}

Expand Down
8 changes: 7 additions & 1 deletion client/allocrunner/taskrunner/wrangler_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ type wranglerHook struct {
log hclog.Logger
}

func newWranglerHook(wranglers cifs.ProcessWranglers, task, allocID string, log hclog.Logger) *wranglerHook {
func newWranglerHook(
wranglers cifs.ProcessWranglers,
task, allocID string,
cores bool,
log hclog.Logger,
) *wranglerHook {
return &wranglerHook{
log: log.Named(wranglerHookName),
wranglers: wranglers,
task: proclib.Task{
AllocID: allocID,
Task: task,
Cores: cores,
},
}
}
Expand Down
5 changes: 3 additions & 2 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
"github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,7 +96,8 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.All
ServiceRegWrapper: wrapper.NewHandlerWrapper(clientConf.Logger, consulRegMock, nomadRegMock),
CheckStore: checkstore.NewStore(clientConf.Logger, stateDB),
Getter: getter.TestSandbox(t),
Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}),
Wranglers: proclib.MockWranglers(t),
Partitions: cgroupslib.MockPartition(),
}

return conf, cleanup
Expand Down
13 changes: 12 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/hoststats"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager"
Expand Down Expand Up @@ -331,6 +332,9 @@ type Client struct {
// fingerprinting
topology *numalib.Topology

// partitions is used for managing cpuset partitioning on linux systems
partitions cgroupslib.Partition

// widmgr retrieves workload identities
widmgr *widmgr.WIDMgr
}
Expand Down Expand Up @@ -465,9 +469,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.topology = numalib.NoImpl(ir.Topology)
}

// Create the cpu core partition manager
c.partitions = cgroupslib.GetPartition(
c.topology.UsableCores(),
)

// Create the process wranglers
wranglers := proclib.New(&proclib.Configs{
Logger: c.logger.Named("proclib"),
UsableCores: c.topology.UsableCores(),
Logger: c.logger.Named("proclib"),
})
c.wranglers = wranglers

Expand Down Expand Up @@ -2754,6 +2764,7 @@ func (c *Client) newAllocRunnerConfig(
Vault: c.vaultClient,
WIDMgr: c.widmgr,
Wranglers: c.wranglers,
Partitions: c.partitions,
}
}

Expand Down
3 changes: 3 additions & 0 deletions client/config/arconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type AllocRunnerConfig struct {
// Wranglers is an interface for managing unix/windows processes.
Wranglers interfaces.ProcessWranglers

// Partitions is an interface for managing cpuset partitions.
Partitions interfaces.CPUPartitions

// WIDMgr fetches workload identities
WIDMgr *widmgr.WIDMgr
}
Expand Down
4 changes: 2 additions & 2 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/hashicorp/consul-template/config"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -309,7 +309,7 @@ type Config struct {
CgroupParent string

// ReservableCores if set overrides the set of reservable cores reported in fingerprinting.
ReservableCores []numalib.CoreID
ReservableCores []hw.CoreID

// NomadServiceDiscovery determines whether the Nomad native service
// discovery client functionality is enabled.
Expand Down
11 changes: 6 additions & 5 deletions client/fingerprint/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/klauspost/cpuid/v2"
Expand Down Expand Up @@ -77,14 +78,14 @@ func (*CPUFingerprint) reservedCompute(request *FingerprintRequest) structs.Node

func (f *CPUFingerprint) initialize(request *FingerprintRequest) {
var (
reservableCores *idset.Set[numalib.CoreID]
reservableCores *idset.Set[hw.CoreID]
totalCompute = request.Config.CpuCompute
reservedCompute = f.reservedCompute(request)
reservedCores = idset.From[numalib.CoreID](reservedCompute.ReservedCpuCores)
reservedCores = idset.From[hw.CoreID](reservedCompute.ReservedCpuCores)
)

if rc := request.Config.ReservableCores; rc != nil {
reservableCores = idset.From[numalib.CoreID](rc)
reservableCores = idset.From[hw.CoreID](rc)
}

f.top = numalib.Scan(append(
Expand Down Expand Up @@ -157,7 +158,7 @@ func (f *CPUFingerprint) setReservableCores(response *FingerprintResponse) {
usable := f.top.UsableCores()
response.AddAttribute("cpu.reservablecores", f.cores(usable.Size()))
f.nodeResources.Cpu.ReservableCpuCores = helper.ConvertSlice(
usable.Slice(), func(id numalib.CoreID) uint16 {
usable.Slice(), func(id hw.CoreID) uint16 {
return uint16(id)
})
default:
Expand Down Expand Up @@ -189,7 +190,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) {
nodes := f.top.Nodes()
response.AddAttribute("numa.node.count", f.nodes(nodes.Size()))

nodes.ForEach(func(id numalib.NodeID) error {
nodes.ForEach(func(id hw.NodeID) error {
key := fmt.Sprintf("numa.node%d.cores", id)
cores := f.top.NodeCores(id)
response.AddAttribute(key, cores.String())
Expand Down
4 changes: 2 additions & 2 deletions client/fingerprint/cpu_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestCPUFingerprint_OverrideCompute(t *testing.T) {
Attributes: make(map[string]string),
}
cfg := &config.Config{
ReservableCores: []numalib.CoreID{0, 1, 2},
ReservableCores: []hw.CoreID{0, 1, 2},
}
var originalCPU int

Expand Down
9 changes: 9 additions & 0 deletions client/interfaces/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package interfaces

import (
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
Expand Down Expand Up @@ -47,3 +49,10 @@ type ProcessWranglers interface {
Setup(proclib.Task) error
Destroy(proclib.Task) error
}

// CPUPartitions is an interface satisfied by the cgroupslib package. Every
// method takes the set of core IDs it should act on.
type CPUPartitions interface {
	// Restore is called by the alloc runner while restoring state, once per
	// task, with that task's reserved cores. It reports no error.
	Restore(*idset.Set[hw.CoreID])

	// Reserve is called from the cpuparts hook's Prerun with the cores
	// reserved by the allocation.
	Reserve(*idset.Set[hw.CoreID]) error

	// Release is called from the cpuparts hook's Postrun with the same set
	// of cores that was passed to Reserve.
	Release(*idset.Set[hw.CoreID]) error
}
2 changes: 1 addition & 1 deletion client/lib/cgroupslib/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package cgroupslib

// LinuxResourcesPath does nothing on non-Linux systems
func LinuxResourcesPath(string, string) string {
func LinuxResourcesPath(string, string, bool) string {
return ""
}

Expand Down
Loading

0 comments on commit 3a488fc

Please sign in to comment.