Skip to content

Commit

Permalink
Prevent borrowing when preemption could help
Browse files Browse the repository at this point in the history
Assign flavors that could be satisfied with preemption.

If we find such an assignment, prevent borrowing in the cohort.

Change-Id: I94ee6d6207e3c44fc323fb448e0836a35195e2bb
  • Loading branch information
alculquicondor committed Dec 14, 2022
1 parent 7059151 commit c8557f3
Show file tree
Hide file tree
Showing 4 changed files with 548 additions and 136 deletions.
230 changes: 182 additions & 48 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package flavorassigner

import (
"errors"
"fmt"
"sort"
"strings"

"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -40,12 +42,56 @@ type Assignment struct {
// usedResources is the accumulated usage of resources as podSets get
// flavors assigned.
usage cache.ResourceQuantities

// repMode is the cached representative mode for this assignment.
repMode *FlavorAssignmentMode
}

// Borrows reports whether this assignment requires borrowing any quota
// from the cohort.
func (a *Assignment) Borrows() bool {
	return len(a.TotalBorrow) != 0
}

// RepresentativeMode returns the worst (lowest) assignment mode among all
// podSets in this assignment. The result is cached after the first call.
// An assignment with no podSets is reported as NoFit (and not cached).
func (a *Assignment) RepresentativeMode() FlavorAssignmentMode {
	if a.repMode != nil {
		return *a.repMode
	}
	if len(a.PodSets) == 0 {
		return NoFit
	}
	worst := Fit
	for i := range a.PodSets {
		if m := a.PodSets[i].RepresentativeMode(); m < worst {
			worst = m
			// NoFit is the minimum; nothing can lower the mode further.
			if worst == NoFit {
				break
			}
		}
	}
	a.repMode = &worst
	return worst
}

// Message builds a human-readable summary of why podSets could not be
// assigned flavors. If any podSet failed with a hard error, only that
// error is reported; otherwise the per-podSet reasons are joined with "; ".
func (a *Assignment) Message() string {
	var sb strings.Builder
	for _, ps := range a.PodSets {
		if ps.Status == nil {
			continue
		}
		if ps.Status.IsError() {
			return fmt.Sprintf("failed to assign flavors to podset %s: %v", ps.Name, ps.Status.err)
		}
		if sb.Len() > 0 {
			sb.WriteString("; ")
		}
		fmt.Fprintf(&sb, "couldn't assign flavors to podSet %s: %s", ps.Name, ps.Status.Message())
	}
	return sb.String()
}

func (a *Assignment) ToAPI() []kueue.PodSetFlavors {
psFlavors := make([]kueue.PodSetFlavors, len(a.PodSets))
for i := range psFlavors {
Expand All @@ -55,38 +101,60 @@ func (a *Assignment) ToAPI() []kueue.PodSetFlavors {
}

// Status gathers the outcome of attempting to assign flavors to a podSet:
// either a list of reasons why flavors didn't fit, or a hard error.
// A nil *Status represents success (see IsSuccess).
type Status struct {
	// podSet is the name of the podSet this status refers to.
	podSet string
	// reasons lists human-readable explanations for flavors that could
	// not be assigned.
	reasons []string
	// err holds a hard failure that aborted the assignment, if any.
	err error
}

// IsSuccess reports whether the assignment succeeded; success is
// represented by a nil *Status.
func (s *Status) IsSuccess() bool {
	return s == nil
}

// IsError reports whether the status carries a hard error (as opposed to
// mere "didn't fit" reasons or success).
func (s *Status) IsError() bool {
	if s == nil {
		return false
	}
	return s.err != nil
}

func (s *Status) append(r string) {
s.reasons = append(s.reasons, r)
func (s *Status) append(r ...string) *Status {
s.reasons = append(s.reasons, r...)
return s
}

func (s *Status) Message() string {
if s.IsSuccess() {
if s == nil {
return ""
}
if s.err != nil {
return fmt.Sprintf("Couldn't assign flavors for podSet %s: %v", s.podSet, s.err)
return s.err.Error()
}
sort.Strings(s.reasons)
msg := strings.Join(s.reasons, "; ")
return fmt.Sprintf("Workload's %q podSet didn't fit: %s", s.podSet, msg)
return strings.Join(s.reasons, ", ")
}

// Equal reports whether two statuses are equivalent: both nil, both
// carrying matching errors (per errors.Is), or both carrying equal reason
// lists.
//
// Bug fix: the original only consulted the error when s.err was non-nil,
// so s.Equal(o) with s.err == nil and o.err != nil fell through to the
// reasons comparison and could return true, making Equal asymmetric.
// Now the error branch is taken if either side has an error.
func (s *Status) Equal(o *Status) bool {
	if s == nil || o == nil {
		return s == o
	}
	if s.err != nil || o.err != nil {
		// errors.Is(nil, target) is false for a non-nil target, so a
		// one-sided error correctly compares unequal.
		return errors.Is(s.err, o.err)
	}
	return cmp.Equal(s.reasons, o.reasons)
}

// PodSetAssignment holds the per-resource flavor assignments for a single
// podSet, along with the status of the assignment attempt.
type PodSetAssignment struct {
	// Name is the podSet's name.
	Name string
	// Flavors maps each requested resource to its assigned flavor.
	Flavors ResourceAssignment
	// Status collects reasons or an error when assignment didn't fully
	// succeed; nil means the podSet fits without caveats.
	Status *Status
}

// RepresentativeMode returns the worst (lowest) mode among the flavor
// assignments of this podSet. A nil Status means everything fit; a nil
// Flavors map with a non-nil Status means nothing could be assigned.
func (psa *PodSetAssignment) RepresentativeMode() FlavorAssignmentMode {
	switch {
	case psa.Status == nil:
		return Fit
	case psa.Flavors == nil:
		return NoFit
	}
	worst := Fit
	for _, fa := range psa.Flavors {
		if fa.Mode < worst {
			worst = fa.Mode
		}
	}
	return worst
}

type ResourceAssignment map[corev1.ResourceName]*FlavorAssignment
Expand All @@ -104,19 +172,44 @@ func (psa *PodSetAssignment) toAPI() kueue.PodSetFlavors {

type FlavorAssignmentMode int

// The flavor assigment modes below are ordered from lowest to highest
// preference.
const (
// CohortFit means that there are enough unused resources in the cohort to
// NoFit means that there is not enough quota left to assign this flavor.
NoFit FlavorAssignmentMode = iota
// ClusterQueuePreempt means that there is not enough min quota in the
// ClusterQueue, so we need to preempt other workloads in the ClusterQueue
// with lower priority.
ClusterQueuePreempt
// CohortReclaim means that there is enough unused min quota in the
// ClusterQueue, but it needs to be reclaimed from the cohort.
CohortReclaim
// Fit means that there is enough unused quota in the cohort to
// assign this flavor.
CohortFit FlavorAssignmentMode = iota
Fit
)

// String returns the human-readable name of the assignment mode, or
// "Unknown" for values outside the declared constants.
func (m FlavorAssignmentMode) String() string {
	switch m {
	case Fit:
		return "Fit"
	case CohortReclaim:
		return "CohortReclaim"
	case ClusterQueuePreempt:
		return "ClusterQueuePreempt"
	case NoFit:
		return "NoFit"
	default:
		return "Unknown"
	}
}

// FlavorAssignment is the assignment of one flavor to one resource,
// including how it fits and how much quota must be borrowed.
type FlavorAssignment struct {
	// Name is the flavor's name.
	Name string
	// Mode describes how the flavor can be assigned (Fit, reclaim,
	// preempt or not at all).
	Mode FlavorAssignmentMode
	// borrow is the amount of quota that must be borrowed from the
	// cohort for this assignment; zero when no borrowing is needed.
	borrow int64
}

func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue) (*Assignment, *Status) {
func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue) Assignment {
assignment := Assignment{
TotalBorrow: make(cache.ResourceQuantities),
PodSets: make([]PodSetAssignment, 0, len(wl.TotalRequests)),
Expand All @@ -134,35 +227,48 @@ func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[strin
continue
}
if _, ok := cq.RequestableResources[resName]; !ok {
return nil, &Status{
podSet: podSet.Name,
psAssignment.Flavors = nil
psAssignment.Status = &Status{
reasons: []string{fmt.Sprintf("resource %s unavailable in ClusterQueue", resName)},
}
break
}
codepResources := cq.RequestableResources[resName].CodependentResources
if codepResources.Len() == 0 {
codepResources = sets.NewString(string(resName))
}
codepReq := filterRequestedResources(podSet.Requests, codepResources)
flavors, status := assignment.findFlavorForCodepResources(log, codepReq, resourceFlavors, cq, &wl.Obj.Spec.PodSets[i].Spec)
if !status.IsSuccess() {
status.podSet = podSet.Name
return nil, status
if status.IsError() || len(flavors) == 0 {
psAssignment.Flavors = nil
psAssignment.Status = status
break
}
psAssignment.append(flavors)
psAssignment.append(flavors, status)
}

assignment.append(podSet.Requests, &psAssignment)
if psAssignment.Status.IsError() || (len(podSet.Requests) > 0 && len(psAssignment.Flavors) == 0) {
// This assignment failed, no need to continue tracking.
assignment.TotalBorrow = nil
return assignment
}
}
if len(assignment.TotalBorrow) == 0 {
assignment.TotalBorrow = nil
}
return &assignment, nil
return assignment
}

func (psa *PodSetAssignment) append(flavors ResourceAssignment) {
func (psa *PodSetAssignment) append(flavors ResourceAssignment, status *Status) {
for resource, assignment := range flavors {
psa.Flavors[resource] = assignment
}
if psa.Status == nil {
psa.Status = status
} else if status != nil {
psa.Status.reasons = append(psa.Status.reasons, status.reasons...)
}
}

func (a *Assignment) append(requests workload.Requests, psAssignment *PodSetAssignment) {
Expand Down Expand Up @@ -192,13 +298,16 @@ func (a *Assignment) findFlavorForCodepResources(
resourceFlavors map[string]*kueue.ResourceFlavor,
cq *cache.ClusterQueue,
spec *corev1.PodSpec) (ResourceAssignment, *Status) {
var status Status
status := &Status{}

// Keep any resource name as an anchor to gather flavors for.
var rName corev1.ResourceName
for rName = range requests {
break
}
var bestAssignment ResourceAssignment
bestAssignmentMode := NoFit

// We will only check against the flavors' labels for the resource.
// Since all the resources share the same flavors, they use the same selector.
selector := flavorSelector(spec, cq.LabelKeys[rName])
Expand All @@ -219,35 +328,46 @@ func (a *Assignment) findFlavorForCodepResources(
if match, err := selector.Match(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: flavor.NodeSelector}}); !match || err != nil {
if err != nil {
status.err = err
return nil, &status
return nil, status
}
status.append(fmt.Sprintf("flavor %s doesn't match with node affinity", flvLimit.Name))
continue
}

assignments := make(ResourceAssignment, len(requests))
worstFlavorMode := Fit
for name, val := range requests {
codepFlvLimit := cq.RequestableResources[name].Flavors[i]
// Check considering the flavor usage by previous pod sets.
borrow, s := fitsFlavorLimits(name, val+a.usage[name][flavor.Name], cq, &codepFlvLimit)
if s.IsError() {
return nil, s
}
if !s.IsSuccess() {
mode, borrow, s := fitsFlavorLimits(name, val+a.usage[name][flavor.Name], cq, &codepFlvLimit)
if s != nil {
status.reasons = append(status.reasons, s.reasons...)
}
if mode < worstFlavorMode {
worstFlavorMode = mode
}
if worstFlavorMode == NoFit {
// The flavor doesn't fit, no need to check other resources.
break
}

assignments[name] = &FlavorAssignment{
Name: flavor.Name,
Mode: CohortFit,
Mode: mode,
borrow: borrow,
}
}
if len(assignments) == len(requests) {
return assignments, nil

if worstFlavorMode > bestAssignmentMode {
bestAssignment = assignments
bestAssignmentMode = worstFlavorMode
if bestAssignmentMode == Fit {
// All the resources fit in the cohort, no need to check more flavors.
return bestAssignment, nil
}
}
}
return nil, &status
return bestAssignment, status
}

func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.RequiredNodeAffinity {
Expand Down Expand Up @@ -296,38 +416,52 @@ func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.
return nodeaffinity.GetRequiredNodeAffinity(&corev1.Pod{Spec: specCopy})
}

// fitsFlavorLimits returns whether a requested resource fits in a specific flavor's quota limits.
// fitsFlavorLimits returns how this flavor could be assigned, according to the
// remaining quota in the ClusterQueue and cohort.
// If it fits, also returns any borrowing required.
func fitsFlavorLimits(rName corev1.ResourceName, val int64, cq *cache.ClusterQueue, flavor *cache.FlavorLimits) (int64, *Status) {
func fitsFlavorLimits(rName corev1.ResourceName, val int64, cq *cache.ClusterQueue, flavor *cache.FlavorLimits) (FlavorAssignmentMode, int64, *Status) {
var status Status
used := cq.UsedResources[rName][flavor.Name]
mode := NoFit
if val <= flavor.Min {
// The request can be satisfied by the min quota, if all active workloads
// in the ClusterQueue are preempted.
mode = ClusterQueuePreempt
}
borrow := used + val - flavor.Min
if borrow <= 0 {
borrow = 0
// The request can be satisfied by the min quota, if all active workloads
// from other ClusterQueues in the cohort are preempted.
mode = CohortReclaim
}
if flavor.Max != nil && used+val > *flavor.Max {
status.append(fmt.Sprintf("borrowing limit for %s flavor %s exceeded", rName, flavor.Name))
return 0, &status
return mode, 0, &status
}
cohortUsed := used
cohortTotal := flavor.Min
if cq.Cohort != nil {
cohortUsed = cq.Cohort.UsedResources[rName][flavor.Name]
cohortTotal = cq.Cohort.RequestableResources[rName][flavor.Name]
}
borrow := used + val - flavor.Min
if borrow < 0 {
borrow = 0
}

lack := cohortUsed + val - cohortTotal
if lack > 0 {
lackQuantity := workload.ResourceQuantity(rName, lack)
if cq.Cohort == nil {
status.append(fmt.Sprintf("insufficient quota for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity))
if lack <= 0 {
return Fit, borrow, nil
}
lackQuantity := workload.ResourceQuantity(rName, lack)
msg := fmt.Sprintf("insufficient unused quota in cohort for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity)
if cq.Cohort == nil {
if mode == NoFit {
msg = fmt.Sprintf("insufficient quota for %s flavor %s in ClusterQueue", rName, flavor.Name)
} else {
status.append(fmt.Sprintf("insufficient quota for %s flavor %s, %s more needed after borrowing", rName, flavor.Name, &lackQuantity))
msg = fmt.Sprintf("insufficient unused quota for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity)
}
// TODO(PostMVP): preemption could help if borrow == 0
return 0, &status
}
return borrow, nil
status.append(msg)
// This workload might preempt from ClusterQueue or cohort without borrowing.
return mode, 0, &status
}

func filterRequestedResources(req workload.Requests, allowList sets.String) workload.Requests {
Expand Down
Loading

0 comments on commit c8557f3

Please sign in to comment.