Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep adjusted workload resources in sync with limitRanges and runtimeClasses #653

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,25 @@ type Admission struct {
// clusterQueue is the name of the ClusterQueue that admitted this workload.
ClusterQueue ClusterQueueReference `json:"clusterQueue"`

// podSetFlavors hold the admission results for each of the .spec.podSets entries.
// PodSetAssignments hold the admission results for each of the .spec.podSets entries.
// +listType=map
// +listMapKey=name
PodSetFlavors []PodSetFlavors `json:"podSetFlavors"`
PodSetAssignments []PodSetAssignment `json:"podSetAssignments"`
}

type PodSetFlavors struct {
type PodSetAssignment struct {
// Name is the name of the podSet. It should match one of the names in .spec.podSets.
// +kubebuilder:default=main
Name string `json:"name"`

// Flavors are the flavors assigned to the workload for each resource.
Flavors map[corev1.ResourceName]ResourceFlavorReference `json:"flavors,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are on time to change PodSetFlavors by something else, given that it's no longer just about flavors.

Maybe PodSetAssignments is a bit more generic?

And we could have a single list holding both the quantity and the flavor, instead of two maps. I'm not too sure about this one. How does the calculation of workload.Info change if we do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first part done.

For the second one ... , it will imply adding two additional conversion methods ... but it will look strange. Also, since ResourceList is "standard", I'd say to keep it as is. (Also, the time it will take to re-rework the tests might not be negligible.)


// resourceUsage keeps track of the total resources all the pods in the podset need to run.
//
// Beside what is provided in podSet's specs, this calculation takes into account
// the LimitRange defaults and RuntimeClass overheads at the moment of admission.
ResourceUsage corev1.ResourceList `json:"resourceUsage,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a PR that includes the field name changes without adding ResourceUsage?

I would like to merge that before cutting the release.

If this PR ends up taking longer, we can leave it to the next release.

}

type PodSet struct {
Expand Down
21 changes: 14 additions & 7 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ func validateAdmission(obj *kueue.Workload, path *field.Path) field.ErrorList {
names.Insert(ps.Name)
}
psFlavorsPath := path.Child("podSetFlavors")
if names.Len() != len(admission.PodSetFlavors) {
if names.Len() != len(admission.PodSetAssignments) {
allErrs = append(allErrs, field.Invalid(psFlavorsPath, field.OmitValueType{}, "must have the same number of podSets as the spec"))
}

for i, ps := range admission.PodSetFlavors {
for i, ps := range admission.PodSetAssignments {
if !names.Has(ps.Name) {
allErrs = append(allErrs, field.NotFound(psFlavorsPath.Index(i).Child("name"), ps.Name))
}
Expand Down
6 changes: 3 additions & 3 deletions apis/kueue/webhooks/workload_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,13 @@ func TestValidateWorkloadUpdate(t *testing.T) {
"admission can be set": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Admit(
testingutil.MakeAdmission("cluster-queue").Flavor("on-demand", "5").Obj(),
testingutil.MakeAdmission("cluster-queue").Assignment("on-demand", "5", "1").Obj(),
).Obj(),
wantErr: nil,
},
"admission can be unset": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Admit(
testingutil.MakeAdmission("cluster-queue").Flavor("on-demand", "5").Obj(),
testingutil.MakeAdmission("cluster-queue").Assignment("on-demand", "5", "1").Obj(),
).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
wantErr: nil,
Expand All @@ -273,7 +273,7 @@ func TestValidateWorkloadUpdate(t *testing.T) {
testingutil.MakeAdmission("cluster-queue").Obj(),
).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Admit(
testingutil.MakeAdmission("cluster-queue").Flavor("on-demand", "5").Obj(),
testingutil.MakeAdmission("cluster-queue").Assignment("on-demand", "5", "1").Obj(),
).Obj(),
wantErr: field.ErrorList{
field.Invalid(field.NewPath("status", "admission"), nil, ""),
Expand Down
21 changes: 17 additions & 4 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8062,9 +8062,9 @@ spec:
description: clusterQueue is the name of the ClusterQueue that
admitted this workload.
type: string
podSetFlavors:
description: podSetFlavors hold the admission results for each
of the .spec.podSets entries.
podSetAssignments:
description: PodSetAssignments hold the admission results for
each of the .spec.podSets entries.
items:
properties:
flavors:
Expand All @@ -8080,6 +8080,19 @@ spec:
description: Name is the name of the podSet. It should match
one of the names in .spec.podSets.
type: string
resourceUsage:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: "resourceUsage keeps track of the total resources
all the pods in the podset need to run. \n Beside what
is provided in podSet's specs, this calculation takes
into account the LimitRange defaults and RuntimeClass
overheads at the moment of admission."
type: object
required:
- name
type: object
Expand All @@ -8089,7 +8102,7 @@ spec:
x-kubernetes-list-type: map
required:
- clusterQueue
- podSetFlavors
- podSetAssignments
type: object
conditions:
description: "conditions hold the latest available observations of
Expand Down
54 changes: 27 additions & 27 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,24 +720,30 @@ func TestCacheWorkloadOperations(t *testing.T) {
Request(corev1.ResourceCPU, "5m").
Obj(),
}
podSetFlavors := []kueue.PodSetFlavors{
podSetFlavors := []kueue.PodSetAssignment{
{
Name: "driver",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10m"),
},
},
{
Name: "workers",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("15m"),
},
},
}
cl := utiltesting.NewFakeClient(
utiltesting.MakeWorkload("a", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "one",
PodSetFlavors: podSetFlavors,
ClusterQueue: "one",
PodSetAssignments: podSetFlavors,
}).Obj(),
utiltesting.MakeWorkload("b", "").Admit(&kueue.Admission{
ClusterQueue: "one",
Expand All @@ -763,8 +769,8 @@ func TestCacheWorkloadOperations(t *testing.T) {
operation: func(cache *Cache) error {
workloads := []*kueue.Workload{
utiltesting.MakeWorkload("a", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "one",
PodSetFlavors: podSetFlavors,
ClusterQueue: "one",
PodSetAssignments: podSetFlavors,
}).Obj(),
utiltesting.MakeWorkload("d", "").Admit(&kueue.Admission{
ClusterQueue: "two",
Expand Down Expand Up @@ -857,8 +863,8 @@ func TestCacheWorkloadOperations(t *testing.T) {
ClusterQueue: "one",
}).Obj()
latest := utiltesting.MakeWorkload("a", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "two",
PodSetFlavors: podSetFlavors,
ClusterQueue: "two",
PodSetAssignments: podSetFlavors,
}).Obj()
return cache.UpdateWorkload(old, latest)
},
Expand Down Expand Up @@ -1093,12 +1099,12 @@ func TestCacheWorkloadOperations(t *testing.T) {
operation: func(cache *Cache) error {
workloads := []*kueue.Workload{
utiltesting.MakeWorkload("d", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "one",
PodSetFlavors: podSetFlavors,
ClusterQueue: "one",
PodSetAssignments: podSetFlavors,
}).Obj(),
utiltesting.MakeWorkload("e", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "two",
PodSetFlavors: podSetFlavors,
ClusterQueue: "two",
PodSetAssignments: podSetFlavors,
}).Obj(),
}
for i := range workloads {
Expand Down Expand Up @@ -1164,12 +1170,12 @@ func TestCacheWorkloadOperations(t *testing.T) {
operation: func(cache *Cache) error {
workloads := []*kueue.Workload{
utiltesting.MakeWorkload("d", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "one",
PodSetFlavors: podSetFlavors,
ClusterQueue: "one",
PodSetAssignments: podSetFlavors,
}).Obj(),
utiltesting.MakeWorkload("e", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "two",
PodSetFlavors: podSetFlavors,
ClusterQueue: "two",
PodSetAssignments: podSetFlavors,
}).Obj(),
}
for i := range workloads {
Expand Down Expand Up @@ -1235,12 +1241,12 @@ func TestCacheWorkloadOperations(t *testing.T) {
operation: func(cache *Cache) error {
workloads := []*kueue.Workload{
utiltesting.MakeWorkload("d", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "one",
PodSetFlavors: podSetFlavors,
ClusterQueue: "one",
PodSetAssignments: podSetFlavors,
}).Obj(),
utiltesting.MakeWorkload("e", "").PodSets(podSets...).Admit(&kueue.Admission{
ClusterQueue: "two",
PodSetFlavors: podSetFlavors,
ClusterQueue: "two",
PodSetAssignments: podSetFlavors,
}).Obj(),
}
for i := range workloads {
Expand Down Expand Up @@ -1327,18 +1333,12 @@ func TestClusterQueueUsage(t *testing.T) {
*utiltesting.MakeWorkload("one", "").
Request(corev1.ResourceCPU, "8").
Request("example.com/gpu", "5").
Admit(utiltesting.MakeAdmission("foo").
Flavor(corev1.ResourceCPU, "default").
Flavor("example.com/gpu", "model_a").
Obj()).
Admit(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "8000m").Assignment("example.com/gpu", "model_a", "5").Obj()).
Obj(),
*utiltesting.MakeWorkload("two", "").
Request(corev1.ResourceCPU, "5").
Request("example.com/gpu", "6").
Admit(utiltesting.MakeAdmission("foo").
Flavor(corev1.ResourceCPU, "default").
Flavor("example.com/gpu", "model_b").
Obj()).
Admit(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "5000m").Assignment("example.com/gpu", "model_b", "6").Obj()).
Obj(),
}
cases := map[string]struct {
Expand Down
33 changes: 11 additions & 22 deletions pkg/cache/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,28 +155,23 @@ func TestSnapshot(t *testing.T) {
utiltesting.MakeWorkload("alpha", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "2").Obj()).
Admit(utiltesting.MakeAdmission("a", "main").
Flavor(corev1.ResourceCPU, "demand").Obj()).
Admit(utiltesting.MakeAdmission("a", "main").Assignment(corev1.ResourceCPU, "demand", "10000m").Obj()).
Obj(),
utiltesting.MakeWorkload("beta", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "1").
Request("example.com/gpu", "2").
Obj(),
).
Admit(utiltesting.MakeAdmission("b", "main").
Flavor(corev1.ResourceCPU, "spot").
Flavor("example.com/gpu", "default").Obj()).
Admit(utiltesting.MakeAdmission("b", "main").Assignment(corev1.ResourceCPU, "spot", "5000m").Assignment("example.com/gpu", "default", "10").Obj()).
Obj(),
utiltesting.MakeWorkload("gamma", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "1").
Request("example.com/gpu", "1").
Obj(),
).
Admit(utiltesting.MakeAdmission("b", "main").
Flavor(corev1.ResourceCPU, "spot").
Flavor("example.com/gpu", "default").Obj()).
Admit(utiltesting.MakeAdmission("b", "main").Assignment(corev1.ResourceCPU, "spot", "5000m").Assignment("example.com/gpu", "default", "5").Obj()).
Obj(),
utiltesting.MakeWorkload("sigma", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Expand Down Expand Up @@ -244,8 +239,7 @@ func TestSnapshot(t *testing.T) {
"/alpha": workload.NewInfo(utiltesting.MakeWorkload("alpha", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "2").Obj()).
Admit(utiltesting.MakeAdmission("a", "main").
Flavor(corev1.ResourceCPU, "demand").Obj()).
Admit(utiltesting.MakeAdmission("a", "main").Assignment(corev1.ResourceCPU, "demand", "10000m").Obj()).
Obj()),
},
Preemption: defaultPreemption,
Expand Down Expand Up @@ -290,20 +284,15 @@ func TestSnapshot(t *testing.T) {
Request(corev1.ResourceCPU, "1").
Request("example.com/gpu", "2").
Obj()).
Admit(utiltesting.MakeAdmission("b", "main").
Flavor(corev1.ResourceCPU, "spot").
Flavor("example.com/gpu", "default").
Obj()).
Admit(utiltesting.MakeAdmission("b", "main").Assignment(corev1.ResourceCPU, "spot", "5000m").Assignment("example.com/gpu", "default", "10").Obj()).
Obj()),
"/gamma": workload.NewInfo(utiltesting.MakeWorkload("gamma", "").
PodSets(*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "1").
Request("example.com/gpu", "1").
Obj(),
).
Admit(utiltesting.MakeAdmission("b", "main").
Flavor(corev1.ResourceCPU, "spot").
Flavor("example.com/gpu", "default").Obj()).
Admit(utiltesting.MakeAdmission("b", "main").Assignment(corev1.ResourceCPU, "spot", "5000m").Assignment("example.com/gpu", "default", "5").Obj()).
Obj()),
},
Preemption: defaultPreemption,
Expand Down Expand Up @@ -427,23 +416,23 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
workloads := []kueue.Workload{
*utiltesting.MakeWorkload("c1-cpu", "").
Request(corev1.ResourceCPU, "1").
Admit(utiltesting.MakeAdmission("c1").Flavor(corev1.ResourceCPU, "default").Obj()).
Admit(utiltesting.MakeAdmission("c1").Assignment(corev1.ResourceCPU, "default", "1000m").Obj()).
Obj(),
*utiltesting.MakeWorkload("c1-memory-alpha", "").
Request(corev1.ResourceMemory, "1Gi").
Admit(utiltesting.MakeAdmission("c1").Flavor(corev1.ResourceMemory, "alpha").Obj()).
Admit(utiltesting.MakeAdmission("c1").Assignment(corev1.ResourceMemory, "alpha", "1Gi").Obj()).
Obj(),
*utiltesting.MakeWorkload("c1-memory-beta", "").
Request(corev1.ResourceMemory, "1Gi").
Admit(utiltesting.MakeAdmission("c1").Flavor(corev1.ResourceMemory, "beta").Obj()).
Admit(utiltesting.MakeAdmission("c1").Assignment(corev1.ResourceMemory, "beta", "1Gi").Obj()).
Obj(),
*utiltesting.MakeWorkload("c2-cpu-1", "").
Request(corev1.ResourceCPU, "1").
Admit(utiltesting.MakeAdmission("c2").Flavor(corev1.ResourceCPU, "default").Obj()).
Admit(utiltesting.MakeAdmission("c2").Assignment(corev1.ResourceCPU, "default", "1000m").Obj()).
Obj(),
*utiltesting.MakeWorkload("c2-cpu-2", "").
Request(corev1.ResourceCPU, "1").
Admit(utiltesting.MakeAdmission("c2").Flavor(corev1.ResourceCPU, "default").Obj()).
Admit(utiltesting.MakeAdmission("c2").Assignment(corev1.ResourceCPU, "default", "1000m").Obj()).
Obj(),
}

Expand Down
Loading