diff --git a/cmd/webhook/webhook.go b/cmd/webhook/webhook.go
index e234ba8..b5c1da7 100644
--- a/cmd/webhook/webhook.go
+++ b/cmd/webhook/webhook.go
@@ -21,15 +21,21 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 )
 
-var scheme = runtime.NewScheme()
-var codecs = serializer.NewCodecFactory(scheme)
-var resourceBaseName = "nokia.k8s.io"
-var annotationKey = "patched"
-var processStarterPath = "/opt/bin/process-starter"
+const (
+	mixedContainerSafetyMarginRatio = 20
+)
+
+var (
+	scheme             = runtime.NewScheme()
+	codecs             = serializer.NewCodecFactory(scheme)
+	resourceBaseName   = "nokia.k8s.io"
+	annotationKey      = "patched"
+	processStarterPath = "/opt/bin/process-starter"
+)
 
 type containerPoolRequests struct {
 	sharedCPURequests    int
-	exclusiveCPURequests bool
+	exclusiveCPURequests int
 	pools                map[string]int
 }
 type poolRequestMap map[string]containerPoolRequests
@@ -68,7 +74,7 @@ func getCPUPoolRequests(pod *corev1.Pod) (poolRequestMap, error) {
 				cPoolRequests.sharedCPURequests += val
 			}
 			if strings.HasPrefix(string(key), resourceBaseName+"/exclusive") {
-				cPoolRequests.exclusiveCPURequests = true
+				cPoolRequests.exclusiveCPURequests += val
 			}
 			poolName := strings.TrimPrefix(string(key), resourceBaseName+"/")
 			cPoolRequests.pools[poolName] = val
@@ -112,6 +118,22 @@ func validateAnnotation(poolRequests poolRequestMap, cpuAnnotation types.CPUAnno
 	return nil
 }
 
+func setRequestLimit(requests containerPoolRequests, patchList []patch, contID int, contSpec *corev1.Container) []patch {
+	totalCFSLimit := requests.sharedCPURequests + 1000*requests.exclusiveCPURequests
+	if requests.sharedCPURequests > 0 && requests.exclusiveCPURequests > 0 {
+		// This is the case when both shared and exclusive pool resources are requested by the same container.
+		// To avoid artificially throttling the exclusive user threads when the shared threads are overstepping their boundaries,
+		// we add a 20% safety margin to the overall CFS quota governing the usage of the whole cpuset.
+		// As the exclusive cores' utilization is capped at 100% of physical capacity,
+		// this margin is only utilized when shared threads would throttle the exclusive ones.
+		totalCFSLimit += mixedContainerSafetyMarginRatio * requests.sharedCPURequests / 100
+	}
+	if totalCFSLimit > 0 {
+		patchList = patchCPULimit(totalCFSLimit, patchList, contID, contSpec)
+	}
+	return patchList
+}
+
 func patchCPULimit(sharedCPUTime int, patchList []patch, i int, c *corev1.Container) []patch {
 	var patchItem patch
 
@@ -134,9 +156,9 @@ func patchContainerEnv(poolRequests poolRequestMap, envPatched bool, patchList [
 	var patchItem patch
 	var poolStr string
 
-	if poolRequests[c.Name].exclusiveCPURequests && poolRequests[c.Name].sharedCPURequests > 0 {
+	if poolRequests[c.Name].exclusiveCPURequests > 0 && poolRequests[c.Name].sharedCPURequests > 0 {
 		poolStr = types.ExclusivePoolID + "&" + types.SharedPoolID
-	} else if poolRequests[c.Name].exclusiveCPURequests {
+	} else if poolRequests[c.Name].exclusiveCPURequests > 0 {
 		poolStr = types.ExclusivePoolID
 	} else if poolRequests[c.Name].sharedCPURequests > 0 {
 		poolStr = types.SharedPoolID
@@ -287,14 +309,8 @@ func mutatePods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
 	}
 
 	// Patch container if needed.
-	for i, c := range pod.Spec.Containers {
-		// Set CPU limit if shared CPU were requested and exclusive CPUs were not requested
-		containerEnvPatched := false
-		if poolRequests[c.Name].sharedCPURequests > 0 &&
-			!poolRequests[c.Name].exclusiveCPURequests {
-			patchList = patchCPULimit(poolRequests[c.Name].sharedCPURequests,
-				patchList, i, &c)
-		}
+	for contID, contSpec := range pod.Spec.Containers {
+		patchList = setRequestLimit(poolRequests[contSpec.Name], patchList, contID, &contSpec)
 		// If pod annotation has entry for this container or
 		// container asks for exclusive cpus, we add patches to enable pinning.
 		// The patches enable process in container to be started with cpu pooler's 'process starter'
@@ -303,28 +319,29 @@ func mutatePods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
 		// has started, the cpu affinity setting by application will be overwritten by the cpuset.
 		// The process starter will wait for cpusetter to finish it's job for this container
 		// and starts the application process after that.
-		pinningPatchNeeded := cpuAnnotation.ContainerExists(c.Name)
-		if poolRequests[c.Name].exclusiveCPURequests {
-			if len(c.Command) == 0 && !pinningPatchNeeded {
-				glog.Warningf("Container %s asked exclusive cpus but command not given. CPU affinity settings possibly lost for container", c.Name)
+		pinningPatchNeeded := cpuAnnotation.ContainerExists(contSpec.Name)
+		if poolRequests[contSpec.Name].exclusiveCPURequests > 0 {
+			if len(contSpec.Command) == 0 && !pinningPatchNeeded {
+				glog.Warningf("Container %s asked exclusive cpus but command not given. CPU affinity settings possibly lost for container", contSpec.Name)
 			} else {
 				pinningPatchNeeded = true
 			}
 		}
+		containerEnvPatched := false
 		if pinningPatchNeeded {
-			glog.V(2).Infof("Patch container for pinning %s", c.Name)
+			glog.V(2).Infof("Patch container for pinning %s", contSpec.Name)
 
-			patchList, err = patchContainerForPinning(cpuAnnotation, patchList, i, &c)
+			patchList, err = patchContainerForPinning(cpuAnnotation, patchList, contID, &contSpec)
 			if err != nil {
 				return toAdmissionResponse(err)
 			}
 			pinningPatchAdded = true
 			containerEnvPatched = true
 		}
-		if poolRequests[c.Name].sharedCPURequests > 0 ||
-			poolRequests[c.Name].exclusiveCPURequests {
+		if poolRequests[contSpec.Name].sharedCPURequests > 0 ||
+			poolRequests[contSpec.Name].exclusiveCPURequests > 0 {
 			// Patch container environment variable
-			patchList, err = patchContainerEnv(poolRequests, containerEnvPatched, patchList, i, &c)
+			patchList, err = patchContainerEnv(poolRequests, containerEnvPatched, patchList, contID, &contSpec)
 			if err != nil {
 				return toAdmissionResponse(err)
 			}
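
For reference, the CFS quota arithmetic that the new setRequestLimit function introduces works in millicore units: shared-pool requests appear to be expressed in millicores, exclusive-pool requests in whole CPUs (hence the scaling by 1000), and the 20% safety margin is taken from the shared portion only. Below is a minimal standalone sketch of that calculation; the helper name cfsQuota and the sample values are illustrative only, not part of the patch.

package main

import "fmt"

// Mirrors the constant added in the patch (percent of the shared request).
const mixedContainerSafetyMarginRatio = 20

// cfsQuota is a hypothetical helper reproducing setRequestLimit's arithmetic,
// assuming shared requests are millicores and exclusive requests are whole CPUs.
func cfsQuota(sharedMillicores, exclusiveCPUs int) int {
	total := sharedMillicores + 1000*exclusiveCPUs
	if sharedMillicores > 0 && exclusiveCPUs > 0 {
		// Mixed container: add 20% of the shared request as headroom so shared
		// threads overstepping their share do not throttle the exclusive threads.
		total += mixedContainerSafetyMarginRatio * sharedMillicores / 100
	}
	return total
}

func main() {
	// Example: 500m shared + 2 exclusive CPUs -> 500 + 2000 + 100 = 2600m quota.
	fmt.Println(cfsQuota(500, 2)) // 2600
	// Shared-only container: no safety margin is added.
	fmt.Println(cfsQuota(500, 0)) // 500
}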