CPU-Pooler Corrections
- Issue nokia#10: first check that there is no more than one shared pool; if the config is valid, start the CPU device plugin servers and register their resources
- Issue nokia#14: set the container's CPU request to "0m" when it uses a shared pool
balintTobik committed Jun 7, 2019
1 parent 2ad6c03 commit 80eb904
Showing 6 changed files with 80 additions and 29 deletions.
1 change: 1 addition & 0 deletions Gopkg.toml
@@ -23,6 +23,7 @@
# non-go = false
# go-tests = true
# unused-packages = true

[[constraint]]
name = "github.com/go-yaml/yaml"
version = "2.2.1"
44 changes: 26 additions & 18 deletions cmd/cpu-device-plugin/cpu-device-plugin.go
@@ -15,7 +15,6 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
)
@@ -91,14 +90,14 @@ func (cdm *cpuDeviceManager) ListAndWatch(e *pluginapi.Empty, stream pluginapi.D
if updateNeeded {
resp := new(pluginapi.ListAndWatchResponse)
if cdm.poolType == "shared" {
nbrOfCPUs := len(strings.Split(cdm.pool.CPUs, ","))
nbrOfCPUs := cdm.pool.CPUs.Size()
for i := 0; i < nbrOfCPUs*1000; i++ {
cpuID := strconv.Itoa(i)
resp.Devices = append(resp.Devices, &pluginapi.Device{cpuID, pluginapi.Healthy})
}
} else {
for _, cpuID := range strings.Split(cdm.pool.CPUs, ",") {
resp.Devices = append(resp.Devices, &pluginapi.Device{cpuID, pluginapi.Healthy})
for _, cpuID := range cdm.pool.CPUs.ToSlice() {
resp.Devices = append(resp.Devices, &pluginapi.Device{strconv.Itoa(cpuID), pluginapi.Healthy})
}
}
if err := stream.Send(resp); err != nil {
@@ -204,23 +203,32 @@ func createPluginsForPools() error {
glog.Errorf("Pool config : %v", poolConf)
break
}
sharedCPUs = pool.CPUs
sharedCPUs = pool.CPUs.String()
}
cdm := newCPUDeviceManager(poolName, pool, sharedCPUs)
if err := cdm.Start(); err != nil {
glog.Errorf("cpuDeviceManager.Start() failed: %v", err)
break
}
resourceName := resourceBaseName + "/" + poolName
err := cdm.Register(path.Join(pluginapi.DevicePluginPath, "kubelet.sock"), resourceName)
if err != nil {
// Stop server
cdm.grpcServer.Stop()
glog.Error(err)
break
}
cdms = append(cdms, cdm)
glog.Infof("CPU device plugin registered with the Kubelet")
}
if err != nil {
return err
}
for poolName, pool := range poolConf.Pools {
for _, cdm := range cdms{
if pool.CPUs.Equals(cdm.pool.CPUs){
if err := cdm.Start(); err != nil {
glog.Errorf("cpuDeviceManager.Start() failed: %v", err)
break
}
resourceName := resourceBaseName + "/" + poolName
err := cdm.Register(path.Join(pluginapi.DevicePluginPath, "kubelet.sock"), resourceName)
if err != nil {
// Stop server
cdm.grpcServer.Stop()
glog.Error(err)
break
}
glog.Infof("CPU device plugin registered with the Kubelet")
}
}
}
if err != nil {
for _, cdm := range cdms {
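
The single-shared-pool check itself is not fully visible in the hunks above. A minimal, self-contained sketch of what such a validation could look like follows; the "shared" name prefix is assumed from DeterminePoolType in pkg/types/pool.go, and the helper is illustrative rather than the actual implementation inside createPluginsForPools.

package main

import (
	"fmt"
	"strings"
)

// Hedged sketch for issue nokia#10: a pool config is only acceptable if it
// defines at most one shared pool. The "shared" prefix convention is an
// assumption based on DeterminePoolType; this is not the real implementation.
func validateSharedPoolCount(poolNames []string) error {
	sharedPools := 0
	for _, name := range poolNames {
		if strings.HasPrefix(name, "shared") {
			sharedPools++
		}
	}
	if sharedPools > 1 {
		return fmt.Errorf("invalid pool config: %d shared pools defined, at most 1 is allowed", sharedPools)
	}
	return nil
}

func main() {
	fmt.Println(validateSharedPoolCount([]string{"shared_pool1", "exclusive_pool1", "default"})) // <nil>
	fmt.Println(validateSharedPoolCount([]string{"shared_a", "shared_b"}))                       // error
}

Only after the pool configuration passes this kind of check does the second loop above start each device plugin server and register it with the kubelet.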
1 change: 1 addition & 0 deletions cmd/cpusetter/cpusetter.go
@@ -33,6 +33,7 @@ func main() {
stopChannel := make(chan struct{})
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
log.Println("CPUSetter's Controller initalized successfully! Warm-up starts now!")
go controller.Run(stopChannel)
// Wait until Controller pushes a signal on the stop channel
select {
11 changes: 11 additions & 0 deletions cmd/webhook/webhook.go
@@ -135,6 +135,17 @@ func patchCPULimit(sharedCPUTime int, patchList []patch, i int, c *corev1.Contai
patchItem.Value = json.RawMessage(`{ "limits": { "cpu":` + cpuVal + ` } }`)
}
patchList = append(patchList, patchItem)
patchItem.Op = "add"

cpuVal = `"0m"`
if len(c.Resources.Limits) > 0 {
patchItem.Path = "/spec/containers/" + strconv.Itoa(i) + "/resources/requests/cpu"
patchItem.Value = json.RawMessage(cpuVal)
} else {
patchItem.Path = "/spec/containers/" + strconv.Itoa(i) + "/resources"
patchItem.Value = json.RawMessage(`{ "requests": { "cpu":` + cpuVal + ` } }`)
}
patchList = append(patchList, patchItem)
return patchList

}
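
To make the effect of the new request patch concrete, the self-contained sketch below prints roughly the JSON-patch items patchCPULimit appends for a container that already has resources.limits. The container index and the "500m" limit value are made-up examples, and the patch struct here only mirrors the webhook's Op/Path/Value fields with standard JSON-Patch tags.

package main

import (
	"encoding/json"
	"fmt"
)

// patch mirrors the webhook's JSON-patch item (assumed fields: Op, Path, Value).
type patch struct {
	Op    string          `json:"op"`
	Path  string          `json:"path"`
	Value json.RawMessage `json:"value"`
}

func main() {
	// Roughly what patchCPULimit emits for container 0 of a pod using a shared
	// pool: the CPU limit carries the shared CPU time, while the CPU request
	// is forced to "0m" per issue nokia#14.
	patches := []patch{
		{Op: "add", Path: "/spec/containers/0/resources/limits/cpu", Value: json.RawMessage(`"500m"`)},
		{Op: "add", Path: "/spec/containers/0/resources/requests/cpu", Value: json.RawMessage(`"0m"`)},
	}
	out, _ := json.MarshalIndent(patches, "", "  ")
	fmt.Println(string(out))
}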
23 changes: 18 additions & 5 deletions pkg/sethandler/controller.go
@@ -78,7 +78,7 @@ func (setHandler *SetHandler) podAdded(pod v1.Pod) {

func (setHandler *SetHandler) podChanged(oldPod, newPod v1.Pod) {
//The maze wasn't meant for you either
if shouldPodBeHandled(oldPod) || !shouldPodBeHandled(newPod) {
if !shouldPodBeHandled(newPod) {
return
}
setHandler.adjustContainerSets(newPod)
@@ -99,6 +99,11 @@ func shouldPodBeHandled(pod v1.Pod) bool {
}

func (setHandler *SetHandler) adjustContainerSets(pod v1.Pod) {
for _, containerStatus := range pod.Status.ContainerStatuses {
if !containerStatus.Ready {
return
}
}
for _, container := range pod.Spec.Containers {
cpuset, err := setHandler.determineCorrectCpuset(pod, container)
if err != nil {
@@ -118,12 +123,12 @@ func (setHandler *SetHandler) determineCorrectCpuset(pod v1.Pod, container v1.Co
for resourceName := range container.Resources.Requests {
resNameAsString := string(resourceName)
if strings.Contains(resNameAsString, resourceBaseName) && strings.Contains(resNameAsString, types.SharedPoolID) {
return cpuset.Parse(setHandler.poolConfig.SelectPool(resNameAsString).CPUs)
return setHandler.poolConfig.SelectPool(types.SharedPoolID).CPUs, nil
} else if strings.Contains(resNameAsString, resourceBaseName) && strings.Contains(resNameAsString, types.ExclusivePoolID) {
return setHandler.getListOfAllocatedExclusiveCpus(resNameAsString, pod, container)
}
}
return cpuset.Parse(setHandler.poolConfig.SelectPool(resourceBaseName + "/" + types.DefaultPoolID).CPUs)
return setHandler.poolConfig.SelectPool(types.DefaultPoolID).CPUs, nil
}

func (setHandler *SetHandler) getListOfAllocatedExclusiveCpus(exclusivePoolName string, pod v1.Pod, container v1.Container) (cpuset.CPUSet, error) {
@@ -194,12 +199,13 @@ func determineCid(podStatus v1.PodStatus, containerName string) string {
func (setHandler *SetHandler) applyCpusetToContainer(containerID string, cpuset cpuset.CPUSet) error {
if cpuset.IsEmpty() {
//Nothing to set. We will leave the container running on the Kubernetes provisioned default cpuset
log.Println("WARNING: for some reason cpuset to set was quite empty for container:" + containerID + ".I left it untouched.")
return nil
}
//According to K8s documentation CID is stored in "docker://<container_id>" format
trimmedCid := strings.TrimPrefix(containerID, "docker://")
var pathToContainerCpusetFile string
err := filepath.Walk(setHandler.cpusetRoot, func(path string, f os.FileInfo, err error) error {
filepath.Walk(setHandler.cpusetRoot, func(path string, f os.FileInfo, err error) error {
if strings.HasSuffix(path, trimmedCid) {
pathToContainerCpusetFile = path
}
@@ -209,7 +215,14 @@
return errors.New("cpuset file does not exist for container:" + trimmedCid + " under the provided cgroupfs hierarchy:" + setHandler.cpusetRoot)
}
//And for our grand finale, we just "echo" the calculated cpuset to the cpuset cgroupfs "file" of the given container
file, err := os.OpenFile(pathToContainerCpusetFile, os.O_WRONLY|os.O_SYNC, 0755)
//Find child cpuset if it exists (kube-proxy)
filepath.Walk(pathToContainerCpusetFile, func(path string, f os.FileInfo, err error) error {
if f.IsDir() {
pathToContainerCpusetFile = path
}
return nil
})
file, err := os.OpenFile(pathToContainerCpusetFile + "/cpuset.cpus", os.O_WRONLY|os.O_SYNC, 0755)
if err != nil {
return errors.New("Can't open cpuset file:" + pathToContainerCpusetFile + " for container:" + trimmedCid + " because:" + err.Error())
}
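
The new walk over the container's cpuset directory exists because some containers (the diff's comment mentions kube-proxy) carry a nested child cgroup, and the cpus have to be written into the deepest cpuset directory. The self-contained sketch below reproduces that lookup on a temporary directory tree; the paths are stand-ins for the real cgroupfs hierarchy under cpusetRoot.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Build a stand-in for /sys/fs/cgroup/cpuset/kubepods/.../<container-id>
	// with one nested child cgroup, as kube-proxy creates.
	root, err := os.MkdirTemp("", "cpuset-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)
	containerDir := filepath.Join(root, "abcdef123456")
	childDir := filepath.Join(containerDir, "kube-proxy")
	if err := os.MkdirAll(childDir, 0755); err != nil {
		panic(err)
	}

	// Same idea as the walk added above: keep descending, so the deepest
	// directory wins and cpuset.cpus is written there.
	target := containerDir
	filepath.Walk(containerDir, func(path string, f os.FileInfo, err error) error {
		if err == nil && f.IsDir() {
			target = path
		}
		return nil
	})
	fmt.Println("cpuset would be written to:", filepath.Join(target, "cpuset.cpus"))
}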
29 changes: 23 additions & 6 deletions pkg/types/pool.go
@@ -8,6 +8,7 @@ import (
"github.com/nokia/CPU-Pooler/pkg/k8sclient"
"io/ioutil"
"path/filepath"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

const (
@@ -25,13 +26,13 @@
)
// Pool defines cpupool
type Pool struct {
CPUs string `yaml:"cpus"`
CPUs cpuset.CPUSet
}

// PoolConfig defines pool configuration for a node
type PoolConfig struct {
Pools map[string]Pool `yaml:"pools"`
NodeSelector map[string]string `yaml:"nodeSelector"`
Pools map[string]Pool
NodeSelector map[string]string
}

//DeterminePoolType takes the name of CPU pool as defined in the CPU-Pooler ConfigMap, and returns the type of CPU pool it represents.
Expand All @@ -41,7 +42,7 @@ func DeterminePoolType(poolName string) string {
if strings.HasPrefix(poolName, SharedPoolID) {
return SharedPoolID
} else if strings.HasPrefix(poolName, ExclusivePoolID) {
return ExclusivePoolID
return ExclusivePoolID
}
return DefaultPoolID
}
@@ -87,14 +88,30 @@ func readPoolConfig(labelMap map[string]string) (PoolConfig, string, error) {
// ReadPoolConfigFile reads a pool configuration file
func ReadPoolConfigFile(name string) (PoolConfig, error) {
var pools PoolConfig
var parsePools struct {
Pools map[string]struct{
CPUStr string `yaml:"cpus"`
} `yaml:"pools"`
NodeSelector map[string]string `yaml:"nodeSelector"`
}
file, err := ioutil.ReadFile(name)
if err != nil {
return PoolConfig{}, errors.New("Could not read poolconfig file named: " + name + " because:" + err.Error())
}
err = yaml.Unmarshal([]byte(file), &pools)
err = yaml.Unmarshal([]byte(file), &parsePools)
if err != nil {
return PoolConfig{}, errors.New("Poolconfig file could not be parsed because:" + err.Error())
}
pools.NodeSelector = parsePools.NodeSelector
pools.Pools = map[string]Pool{}
for pool := range parsePools.Pools {
temp := pools.Pools[pool]
temp.CPUs, err = cpuset.Parse(parsePools.Pools[pool].CPUStr)
if err != nil {
return PoolConfig{}, errors.New("CPUs could not be parsed because:" + err.Error())
}
pools.Pools[pool] = temp
}
return pools, err
}

@@ -107,4 +124,4 @@ func (poolConf PoolConfig) SelectPool(prefix string) Pool {
}
}
return Pool{}
}
}
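
Since Pool.CPUs is now a cpuset.CPUSet rather than a raw string, the two-step parsing in ReadPoolConfigFile (YAML string first, cpuset.Parse second) can be illustrated with the self-contained sketch below, assuming the same go-yaml and kubelet cpuset packages the project vendors. The pool names, CPU ranges, and node selector are made-up example values.

package main

import (
	"fmt"

	"github.com/go-yaml/yaml"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	// Example poolconfig content; real configs come from the CPU-Pooler ConfigMap.
	raw := []byte(`
pools:
  shared_pool1:
    cpus: "2,3"
  exclusive_pool1:
    cpus: "4-7"
  default:
    cpus: "0-1"
nodeSelector:
  nodetype: worker
`)
	// Intermediate struct mirroring parsePools in ReadPoolConfigFile above.
	var parsed struct {
		Pools map[string]struct {
			CPUStr string `yaml:"cpus"`
		} `yaml:"pools"`
		NodeSelector map[string]string `yaml:"nodeSelector"`
	}
	if err := yaml.Unmarshal(raw, &parsed); err != nil {
		panic(err)
	}
	for name, p := range parsed.Pools {
		set, err := cpuset.Parse(p.CPUStr)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s (%d CPUs)\n", name, set.String(), set.Size())
	}
}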
