Adding auto-provisioned CFS quotas to all non-default containers
This commit solves Issue #25.
When a container requests only shared pool resources, its CFS quota is set to the value of the shared request.
With exclusive users it is set to the total number of exclusive cores * 1000.
When both are requested, the overall quota is set to exclusive*1000 + 1.2*shared.
In this hybrid scenario we leave a 20% safety margin on top of the originally requested shared resources,
  to avoid accidentally throttling the higher-priority exclusive threads when the lower-priority shared threads are overloaded.
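
For concreteness, here is a minimal, runnable sketch of the quota arithmetic this commit introduces (not part of the diff itself; the helper name cfsQuota and the sample values are invented for illustration). Following the commit message, shared requests are counted in millicores and exclusive requests in whole cores:

    package main

    import "fmt"

    const mixedContainerSafetyMarginRatio = 20

    // cfsQuota mirrors the computation done by the new setRequestLimit function below.
    func cfsQuota(sharedMillicores, exclusiveCores int) int {
        quota := sharedMillicores + 1000*exclusiveCores
        if sharedMillicores > 0 && exclusiveCores > 0 {
            // Hybrid case: 20% safety margin applied to the shared part only.
            quota += mixedContainerSafetyMarginRatio * sharedMillicores / 100
        }
        return quota
    }

    func main() {
        fmt.Println(cfsQuota(500, 0)) // shared only: 500
        fmt.Println(cfsQuota(0, 2))   // exclusive only: 2000
        fmt.Println(cfsQuota(500, 2)) // hybrid: 2500 + 100 margin = 2600
    }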
Levovar committed Jan 7, 2021
1 parent 0adced7 commit c129ba0
Showing 1 changed file with 43 additions and 26 deletions.
cmd/webhook/webhook.go
@@ -21,15 +21,21 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 )
 
-var scheme = runtime.NewScheme()
-var codecs = serializer.NewCodecFactory(scheme)
-var resourceBaseName = "nokia.k8s.io"
-var annotationKey = "patched"
-var processStarterPath = "/opt/bin/process-starter"
+const (
+	mixedContainerSafetyMarginRatio = 20
+)
+
+var (
+	scheme             = runtime.NewScheme()
+	codecs             = serializer.NewCodecFactory(scheme)
+	resourceBaseName   = "nokia.k8s.io"
+	annotationKey      = "patched"
+	processStarterPath = "/opt/bin/process-starter"
+)
 
 type containerPoolRequests struct {
 	sharedCPURequests    int
-	exclusiveCPURequests bool
+	exclusiveCPURequests int
 	pools                map[string]int
 }
 type poolRequestMap map[string]containerPoolRequests
@@ -68,7 +74,7 @@ func getCPUPoolRequests(pod *corev1.Pod) (poolRequestMap, error) {
 				cPoolRequests.sharedCPURequests += val
 			}
 			if strings.HasPrefix(string(key), resourceBaseName+"/exclusive") {
-				cPoolRequests.exclusiveCPURequests = true
+				cPoolRequests.exclusiveCPURequests += val
 			}
 			poolName := strings.TrimPrefix(string(key), resourceBaseName+"/")
 			cPoolRequests.pools[poolName] = val
@@ -112,6 +118,22 @@ func validateAnnotation(poolRequests poolRequestMap, cpuAnnotation types.CPUAnno
 	return nil
 }
 
+func setRequestLimit(requests containerPoolRequests, patchList []patch, contID int, contSpec *corev1.Container) []patch {
+	totalCFSLimit := requests.sharedCPURequests + 1000*requests.exclusiveCPURequests
+	if requests.sharedCPURequests > 0 && requests.exclusiveCPURequests > 0 {
+		// This is the case when both shared and exclusive pool resources are requested by the same container.
+		// To avoid artificially throttling the exclusive user threads when the shared threads are overstepping their boundaries,
+		// we add a 20% safety margin to the overall CFS quota governing the usage of the whole cpuset.
+		// As the exclusive cores' utilization is capped at 100% of physical capacity,
+		// this margin is only utilized when shared threads would throttle the exclusive ones.
+		totalCFSLimit += mixedContainerSafetyMarginRatio * requests.sharedCPURequests / 100
+	}
+	if totalCFSLimit > 0 {
+		patchList = patchCPULimit(totalCFSLimit, patchList, contID, contSpec)
+	}
+	return patchList
+}
+
 func patchCPULimit(sharedCPUTime int, patchList []patch, i int, c *corev1.Container) []patch {
 	var patchItem patch
 
@@ -134,9 +156,9 @@ func patchContainerEnv(poolRequests poolRequestMap, envPatched bool, patchList [
 	var patchItem patch
 	var poolStr string
 
-	if poolRequests[c.Name].exclusiveCPURequests && poolRequests[c.Name].sharedCPURequests > 0 {
+	if poolRequests[c.Name].exclusiveCPURequests > 0 && poolRequests[c.Name].sharedCPURequests > 0 {
 		poolStr = types.ExclusivePoolID + "&" + types.SharedPoolID
-	} else if poolRequests[c.Name].exclusiveCPURequests {
+	} else if poolRequests[c.Name].exclusiveCPURequests > 0 {
 		poolStr = types.ExclusivePoolID
 	} else if poolRequests[c.Name].sharedCPURequests > 0 {
 		poolStr = types.SharedPoolID
@@ -287,14 +309,8 @@ func mutatePods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
 	}
 
 	// Patch container if needed.
-	for i, c := range pod.Spec.Containers {
-		// Set CPU limit if shared CPU were requested and exclusive CPUs were not requested
-		containerEnvPatched := false
-		if poolRequests[c.Name].sharedCPURequests > 0 &&
-			!poolRequests[c.Name].exclusiveCPURequests {
-			patchList = patchCPULimit(poolRequests[c.Name].sharedCPURequests,
-				patchList, i, &c)
-		}
+	for contID, contSpec := range pod.Spec.Containers {
+		patchList = setRequestLimit(poolRequests[contSpec.Name], patchList, contID, &contSpec)
 		// If pod annotation has entry for this container or
 		// container asks for exclusive cpus, we add patches to enable pinning.
 		// The patches enable process in container to be started with cpu pooler's 'process starter'
@@ -303,28 +319,29 @@
 		// has started, the cpu affinity setting by application will be overwritten by the cpuset.
 		// The process starter will wait for cpusetter to finish its job for this container
 		// and starts the application process after that.
-		pinningPatchNeeded := cpuAnnotation.ContainerExists(c.Name)
-		if poolRequests[c.Name].exclusiveCPURequests {
-			if len(c.Command) == 0 && !pinningPatchNeeded {
-				glog.Warningf("Container %s asked exclusive cpus but command not given. CPU affinity settings possibly lost for container", c.Name)
+		pinningPatchNeeded := cpuAnnotation.ContainerExists(contSpec.Name)
+		if poolRequests[contSpec.Name].exclusiveCPURequests > 0 {
+			if len(contSpec.Command) == 0 && !pinningPatchNeeded {
+				glog.Warningf("Container %s asked exclusive cpus but command not given. CPU affinity settings possibly lost for container", contSpec.Name)
 			} else {
 				pinningPatchNeeded = true
 			}
 		}
+		containerEnvPatched := false
 		if pinningPatchNeeded {
-			glog.V(2).Infof("Patch container for pinning %s", c.Name)
+			glog.V(2).Infof("Patch container for pinning %s", contSpec.Name)
 
-			patchList, err = patchContainerForPinning(cpuAnnotation, patchList, i, &c)
+			patchList, err = patchContainerForPinning(cpuAnnotation, patchList, contID, &contSpec)
 			if err != nil {
 				return toAdmissionResponse(err)
 			}
 			pinningPatchAdded = true
 			containerEnvPatched = true
 		}
-		if poolRequests[c.Name].sharedCPURequests > 0 ||
-			poolRequests[c.Name].exclusiveCPURequests {
+		if poolRequests[contSpec.Name].sharedCPURequests > 0 ||
+			poolRequests[contSpec.Name].exclusiveCPURequests > 0 {
 			// Patch container environment variable
-			patchList, err = patchContainerEnv(poolRequests, containerEnvPatched, patchList, i, &c)
+			patchList, err = patchContainerEnv(poolRequests, containerEnvPatched, patchList, contID, &contSpec)
 			if err != nil {
 				return toAdmissionResponse(err)
 			}
