diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go
index e5141894ac..b86f9f004b 100644
--- a/pkg/scheduler/flavorassigner/flavorassigner.go
+++ b/pkg/scheduler/flavorassigner/flavorassigner.go
@@ -17,11 +17,13 @@ limitations under the License.
 package flavorassigner
 
 import (
+	"errors"
 	"fmt"
 	"sort"
 	"strings"
 
 	"github.com/go-logr/logr"
+	"github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -40,12 +42,56 @@ type Assignment struct {
 	// usedResources is the accumulated usage of resources as podSets get
 	// flavors assigned.
 	usage cache.ResourceQuantities
+
+	// repMode is the cached representative mode for this assignment.
+	repMode *FlavorAssignmentMode
 }
 
 func (a *Assignment) Borrows() bool {
 	return len(a.TotalBorrow) > 0
 }
 
+func (a *Assignment) RepresentativeMode() FlavorAssignmentMode {
+	if a.repMode != nil {
+		return *a.repMode
+	}
+	if len(a.PodSets) == 0 {
+		return NoFit
+	}
+	mode := Fit
+	for _, ps := range a.PodSets {
+		psMode := ps.RepresentativeMode()
+		if psMode < mode {
+			mode = psMode
+			if mode == NoFit {
+				break
+			}
+		}
+	}
+	a.repMode = &mode
+	return mode
+}
+
+func (a *Assignment) Message() string {
+	var builder strings.Builder
+	for _, ps := range a.PodSets {
+		if ps.Status == nil {
+			continue
+		}
+		if ps.Status.IsError() {
+			return fmt.Sprintf("failed to assign flavors to podSet %s: %v", ps.Name, ps.Status.err)
+		}
+		if builder.Len() > 0 {
+			builder.WriteString("; ")
+		}
+		builder.WriteString("couldn't assign flavors to podSet ")
+		builder.WriteString(ps.Name)
+		builder.WriteString(": ")
+		builder.WriteString(ps.Status.Message())
+	}
+	return builder.String()
+}
+
 func (a *Assignment) ToAPI() []kueue.PodSetFlavors {
 	psFlavors := make([]kueue.PodSetFlavors, len(a.PodSets))
 	for i := range psFlavors {
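An aside on the new `RepresentativeMode` in the hunk above: an assignment is only as good as its worst pod set, so the method takes the minimum mode across pod sets and caches the result in `repMode`. A minimal standalone sketch of that aggregation (simplified stand-in types, not the package's own):

```go
package main

import "fmt"

// Stand-ins for the patch's types; illustration only.
type Mode int

const (
	NoFit Mode = iota
	ClusterQueuePreempt
	CohortReclaim
	Fit
)

// representative mirrors Assignment.RepresentativeMode: take the minimum
// (worst) mode across pod sets, since every pod set must be satisfiable.
func representative(podSetModes []Mode) Mode {
	if len(podSetModes) == 0 {
		return NoFit
	}
	mode := Fit
	for _, m := range podSetModes {
		if m < mode {
			mode = m
		}
	}
	return mode
}

func main() {
	// One pod set fits outright, the other needs to reclaim cohort quota:
	// the assignment as a whole is only CohortReclaim.
	fmt.Println(representative([]Mode{Fit, CohortReclaim}) == CohortReclaim) // true
}
```

The cache matters because `schedule()` later in the patch consults the mode more than once per entry in a cycle.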
@@ -55,38 +101,60 @@ func (a *Assignment) ToAPI() []kueue.PodSetFlavors {
 }
 
 type Status struct {
-	podSet  string
 	reasons []string
 	err     error
 }
 
-func (s *Status) IsSuccess() bool {
-	return s == nil
-}
-
 func (s *Status) IsError() bool {
 	return s != nil && s.err != nil
 }
 
-func (s *Status) append(r string) {
-	s.reasons = append(s.reasons, r)
+func (s *Status) append(r ...string) *Status {
+	s.reasons = append(s.reasons, r...)
+	return s
 }
 
 func (s *Status) Message() string {
-	if s.IsSuccess() {
+	if s == nil {
 		return ""
 	}
 	if s.err != nil {
-		return fmt.Sprintf("Couldn't assign flavors for podSet %s: %v", s.podSet, s.err)
+		return s.err.Error()
 	}
 	sort.Strings(s.reasons)
-	msg := strings.Join(s.reasons, "; ")
-	return fmt.Sprintf("Workload's %q podSet didn't fit: %s", s.podSet, msg)
+	return strings.Join(s.reasons, ", ")
+}
+
+func (s *Status) Equal(o *Status) bool {
+	if s == nil || o == nil {
+		return s == o
+	}
+	if s.err != nil {
+		return errors.Is(s.err, o.err)
+	}
+	return cmp.Equal(s.reasons, o.reasons)
 }
 
 type PodSetAssignment struct {
 	Name    string
 	Flavors ResourceAssignment
+	Status  *Status
+}
+
+func (psa *PodSetAssignment) RepresentativeMode() FlavorAssignmentMode {
+	if psa.Status == nil {
+		return Fit
+	}
+	if psa.Flavors == nil {
+		return NoFit
+	}
+	mode := Fit
+	for _, flvAssignment := range psa.Flavors {
+		if flvAssignment.Mode < mode {
+			mode = flvAssignment.Mode
+		}
+	}
+	return mode
 }
 
 type ResourceAssignment map[corev1.ResourceName]*FlavorAssignment
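Two conventions from the hunk above are worth spelling out: a nil `*Status` still means success (the receiver methods tolerate nil), and the new `Equal` method exists so that `cmp.Diff` in the tests can compare `Status` values despite their unexported fields. A self-contained sketch of that comparison behavior; only the go-cmp dependency is real, the `Status` copy here is simplified:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/google/go-cmp/cmp"
)

// Simplified copy of Status; in the patch, a nil *Status means success.
type Status struct {
	reasons []string
	err     error
}

// Equal keeps nil (success) distinct from an empty status, compares errors
// via errors.Is, and otherwise requires matching reason lists.
func (s *Status) Equal(o *Status) bool {
	if s == nil || o == nil {
		return s == o
	}
	if s.err != nil {
		return errors.Is(s.err, o.err)
	}
	return cmp.Equal(s.reasons, o.reasons)
}

func main() {
	a := &Status{reasons: []string{"flavor one doesn't match with node affinity"}}
	b := &Status{reasons: []string{"flavor one doesn't match with node affinity"}}
	fmt.Println(cmp.Equal(a, b)) // true: cmp picks up the Equal method
}
```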
@@ -104,19 +172,44 @@ func (psa *PodSetAssignment) toAPI() kueue.PodSetFlavors {
 
 type FlavorAssignmentMode int
 
+// The flavor assignment modes below are ordered from lowest to highest
+// preference.
 const (
-	// CohortFit means that there are enough unused resources in the cohort to
+	// NoFit means that there is not enough quota left to assign this flavor.
+	NoFit FlavorAssignmentMode = iota
+	// ClusterQueuePreempt means that there is not enough min quota in the
+	// ClusterQueue, so we need to preempt other workloads in the ClusterQueue
+	// with lower priority.
+	ClusterQueuePreempt
+	// CohortReclaim means that there is enough unused min quota in the
+	// ClusterQueue, but it needs to be reclaimed from the cohort.
+	CohortReclaim
+	// Fit means that there is enough unused quota in the cohort to
 	// assign this flavor.
-	CohortFit FlavorAssignmentMode = iota
+	Fit
 )
 
+func (m FlavorAssignmentMode) String() string {
+	switch m {
+	case NoFit:
+		return "NoFit"
+	case ClusterQueuePreempt:
+		return "ClusterQueuePreempt"
+	case CohortReclaim:
+		return "CohortReclaim"
+	case Fit:
+		return "Fit"
+	}
+	return "Unknown"
+}
+
 type FlavorAssignment struct {
 	Name   string
 	Mode   FlavorAssignmentMode
 	borrow int64
 }
 
-func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue) (*Assignment, *Status) {
+func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue) Assignment {
 	assignment := Assignment{
 		TotalBorrow: make(cache.ResourceQuantities),
 		PodSets:     make([]PodSetAssignment, 0, len(wl.TotalRequests)),
@@ -134,10 +227,11 @@ func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[strin
 			continue
 		}
 		if _, ok := cq.RequestableResources[resName]; !ok {
-			return nil, &Status{
-				podSet:  podSet.Name,
+			psAssignment.Flavors = nil
+			psAssignment.Status = &Status{
 				reasons: []string{fmt.Sprintf("resource %s unavailable in ClusterQueue", resName)},
 			}
+			break
 		}
 		codepResources := cq.RequestableResources[resName].CodependentResources
 		if codepResources.Len() == 0 {
@@ -145,24 +239,36 @@ func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[strin
 		}
 		codepReq := filterRequestedResources(podSet.Requests, codepResources)
 		flavors, status := assignment.findFlavorForCodepResources(log, codepReq, resourceFlavors, cq, &wl.Obj.Spec.PodSets[i].Spec)
-		if !status.IsSuccess() {
-			status.podSet = podSet.Name
-			return nil, status
+		if status.IsError() || len(flavors) == 0 {
+			psAssignment.Flavors = nil
+			psAssignment.Status = status
+			break
 		}
-		psAssignment.append(flavors)
+		psAssignment.append(flavors, status)
 	}
+
+		assignment.append(podSet.Requests, &psAssignment)
+		if psAssignment.Status.IsError() || (len(podSet.Requests) > 0 && len(psAssignment.Flavors) == 0) {
+			// This assignment failed, no need to continue tracking.
+			assignment.TotalBorrow = nil
+			return assignment
+		}
 	}
 	if len(assignment.TotalBorrow) == 0 {
 		assignment.TotalBorrow = nil
 	}
-	return &assignment, nil
+	return assignment
 }
 
-func (psa *PodSetAssignment) append(flavors ResourceAssignment) {
+func (psa *PodSetAssignment) append(flavors ResourceAssignment, status *Status) {
 	for resource, assignment := range flavors {
 		psa.Flavors[resource] = assignment
 	}
+	if psa.Status == nil {
+		psa.Status = status
+	} else if status != nil {
+		psa.Status.reasons = append(psa.Status.reasons, status.reasons...)
+	}
 }
 
 func (a *Assignment) append(requests workload.Requests, psAssignment *PodSetAssignment) {
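Because the modes above are declared with `iota` from least to most preferred, ordinary integer comparisons express preference directly; that is what the selection loops in this patch rely on. A tiny illustration, using a standalone copy of the constants:

```go
package main

import "fmt"

type FlavorAssignmentMode int

const (
	NoFit FlavorAssignmentMode = iota
	ClusterQueuePreempt
	CohortReclaim
	Fit
)

func main() {
	// A candidate flavor replaces the current best iff its mode compares
	// greater; reclaiming unused cohort quota is preferred over preempting
	// workloads in the own ClusterQueue.
	best := ClusterQueuePreempt
	if candidate := CohortReclaim; candidate > best {
		best = candidate
	}
	fmt.Println(best == CohortReclaim) // true
}
```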
@@ -192,13 +298,16 @@ func (a *Assignment) findFlavorForCodepResources(
 	resourceFlavors map[string]*kueue.ResourceFlavor,
 	cq *cache.ClusterQueue,
 	spec *corev1.PodSpec) (ResourceAssignment, *Status) {
-	var status Status
+	status := &Status{}
 
 	// Keep any resource name as an anchor to gather flavors for.
 	var rName corev1.ResourceName
 	for rName = range requests {
 		break
 	}
 
+	var bestAssignment ResourceAssignment
+	bestAssignmentMode := NoFit
+
 	// We will only check against the flavors' labels for the resource.
 	// Since all the resources share the same flavors, they use the same selector.
 	selector := flavorSelector(spec, cq.LabelKeys[rName])
@@ -219,35 +328,46 @@ func (a *Assignment) findFlavorForCodepResources(
 		if match, err := selector.Match(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: flavor.NodeSelector}}); !match || err != nil {
 			if err != nil {
 				status.err = err
-				return nil, &status
+				return nil, status
 			}
 			status.append(fmt.Sprintf("flavor %s doesn't match with node affinity", flvLimit.Name))
 			continue
 		}
 
 		assignments := make(ResourceAssignment, len(requests))
+		worstFlavorMode := Fit
 		for name, val := range requests {
 			codepFlvLimit := cq.RequestableResources[name].Flavors[i]
 			// Check considering the flavor usage by previous pod sets.
-			borrow, s := fitsFlavorLimits(name, val+a.usage[name][flavor.Name], cq, &codepFlvLimit)
-			if s.IsError() {
-				return nil, s
-			}
-			if !s.IsSuccess() {
+			mode, borrow, s := fitsFlavorLimits(name, val+a.usage[name][flavor.Name], cq, &codepFlvLimit)
+			if s != nil {
 				status.reasons = append(status.reasons, s.reasons...)
+			}
+			if mode < worstFlavorMode {
+				worstFlavorMode = mode
+			}
+			if worstFlavorMode == NoFit {
+				// The flavor doesn't fit, no need to check other resources.
 				break
 			}
+
 			assignments[name] = &FlavorAssignment{
 				Name:   flavor.Name,
-				Mode:   CohortFit,
+				Mode:   mode,
 				borrow: borrow,
 			}
 		}
-		if len(assignments) == len(requests) {
-			return assignments, nil
+
+		if worstFlavorMode > bestAssignmentMode {
+			bestAssignment = assignments
+			bestAssignmentMode = worstFlavorMode
+			if bestAssignmentMode == Fit {
+				// All the resources fit in the cohort, no need to check more flavors.
+				return bestAssignment, nil
+			}
 		}
 	}
-	return nil, &status
+	return bestAssignment, status
 }
 
 func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.RequiredNodeAffinity {
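The selection logic above has two nested extrema: within one flavor, a set of codependent resources is scored by its worst (minimum) per-resource mode; across flavors, the best (maximum) score wins, and a full Fit short-circuits the search. A standalone sketch of that min/max selection, using hypothetical flavor names "spot" and "on-demand" and simplified types:

```go
package main

import "fmt"

type Mode int

const (
	NoFit Mode = iota
	ClusterQueuePreempt
	CohortReclaim
	Fit
)

// bestFlavor mirrors the loop in findFlavorForCodepResources.
func bestFlavor(resourceModesByFlavor map[string][]Mode) (string, Mode) {
	bestName, bestMode := "", NoFit
	for name, resModes := range resourceModesByFlavor {
		worst := Fit
		for _, m := range resModes {
			if m < worst {
				worst = m
			}
		}
		if worst > bestMode {
			bestName, bestMode = name, worst
			if bestMode == Fit {
				break // nothing can beat a full Fit
			}
		}
	}
	return bestName, bestMode
}

func main() {
	name, mode := bestFlavor(map[string][]Mode{
		"spot":      {Fit, ClusterQueuePreempt}, // memory would need preemption
		"on-demand": {Fit, CohortReclaim},       // memory would need cohort reclaim
	})
	fmt.Println(name, mode == CohortReclaim) // on-demand true
}
```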
@@ -296,14 +416,28 @@ func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.String) nodeaffinity.
 	return nodeaffinity.GetRequiredNodeAffinity(&corev1.Pod{Spec: specCopy})
 }
 
-// fitsFlavorLimits returns whether a requested resource fits in a specific flavor's quota limits.
+// fitsFlavorLimits returns how this flavor could be assigned, according to the
+// remaining quota in the ClusterQueue and cohort.
 // If it fits, also returns any borrowing required.
-func fitsFlavorLimits(rName corev1.ResourceName, val int64, cq *cache.ClusterQueue, flavor *cache.FlavorLimits) (int64, *Status) {
+func fitsFlavorLimits(rName corev1.ResourceName, val int64, cq *cache.ClusterQueue, flavor *cache.FlavorLimits) (FlavorAssignmentMode, int64, *Status) {
 	var status Status
 	used := cq.UsedResources[rName][flavor.Name]
+	mode := NoFit
+	if val <= flavor.Min {
+		// The request can be satisfied by the min quota, if all active workloads
+		// in the ClusterQueue are preempted.
+		mode = ClusterQueuePreempt
+	}
+	borrow := used + val - flavor.Min
+	if borrow <= 0 {
+		borrow = 0
+		// The request can be satisfied by the min quota, if all active workloads
+		// from other ClusterQueues in the cohort are preempted.
+		mode = CohortReclaim
+	}
 	if flavor.Max != nil && used+val > *flavor.Max {
 		status.append(fmt.Sprintf("borrowing limit for %s flavor %s exceeded", rName, flavor.Name))
-		return 0, &status
+		return mode, 0, &status
 	}
 	cohortUsed := used
 	cohortTotal := flavor.Min
@@ -311,23 +445,23 @@ func fitsFlavorLimits(rName corev1.ResourceName, val int64, cq *cache.ClusterQue
 		cohortUsed = cq.Cohort.UsedResources[rName][flavor.Name]
 		cohortTotal = cq.Cohort.RequestableResources[rName][flavor.Name]
 	}
-	borrow := used + val - flavor.Min
-	if borrow < 0 {
-		borrow = 0
-	}
 	lack := cohortUsed + val - cohortTotal
-	if lack > 0 {
-		lackQuantity := workload.ResourceQuantity(rName, lack)
-		if cq.Cohort == nil {
-			status.append(fmt.Sprintf("insufficient quota for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity))
+	if lack <= 0 {
+		return Fit, borrow, nil
+	}
+	lackQuantity := workload.ResourceQuantity(rName, lack)
+	msg := fmt.Sprintf("insufficient unused quota in cohort for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity)
+	if cq.Cohort == nil {
+		if mode == NoFit {
+			msg = fmt.Sprintf("insufficient quota for %s flavor %s in ClusterQueue", rName, flavor.Name)
 		} else {
-			status.append(fmt.Sprintf("insufficient quota for %s flavor %s, %s more needed after borrowing", rName, flavor.Name, &lackQuantity))
+			msg = fmt.Sprintf("insufficient unused quota for %s flavor %s, %s more needed", rName, flavor.Name, &lackQuantity)
 		}
-		// TODO(PostMVP): preemption could help if borrow == 0
-		return 0, &status
 	}
-	return borrow, nil
+	status.append(msg)
+	// This workload might preempt from ClusterQueue or cohort without borrowing.
+	return mode, 0, &status
 }
 
 func filterRequestedResources(req workload.Requests, allowList sets.String) workload.Requests {
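The quota arithmetic in fitsFlavorLimits can be traced with concrete numbers. A runnable sketch that reproduces it for one flavor, with the Max check and message formatting left out, using the figures from the "past min, but can preempt in cohort" test case added below:

```go
package main

import "fmt"

type Mode int

const (
	NoFit Mode = iota
	ClusterQueuePreempt
	CohortReclaim
	Fit
)

// fits mirrors the arithmetic in fitsFlavorLimits (simplified).
func fits(val, min, used, cohortUsed, cohortTotal int64) (Mode, int64) {
	mode := NoFit
	if val <= min {
		// Min quota alone could satisfy the request if every admitted
		// workload in this ClusterQueue were preempted.
		mode = ClusterQueuePreempt
	}
	borrow := used + val - min
	if borrow <= 0 {
		borrow = 0
		// Enough unused min quota, but it is currently lent to the cohort
		// and would have to be reclaimed.
		mode = CohortReclaim
	}
	if lack := cohortUsed + val - cohortTotal; lack <= 0 {
		return Fit, borrow
	}
	return mode, 0
}

func main() {
	// 2 CPUs requested against min=2 with nothing used locally, while the
	// cohort has only 1 of its 10 CPUs unused: reclaimable, no borrowing.
	mode, borrow := fits(2000, 2000, 0, 9_000, 10_000)
	fmt.Println(mode == CohortReclaim, borrow) // true 0
}
```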
diff --git a/pkg/scheduler/flavorassigner/flavorassigner_test.go b/pkg/scheduler/flavorassigner/flavorassigner_test.go
index c23b7947df..58f76761b3 100644
--- a/pkg/scheduler/flavorassigner/flavorassigner_test.go
+++ b/pkg/scheduler/flavorassigner/flavorassigner_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package flavorassigner
 
 import (
-	"strings"
 	"testing"
 
 	"github.com/go-logr/logr/testr"
@@ -77,9 +76,8 @@ func TestAssignFlavors(t *testing.T) {
 	cases := map[string]struct {
 		wlPods         []kueue.PodSet
 		clusterQueue   cache.ClusterQueue
-		wantFits       bool
-		wantAssignment *Assignment
-		wantMsg        string
+		wantRepMode    FlavorAssignmentMode
+		wantAssignment Assignment
 	}{
 		"single flavor, fits": {
 			wlPods: []kueue.PodSet{
@@ -98,13 +96,13 @@ func TestAssignFlavors(t *testing.T) {
 					corev1.ResourceMemory: {Flavors: []cache.FlavorLimits{{Name: "default", Min: 2 * utiltesting.Mi}}},
 				},
 			},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU:    {Name: "default", Mode: CohortFit},
-						corev1.ResourceMemory: {Name: "default", Mode: CohortFit},
+						corev1.ResourceCPU:    {Name: "default", Mode: Fit},
+						corev1.ResourceMemory: {Name: "default", Mode: Fit},
 					},
 				}},
 			},
@@ -138,12 +136,12 @@ func TestAssignFlavors(t *testing.T) {
 					},
 				},
 			},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU: {Name: "tainted", Mode: CohortFit},
+						corev1.ResourceCPU: {Name: "tainted", Mode: Fit},
 					},
 				}},
 			},
@@ -168,7 +166,18 @@ func TestAssignFlavors(t *testing.T) {
 					},
 				},
 			},
-			wantMsg: "insufficient quota for cpu flavor default, 1 more needed",
+			wantRepMode: ClusterQueuePreempt,
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Flavors: ResourceAssignment{
+						corev1.ResourceCPU: {Name: "default", Mode: ClusterQueuePreempt},
+					},
+					Status: &Status{
+						reasons: []string{"insufficient unused quota for cpu flavor default, 1 more needed"},
+					},
+				}},
+			},
 		},
 		"multiple independent flavors, fits": {
 			wlPods: []kueue.PodSet{
@@ -197,13 +206,54 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU:    {Name: "two", Mode: CohortFit},
-						corev1.ResourceMemory: {Name: "b_one", Mode: CohortFit},
+						corev1.ResourceCPU:    {Name: "two", Mode: Fit},
+						corev1.ResourceMemory: {Name: "b_one", Mode: Fit},
+					},
+				}},
+			},
+		},
+		"multiple independent flavors, one could fit with preemption, other doesn't fit": {
+			wlPods: []kueue.PodSet{
+				{
+					Count: 1,
+					Name:  "main",
+					Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
+						corev1.ResourceCPU:    "3",
+						corev1.ResourceMemory: "10Mi",
+					}),
+				},
+			},
+			clusterQueue: cache.ClusterQueue{
+				RequestableResources: map[corev1.ResourceName]*cache.Resource{
+					corev1.ResourceCPU: {
+						Flavors: []cache.FlavorLimits{
+							{Name: "one", Min: 3000},
+						},
+					},
+					corev1.ResourceMemory: {
+						Flavors: []cache.FlavorLimits{
+							{Name: "b_one", Min: utiltesting.Mi},
+						},
+					},
+				},
+				UsedResources: cache.ResourceQuantities{
+					corev1.ResourceCPU: {
+						"one": 1000,
+					},
+				},
+			},
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{
+							"insufficient quota for memory flavor b_one in ClusterQueue",
+						},
 					},
 				}},
 			},
 		},
@@ -242,14 +292,94 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU:    {Name: "two", Mode: CohortFit},
-						corev1.ResourceMemory: {Name: "two", Mode: CohortFit},
-						"example.com/gpu":     {Name: "b_one", Mode: CohortFit},
+						corev1.ResourceCPU:    {Name: "two", Mode: Fit},
+						corev1.ResourceMemory: {Name: "two", Mode: Fit},
+						"example.com/gpu":     {Name: "b_one", Mode: Fit},
+					},
+				}},
+			},
+		},
+		"some codependent flavors, fits with different modes": {
+			wlPods: []kueue.PodSet{
+				{
+					Count: 1,
+					Name:  "main",
+					Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
+						corev1.ResourceCPU:    "3",
+						corev1.ResourceMemory: "10Mi",
+						"example.com/gpu":     "3",
+					}),
+				},
+			},
+			clusterQueue: cache.ClusterQueue{
+				RequestableResources: map[corev1.ResourceName]*cache.Resource{
+					corev1.ResourceCPU: {
+						Flavors: []cache.FlavorLimits{
+							{Name: "one", Min: 2000},
+							{Name: "two", Min: 4000},
+						},
+					},
+					corev1.ResourceMemory: {
+						Flavors: []cache.FlavorLimits{
+							{Name: "one", Min: utiltesting.Gi},
+							{Name: "two", Min: 15 * utiltesting.Mi},
+						},
+					},
+					"example.com/gpu": {
+						Flavors: []cache.FlavorLimits{
+							{Name: "b_one", Min: 4},
+						},
+					},
+				},
+				UsedResources: cache.ResourceQuantities{
+					corev1.ResourceMemory: {
+						"two": 10 * utiltesting.Mi,
+					},
+				},
+				Cohort: &cache.Cohort{
+					RequestableResources: cache.ResourceQuantities{
+						corev1.ResourceCPU: {
+							"one": 2000,
+							"two": 4000,
+						},
+						corev1.ResourceMemory: {
+							"one": utiltesting.Gi,
+							"two": 15 * utiltesting.Mi,
+						},
+						"example.com/gpu": {
+							"b_one": 4,
+						},
+					},
+					UsedResources: cache.ResourceQuantities{
+						corev1.ResourceMemory: {
+							"two": 10 * utiltesting.Mi,
+						},
+						"example.com/gpu": {
+							"b_one": 2,
+						},
+					},
+				},
+			},
+			wantRepMode: ClusterQueuePreempt,
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Flavors: ResourceAssignment{
+						corev1.ResourceCPU:    {Name: "two", Mode: Fit},
+						corev1.ResourceMemory: {Name: "two", Mode: ClusterQueuePreempt},
+						"example.com/gpu":     {Name: "b_one", Mode: CohortReclaim},
+					},
+					Status: &Status{
+						reasons: []string{
+							"insufficient unused quota in cohort for cpu flavor one, 1 more needed",
+							"insufficient unused quota in cohort for memory flavor two, 5Mi more needed",
+							"insufficient unused quota in cohort for example.com/gpu flavor b_one, 1 more needed",
+						},
 					},
 				}},
 			},
 		},
@@ -281,7 +411,17 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantMsg: "insufficient quota for cpu flavor one, 1 more needed; insufficient quota for memory flavor two, 5Mi more needed",
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{
+							"insufficient quota for cpu flavor one in ClusterQueue",
+							"insufficient quota for memory flavor two in ClusterQueue",
+						},
+					},
+				}},
+			},
 		},
 		"multiple flavors, fits while skipping tainted flavor": {
 			wlPods: []kueue.PodSet{
@@ -303,12 +443,12 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU: {Name: "two", Mode: CohortFit},
+						corev1.ResourceCPU: {Name: "two", Mode: Fit},
 					},
 				}},
 			},
@@ -333,12 +473,12 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU: {Name: "two", Mode: CohortFit},
+						corev1.ResourceCPU: {Name: "two", Mode: Fit},
 					},
 				}},
 			},
@@ -389,12 +529,12 @@ func TestAssignFlavors(t *testing.T) {
 				},
 				LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")},
 			},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU: {Name: "two", Mode: CohortFit},
+						corev1.ResourceCPU: {Name: "two", Mode: Fit},
 					},
 				}},
 			},
@@ -450,13 +590,13 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU:    {Name: "two", Mode: CohortFit},
-						corev1.ResourceMemory: {Name: "two", Mode: CohortFit},
+						corev1.ResourceCPU:    {Name: "two", Mode: Fit},
+						corev1.ResourceMemory: {Name: "two", Mode: Fit},
 					},
 				}},
 			},
@@ -515,12 +655,12 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{{
 					Name: "main",
 					Flavors: ResourceAssignment{
-						corev1.ResourceCPU: {Name: "one", Mode: CohortFit},
+						corev1.ResourceCPU: {Name: "one", Mode: Fit},
 					},
 				}},
 			},
@@ -567,8 +707,17 @@ func TestAssignFlavors(t *testing.T) {
 				},
 				LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")},
 			},
-			wantFits: false,
-			wantMsg:  "flavor one doesn't match with node affinity",
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{
+							"flavor one doesn't match with node affinity",
+							"flavor two doesn't match with node affinity",
+						},
+					},
+				}},
+			},
 		},
 		"multiple specs, fit different flavors": {
 			wlPods: []kueue.PodSet{
@@ -597,19 +746,19 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{
 					{
 						Name: "driver",
 						Flavors: ResourceAssignment{
-							corev1.ResourceCPU: {Name: "two", Mode: CohortFit},
+							corev1.ResourceCPU: {Name: "two", Mode: Fit},
 						},
 					},
 					{
 						Name: "worker",
 						Flavors: ResourceAssignment{
-							corev1.ResourceCPU: {Name: "one", Mode: CohortFit},
+							corev1.ResourceCPU: {Name: "one", Mode: Fit},
 						},
 					},
 				},
@@ -666,21 +815,21 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantFits: true,
-			wantAssignment: &Assignment{
+			wantRepMode: Fit,
+			wantAssignment: Assignment{
 				PodSets: []PodSetAssignment{
 					{
 						Name: "driver",
 						Flavors: ResourceAssignment{
-							corev1.ResourceCPU:    {Name: "default", Mode: CohortFit},
-							corev1.ResourceMemory: {Name: "default", Mode: CohortFit},
+							corev1.ResourceCPU:    {Name: "default", Mode: Fit},
+							corev1.ResourceMemory: {Name: "default", Mode: Fit},
 						},
 					},
 					{
 						Name: "worker",
 						Flavors: ResourceAssignment{
-							corev1.ResourceCPU:    {Name: "default", Mode: CohortFit},
-							corev1.ResourceMemory: {Name: "default", Mode: CohortFit},
+							corev1.ResourceCPU:    {Name: "default", Mode: Fit},
+							corev1.ResourceMemory: {Name: "default", Mode: Fit},
 						},
 					},
 				},
@@ -725,9 +874,16 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantMsg: "insufficient quota for cpu flavor one, 1 more needed after borrowing",
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{"insufficient unused quota in cohort for cpu flavor one, 1 more needed"},
+					},
+				}},
+			},
 		},
-		"past max": {
+		"past max, but can preempt in ClusterQueue": {
 			wlPods: []kueue.PodSet{
 				{
 					Count: 1,
@@ -743,7 +899,7 @@ func TestAssignFlavors(t *testing.T) {
 					Flavors: []cache.FlavorLimits{
 						{
 							Name: "one",
-							Min:  1000,
+							Min:  2000,
 							Max:  pointer.Int64(10_000),
 						},
 					},
@@ -761,15 +917,26 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			},
 		},
-			wantMsg: "borrowing limit for cpu flavor one exceeded",
+			wantRepMode: ClusterQueuePreempt,
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Flavors: ResourceAssignment{
+						corev1.ResourceCPU: {Name: "one", Mode: ClusterQueuePreempt},
+					},
+					Status: &Status{
+						reasons: []string{"borrowing limit for cpu flavor one exceeded"},
+					},
+				}},
+			},
 		},
-		"resource not listed in clusterQueue": {
+		"past min, but can preempt in ClusterQueue": {
 			wlPods: []kueue.PodSet{
 				{
 					Count: 1,
 					Name:  "main",
 					Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
-						"example.com/gpu": "1",
+						corev1.ResourceCPU: "2",
 					}),
 				},
 			},
@@ -777,30 +944,100 @@ func TestAssignFlavors(t *testing.T) {
 			clusterQueue: cache.ClusterQueue{
 				RequestableResources: map[corev1.ResourceName]*cache.Resource{
 					corev1.ResourceCPU: {
 						Flavors: []cache.FlavorLimits{
-							{Name: "one", Min: 4000},
+							{
+								Name: "one",
+								Min:  2000,
+							},
+						},
+					},
+				},
+				UsedResources: cache.ResourceQuantities{
+					corev1.ResourceCPU: {"one": 1_000},
+				},
+			},
+			wantRepMode: ClusterQueuePreempt,
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Flavors: ResourceAssignment{
+						corev1.ResourceCPU: {Name: "one", Mode: ClusterQueuePreempt},
+					},
+					Status: &Status{
+						reasons: []string{"insufficient unused quota for cpu flavor one, 1 more needed"},
+					},
+				}},
+			},
+		},
+		"past min, but can preempt in cohort": {
+			wlPods: []kueue.PodSet{
+				{
+					Count: 1,
+					Name:  "main",
+					Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
+						corev1.ResourceCPU: "2",
+					}),
+				},
+			},
+			clusterQueue: cache.ClusterQueue{
+				RequestableResources: map[corev1.ResourceName]*cache.Resource{
+					corev1.ResourceCPU: {
+						Flavors: []cache.FlavorLimits{
+							{
+								Name: "one",
+								Min:  2000,
+							},
 						},
 					},
 				},
+				Cohort: &cache.Cohort{
+					RequestableResources: cache.ResourceQuantities{
+						corev1.ResourceCPU: {"one": 10_000},
+					},
+					UsedResources: cache.ResourceQuantities{
+						corev1.ResourceCPU: {"one": 9_000},
+					},
+				},
+			},
+			wantRepMode: CohortReclaim,
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Flavors: ResourceAssignment{
+						corev1.ResourceCPU: {Name: "one", Mode: CohortReclaim},
+					},
+					Status: &Status{
+						reasons: []string{"insufficient unused quota in cohort for cpu flavor one, 1 more needed"},
+					},
+				}},
 			},
-			wantFits: false,
-			wantMsg:  "resource example.com/gpu unavailable in ClusterQueue",
 		},
-		"resource not found": {
+		"resource not listed in clusterQueue": {
 			wlPods: []kueue.PodSet{
 				{
 					Count: 1,
 					Name:  "main",
 					Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
-						"unknown_resource": "1",
+						"example.com/gpu": "1",
 					}),
 				},
 			},
 			clusterQueue: cache.ClusterQueue{
 				RequestableResources: map[corev1.ResourceName]*cache.Resource{
-					corev1.ResourceCPU: {Flavors: []cache.FlavorLimits{{Name: "one", Min: 1000}}},
+					corev1.ResourceCPU: {
+						Flavors: []cache.FlavorLimits{
+							{Name: "one", Min: 4000},
+						},
+					},
 				},
 			},
-			wantMsg: "resource unknown_resource unavailable in ClusterQueue",
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{"resource example.com/gpu unavailable in ClusterQueue"},
+					},
+				}},
+			},
 		},
 		"flavor not found": {
 			wlPods: []kueue.PodSet{
@@ -817,7 +1054,14 @@ func TestAssignFlavors(t *testing.T) {
 				corev1.ResourceCPU: {Flavors: []cache.FlavorLimits{{Name: "nonexistent-flavor", Min: 1000}}},
 			},
 		},
-			wantMsg: "flavor nonexistent-flavor not found",
+			wantAssignment: Assignment{
+				PodSets: []PodSetAssignment{{
+					Name: "main",
+					Status: &Status{
+						reasons: []string{"flavor nonexistent-flavor not found"},
+					},
+				}},
+			},
 		},
 	}
 	for name, tc := range cases {
@@ -832,14 +1076,9 @@ func TestAssignFlavors(t *testing.T) {
 				},
 			})
 			tc.clusterQueue.UpdateWithFlavors(resourceFlavors)
-			assignment, status := AssignFlavors(log, wlInfo, resourceFlavors, &tc.clusterQueue)
-			if status.IsSuccess() != tc.wantFits {
-				t.Errorf("e.assignFlavors(_)=%t, want %t", status.IsSuccess(), tc.wantFits)
-			}
-			if !tc.wantFits {
-				if len(tc.wantMsg) == 0 || !strings.Contains(status.Message(), tc.wantMsg) {
-					t.Errorf("got msg:\n%s\nwant msg containing:\n%s", status.Message(), tc.wantMsg)
-				}
+			assignment := AssignFlavors(log, wlInfo, resourceFlavors, &tc.clusterQueue)
+			if repMode := assignment.RepresentativeMode(); repMode != tc.wantRepMode {
+				t.Errorf("e.assignFlavors(_).RepresentativeMode()=%s, want %s", repMode, tc.wantRepMode)
 			}
 			if diff := cmp.Diff(tc.wantAssignment, assignment, cmpopts.IgnoreUnexported(Assignment{}, FlavorAssignment{})); diff != "" {
 				t.Errorf("Unexpected assignment (-want,+got):\n%s", diff)
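With the Status plumbing above, callers no longer receive one failure string; messages are composed in two layers, as the scheduler changes below rely on: reasons within a pod set are sorted and joined with ", " (Status.Message), and pod sets are joined with "; " (Assignment.Message). A standalone sketch of the resulting format, with hypothetical pod set names:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type podSetStatus struct {
	name    string
	reasons []string
}

func main() {
	statuses := []podSetStatus{
		{"driver", []string{"insufficient unused quota for cpu flavor one, 1 more needed"}},
		{"worker", []string{
			"flavor two doesn't match with node affinity",
			"flavor one doesn't match with node affinity",
		}},
	}
	var b strings.Builder
	for _, ps := range statuses {
		if b.Len() > 0 {
			b.WriteString("; ") // pod sets joined with "; "
		}
		sort.Strings(ps.reasons) // reasons sorted, joined with ", "
		b.WriteString("couldn't assign flavors to podSet " + ps.name + ": " + strings.Join(ps.reasons, ", "))
	}
	fmt.Println(b.String())
}
```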
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 83f9ca2dc1..f31eddd9d6 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -112,24 +112,29 @@ func (s *Scheduler) schedule(ctx context.Context) {
 	usedCohorts := sets.NewString()
 	for i := range entries {
 		e := &entries[i]
-		if e.status != nominated {
+		if e.assignment.RepresentativeMode() == flavorassigner.NoFit {
 			continue
 		}
 		c := snapshot.ClusterQueues[e.ClusterQueue]
 		if e.assignment.Borrows() && c.Cohort != nil && usedCohorts.Has(c.Cohort.Name) {
 			e.status = skipped
-			e.inadmissibleMsg = "cohort used in this cycle"
+			e.inadmissibleMsg = "workloads in the cohort that don't require borrowing were prioritized and admitted first"
 			continue
 		}
-		log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
-		if err := s.admit(ctrl.LoggerInto(ctx, log), e); err != nil {
-			e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
-		}
 		// Even if there was a failure, we shouldn't admit other workloads to this
 		// cohort.
 		if c.Cohort != nil {
 			usedCohorts.Insert(c.Cohort.Name)
 		}
+		if e.assignment.RepresentativeMode() != flavorassigner.Fit {
+			// TODO(#43): Implement preemption.
+			continue
+		}
+		e.status = nominated
+		log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
+		if err := s.admit(ctrl.LoggerInto(ctx, log), e); err != nil {
+			e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
+		}
 	}
 
 	// 6. Requeue the heads that were not scheduled.
@@ -154,7 +159,7 @@ type entryStatus string
 const (
 	// indicates if the workload was nominated for admission.
 	nominated entryStatus = "nominated"
-	// indicates if the workload was nominated but skipped in this cycle.
+	// indicates if the workload was skipped in this cycle.
 	skipped entryStatus = "skipped"
 	// indicates if the workload was assumed to have been admitted.
 	assumed entryStatus = "assumed"
@@ -192,11 +197,9 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
 		} else if !cq.NamespaceSelector.Matches(labels.Set(ns.Labels)) {
 			e.inadmissibleMsg = "Workload namespace doesn't match ClusterQueue selector"
 			e.requeueReason = queue.RequeueReasonNamespaceMismatch
-		} else if assignment, status := flavorassigner.AssignFlavors(log, &e.Info, snap.ResourceFlavors, cq); !status.IsSuccess() {
-			e.inadmissibleMsg = api.TruncateEventMessage(status.Message())
 		} else {
-			e.assignment = *assignment
-			e.status = nominated
+			e.assignment = flavorassigner.AssignFlavors(log, &e.Info, snap.ResourceFlavors, cq)
+			e.inadmissibleMsg = api.TruncateEventMessage(e.assignment.Message())
 		}
 		entries = append(entries, e)
 	}
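The reworked loop in schedule() nominates lazily: NoFit entries are dropped up front, a borrowing entry loses its turn when its cohort was already touched this cycle, the cohort is reserved even for entries that would need preemption (still TODO #43), and only Fit entries are admitted. A standalone sketch of that control flow, with a simplified entry type, mirroring the new "cannot borrow if needs reclaim from cohort" test below:

```go
package main

import "fmt"

type Mode int

const (
	NoFit Mode = iota
	ClusterQueuePreempt
	CohortReclaim
	Fit
)

type entry struct {
	name    string
	cohort  string // empty when the ClusterQueue has no cohort
	mode    Mode
	borrows bool
}

// admitCycle mirrors the loop in schedule() under the assumptions above.
func admitCycle(entries []entry) []string {
	usedCohorts := map[string]bool{}
	var admitted []string
	for _, e := range entries {
		if e.mode == NoFit {
			continue
		}
		if e.borrows && e.cohort != "" && usedCohorts[e.cohort] {
			continue // workloads that don't need to borrow went first
		}
		if e.cohort != "" {
			usedCohorts[e.cohort] = true // reserve even if admission fails
		}
		if e.mode != Fit {
			continue // TODO(#43): preemption not implemented yet
		}
		admitted = append(admitted, e.name)
	}
	return admitted
}

func main() {
	got := admitCycle([]entry{
		{name: "can-reclaim", cohort: "eng", mode: CohortReclaim},
		{name: "needs-to-borrow", cohort: "eng", mode: Fit, borrows: true},
	})
	// []: the reclaimer reserved the cohort, so the borrower must wait.
	fmt.Println(got)
}
```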
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 20b62f993f..03e63e43a3 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -654,6 +654,34 @@ func TestSchedule(t *testing.T) {
 				"eng-beta": sets.NewString("eng-beta/new"),
 			},
 		},
+		"cannot borrow if needs reclaim from cohort": {
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("can-reclaim", "eng-alpha").
+					Queue("main").
+					Request(corev1.ResourceCPU, "100").
+					Obj(),
+				*utiltesting.MakeWorkload("needs-to-borrow", "eng-beta").
+					Queue("main").
+					Request(corev1.ResourceCPU, "1").
+					Obj(),
+				*utiltesting.MakeWorkload("user-on-demand", "eng-beta").
+					Request(corev1.ResourceCPU, "50").
+					Admit(utiltesting.MakeAdmission("eng-beta").Flavor(corev1.ResourceCPU, "on-demand").Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("user-spot", "eng-beta").
+					Request(corev1.ResourceCPU, "1").
+					Admit(utiltesting.MakeAdmission("eng-beta").Flavor(corev1.ResourceCPU, "spot").Obj()).
+					Obj(),
+			},
+			wantLeft: map[string]sets.String{
+				"eng-alpha": sets.NewString("eng-alpha/can-reclaim"),
+				"eng-beta":  sets.NewString("eng-beta/needs-to-borrow"),
+			},
+			wantAssignments: map[string]kueue.Admission{
+				"eng-beta/user-spot":      *utiltesting.MakeAdmission("eng-beta").Flavor(corev1.ResourceCPU, "spot").Obj(),
+				"eng-beta/user-on-demand": *utiltesting.MakeAdmission("eng-beta").Flavor(corev1.ResourceCPU, "on-demand").Obj(),
+			},
+		},
 		"cannot borrow resource not listed in clusterQueue": {
 			workloads: []kueue.Workload{
 				{
@@ -975,15 +1003,15 @@ func TestRequeueAndUpdate(t *testing.T) {
 	w1 := utiltesting.MakeWorkload("w1", "ns1").Queue(q1.Name).Obj()
 
 	cases := []struct {
-		name          string
-		e             entry
-		wantWorkloads map[string]sets.String
-		wantStatus    kueue.WorkloadStatus
+		name             string
+		e                entry
+		wantWorkloads    map[string]sets.String
+		wantInadmissible map[string]sets.String
+		wantStatus       kueue.WorkloadStatus
 	}{
 		{
 			name: "workload didn't fit",
 			e: entry{
-				status:          "",
 				inadmissibleMsg: "didn't fit",
 			},
 			wantStatus: kueue.WorkloadStatus{
@@ -996,6 +1024,9 @@ func TestRequeueAndUpdate(t *testing.T) {
 					},
 				},
 			},
+			wantInadmissible: map[string]sets.String{
+				"cq": sets.NewString(workload.Key(w1)),
+			},
 		},
 		{
 			name: "assumed",
@@ -1075,6 +1106,11 @@ func TestRequeueAndUpdate(t *testing.T) {
 				t.Errorf("Unexpected elements in the cluster queue (-want,+got):\n%s", diff)
 			}
 
+			inadmissibleDump := qManager.DumpInadmissible()
+			if diff := cmp.Diff(tc.wantInadmissible, inadmissibleDump); diff != "" {
+				t.Errorf("Unexpected elements in the inadmissible stage of the cluster queue (-want,+got):\n%s", diff)
+			}
+
 			var updatedWl kueue.Workload
 			if err := cl.Get(ctx, client.ObjectKeyFromObject(w1), &updatedWl); err != nil {
 				t.Fatalf("Failed obtaining updated object: %v", err)