Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: refactor cpuset partitioning #18371

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
Expand Down Expand Up @@ -201,6 +203,9 @@ type allocRunner struct {
// wranglers is an interface for managing unix/windows processes.
wranglers cinterfaces.ProcessWranglers

// partitions is an interface for managing cpuset partitions
partitions cinterfaces.CPUPartitions

// widmgr fetches workload identities
widmgr *widmgr.WIDMgr
}
Expand Down Expand Up @@ -244,6 +249,7 @@ func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, e
checkStore: config.CheckStore,
getter: config.Getter,
wranglers: config.Wranglers,
partitions: config.Partitions,
hookResources: cstructs.NewAllocHookResources(),
widmgr: config.WIDMgr,
}
Expand Down Expand Up @@ -455,13 +461,25 @@ func (ar *allocRunner) Restore() error {

// restore process wrangler for task
ar.wranglers.Setup(proclib.Task{AllocID: tr.Alloc().ID, Task: tr.Task().Name})

// restore cpuset partition state
ar.restoreCores(tr.Alloc().AllocatedResources)
}

ar.taskCoordinator.Restore(states)

return nil
}

// restoreCores restores the cpuset partition state from the reserved core
// data of each task in the alloc. For every task in res, the task's reserved
// cores are converted into a core ID set and handed to the partition
// manager's Restore method.
//
// A nil res is treated as "nothing to restore" so that restoring an alloc
// without allocated resources cannot panic the client.
func (ar *allocRunner) restoreCores(res *structs.AllocatedResources) {
	if res == nil {
		return
	}

	for _, taskRes := range res.Tasks {
		cores := idset.From[hw.CoreID](taskRes.Cpu.ReservedCores)
		ar.partitions.Restore(cores)
	}
}

// persistDeploymentStatus stores AllocDeploymentStatus.
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newAllocDirHook(hookLogger, ar.allocDir),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newCPUPartsHook(hookLogger, ar.partitions, alloc),
newAllocHealthWatcherHook(hookLogger, alloc, newEnvBuilder, hs, ar.Listener(), ar.consulClient, ar.checkStore),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
Expand Down
55 changes: 55 additions & 0 deletions client/allocrunner/cpuparts_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) HashiCorp, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love how clean the hook code is here because you've isolated the cpuset management really well.

// SPDX-License-Identifier: BUSL-1.1

package allocrunner

import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// cpuPartsHookName is the name returned by the hook's Name method and
	// the name given to the hook's logger.
	cpuPartsHookName = "cpuparts_hook"
)

// cpuPartsHook is responsible for managing cpuset partitioning on Linux
// nodes. This mechanism works by segregating tasks that make use of "cpu" vs.
// "cores" resources. Tasks that make use of "cpu" resource actually make use
// of shared cores that have not been reserved. The scheduler ensures enough
// cores on a node are not reserved such that all tasks have the minimum amount
// of cpu bandwidth they requested.
type cpuPartsHook struct {
	// logger is the hook's logger, named after the hook.
	logger hclog.Logger
	// allocID is the ID of the allocation the hook was created for.
	allocID string

	// reservations is the set of cores reserved by the allocation, as
	// reported by the allocation's ReservedCores method at construction.
	reservations *idset.Set[hw.CoreID]
	// partitions is the partition manager that the reservations are
	// reserved in on Prerun and released from on Postrun.
	partitions cgroupslib.Partition
}

// newCPUPartsHook builds the cpuset partitioning hook for alloc. The hook
// keeps a logger named after the hook, the allocation ID, the partition
// manager, and the set of cores the allocation has reserved.
func newCPUPartsHook(
	logger hclog.Logger,
	partitions cgroupslib.Partition,
	alloc *structs.Allocation,
) *cpuPartsHook {
	hook := new(cpuPartsHook)
	hook.logger = logger.Named(cpuPartsHookName)
	hook.allocID = alloc.ID
	hook.partitions = partitions
	hook.reservations = alloc.ReservedCores()
	return hook
}

// Name returns the name of the hook.
func (h *cpuPartsHook) Name() string {
	return cpuPartsHookName
}

// Prerun reserves the allocation's cores with the partition manager and
// returns any error reported by that reservation.
func (h *cpuPartsHook) Prerun() error {
	return h.partitions.Reserve(h.reservations)
}

// Postrun releases the cores that were reserved for the allocation and
// returns any error reported by the partition manager.
func (h *cpuPartsHook) Postrun() error {
	return h.partitions.Release(h.reservations)
}
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,8 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
}

// assignCgroup sets the cpuset cgroup path on the Linux resources of
// taskConfig. The path is computed by cgroupslib.LinuxResourcesPath from the
// allocation ID, the task name, and whether the task has any reserved cores.
func (tr *TaskRunner) assignCgroup(taskConfig *drivers.TaskConfig) {
	// A task counts as reserving cores when at least one core is listed in
	// its cpu resources.
	reserveCores := len(tr.taskResources.Cpu.ReservedCores) > 0
	p := cgroupslib.LinuxResourcesPath(taskConfig.AllocID, taskConfig.Name, reserveCores)
	taskConfig.Resources.LinuxResources.CpusetCgroupPath = p
}

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (tr *TaskRunner) initHooks() {
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newAPIHook(tr.shutdownCtx, tr.clientConfig.APIListenerRegistrar, hookLogger),
newWranglerHook(tr.wranglers, task.Name, alloc.ID, hookLogger),
newWranglerHook(tr.wranglers, task.Name, alloc.ID, task.UsesCores(), hookLogger),
}

// If the task has a CSI block, add the hook.
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
ShutdownDelayCancelFn: shutdownDelayCancelFn,
ServiceRegWrapper: wrapperMock,
Getter: getter.TestSandbox(t),
Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}),
Wranglers: proclib.MockWranglers(t),
WIDMgr: NewMockWIDMgr(nil),
}

Expand Down
8 changes: 7 additions & 1 deletion client/allocrunner/taskrunner/wrangler_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ type wranglerHook struct {
log hclog.Logger
}

func newWranglerHook(wranglers cifs.ProcessWranglers, task, allocID string, log hclog.Logger) *wranglerHook {
// newWranglerHook builds the process wrangler hook for one task of an
// allocation. The cores flag is recorded on the proclib.Task handed to the
// wranglers, and the logger is named after the hook.
func newWranglerHook(
	wranglers cifs.ProcessWranglers,
	task, allocID string,
	cores bool,
	log hclog.Logger,
) *wranglerHook {
	hook := new(wranglerHook)
	hook.log = log.Named(wranglerHookName)
	hook.wranglers = wranglers
	hook.task = proclib.Task{
		AllocID: allocID,
		Task:    task,
		Cores:   cores,
	}
	return hook
}
Expand Down
5 changes: 3 additions & 2 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore"
"github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,7 +96,8 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.All
ServiceRegWrapper: wrapper.NewHandlerWrapper(clientConf.Logger, consulRegMock, nomadRegMock),
CheckStore: checkstore.NewStore(clientConf.Logger, stateDB),
Getter: getter.TestSandbox(t),
Wranglers: proclib.New(&proclib.Configs{Logger: testlog.HCLogger(t)}),
Wranglers: proclib.MockWranglers(t),
Partitions: cgroupslib.NoopPartition(),
}

return conf, cleanup
Expand Down
13 changes: 12 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/hoststats"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/client/pluginmanager"
Expand Down Expand Up @@ -331,6 +332,9 @@ type Client struct {
// fingerprinting
topology *numalib.Topology

// partitions is used for managing cpuset partitioning on linux systems
partitions cgroupslib.Partition

// widmgr retrieves workload identities
widmgr *widmgr.WIDMgr
}
Expand Down Expand Up @@ -465,9 +469,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.topology = numalib.NoImpl(ir.Topology)
}

// Create the cpu core partition manager
c.partitions = cgroupslib.GetPartition(
c.topology.UsableCores(),
)

// Create the process wranglers
wranglers := proclib.New(&proclib.Configs{
Logger: c.logger.Named("proclib"),
UsableCores: c.topology.UsableCores(),
Logger: c.logger.Named("proclib"),
})
c.wranglers = wranglers

Expand Down Expand Up @@ -2754,6 +2764,7 @@ func (c *Client) newAllocRunnerConfig(
Vault: c.vaultClient,
WIDMgr: c.widmgr,
Wranglers: c.wranglers,
Partitions: c.partitions,
}
}

Expand Down
3 changes: 3 additions & 0 deletions client/config/arconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type AllocRunnerConfig struct {
// Wranglers is an interface for managing unix/windows processes.
Wranglers interfaces.ProcessWranglers

// Partitions is an interface for managing cpuset partitions.
Partitions interfaces.CPUPartitions

// WIDMgr fetches workload identities
WIDMgr *widmgr.WIDMgr
}
Expand Down
4 changes: 2 additions & 2 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/hashicorp/consul-template/config"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -309,7 +309,7 @@ type Config struct {
CgroupParent string

// ReservableCores if set overrides the set of reservable cores reported in fingerprinting.
ReservableCores []numalib.CoreID
ReservableCores []hw.CoreID

// NomadServiceDiscovery determines whether the Nomad native service
// discovery client functionality is enabled.
Expand Down
11 changes: 6 additions & 5 deletions client/fingerprint/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/klauspost/cpuid/v2"
Expand Down Expand Up @@ -77,14 +78,14 @@ func (*CPUFingerprint) reservedCompute(request *FingerprintRequest) structs.Node

func (f *CPUFingerprint) initialize(request *FingerprintRequest) {
var (
reservableCores *idset.Set[numalib.CoreID]
reservableCores *idset.Set[hw.CoreID]
totalCompute = request.Config.CpuCompute
reservedCompute = f.reservedCompute(request)
reservedCores = idset.From[numalib.CoreID](reservedCompute.ReservedCpuCores)
reservedCores = idset.From[hw.CoreID](reservedCompute.ReservedCpuCores)
)

if rc := request.Config.ReservableCores; rc != nil {
reservableCores = idset.From[numalib.CoreID](rc)
reservableCores = idset.From[hw.CoreID](rc)
}

f.top = numalib.Scan(append(
Expand Down Expand Up @@ -157,7 +158,7 @@ func (f *CPUFingerprint) setReservableCores(response *FingerprintResponse) {
usable := f.top.UsableCores()
response.AddAttribute("cpu.reservablecores", f.cores(usable.Size()))
f.nodeResources.Cpu.ReservableCpuCores = helper.ConvertSlice(
usable.Slice(), func(id numalib.CoreID) uint16 {
usable.Slice(), func(id hw.CoreID) uint16 {
return uint16(id)
})
default:
Expand Down Expand Up @@ -189,7 +190,7 @@ func (f *CPUFingerprint) setNUMA(response *FingerprintResponse) {
nodes := f.top.Nodes()
response.AddAttribute("numa.node.count", f.nodes(nodes.Size()))

nodes.ForEach(func(id numalib.NodeID) error {
nodes.ForEach(func(id hw.NodeID) error {
key := fmt.Sprintf("numa.node%d.cores", id)
cores := f.top.NodeCores(id)
response.AddAttribute(key, cores.String())
Expand Down
4 changes: 2 additions & 2 deletions client/fingerprint/cpu_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestCPUFingerprint_OverrideCompute(t *testing.T) {
Attributes: make(map[string]string),
}
cfg := &config.Config{
ReservableCores: []numalib.CoreID{0, 1, 2},
ReservableCores: []hw.CoreID{0, 1, 2},
}
var originalCPU int

Expand Down
9 changes: 9 additions & 0 deletions client/interfaces/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package interfaces

import (
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
"github.com/hashicorp/nomad/client/lib/proclib"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
Expand Down Expand Up @@ -47,3 +49,10 @@ type ProcessWranglers interface {
Setup(proclib.Task) error
Destroy(proclib.Task) error
}

// CPUPartitions is an interface satisfied by the cgroupslib package. It is
// the alloc runner's handle for managing cpuset partitions in terms of sets
// of core IDs.
type CPUPartitions interface {
	// Restore is given the reserved cores of a task when an alloc runner is
	// restored. It reports no error.
	Restore(*idset.Set[hw.CoreID])
	// Reserve is given the set of cores reserved by an allocation before the
	// allocation runs.
	Reserve(*idset.Set[hw.CoreID]) error
	// Release is given the set of cores previously reserved by an allocation
	// after the allocation has run.
	Release(*idset.Set[hw.CoreID]) error
}
2 changes: 1 addition & 1 deletion client/lib/cgroupslib/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package cgroupslib

// LinuxResourcesPath does nothing on non-Linux systems
func LinuxResourcesPath(string, string) string {
func LinuxResourcesPath(string, string, bool) string {
	// All arguments are ignored on non-Linux systems; the result is always
	// the empty path.
	const noPath = ""
	return noPath
}

Expand Down
Loading