From 0b7cf9ee26e41dd6bb0aa95a596c0ba51c579cdd Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Fri, 1 Nov 2024 13:08:46 +0100 Subject: [PATCH 1/6] Exclude non-ready nodes from scheduling. --- pkg/cache/tas_cache_test.go | 161 ++++++++++++++++++ pkg/cache/tas_flavor.go | 21 ++- pkg/scheduler/scheduler_test.go | 12 ++ .../jobs/job/job_controller_test.go | 6 + test/integration/tas/tas_test.go | 42 +++++ 5 files changed, 236 insertions(+), 6 deletions(-) diff --git a/pkg/cache/tas_cache_test.go b/pkg/cache/tas_cache_test.go index 2b5a062d9f..4db40ce601 100644 --- a/pkg/cache/tas_cache_test.go +++ b/pkg/cache/tas_cache_test.go @@ -55,6 +55,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -71,6 +77,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -87,6 +99,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -103,6 +121,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -119,6 +143,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -135,6 +165,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("2"), corev1.ResourceMemory: resource.MustParse("4Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } @@ -185,6 +221,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("2"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -200,6 +242,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("2"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -215,6 +263,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -230,6 +284,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -245,6 +305,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: 
resource.MustParse("1"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -260,6 +326,12 @@ func TestFindTopologyAssignment(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -656,6 +728,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -698,6 +776,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -726,6 +810,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -768,6 +858,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -816,6 +912,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -850,6 +952,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -885,6 +993,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -899,6 +1013,12 @@ func TestFindTopologyAssignment(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, @@ -927,6 +1047,47 @@ func TestFindTopologyAssignment(t *testing.T) { }, }, }, + "no assignment as node is not ready": { + nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "b1-r1-x1", + Labels: map[string]string{ + "zone": "zone-a", + tasHostLabel: "x1", + }, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + { + Type: corev1.NodeNetworkUnavailable, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + }, + request: kueue.PodSetTopologyRequest{ + Required: ptr.To(tasHostLabel), + }, + nodeLabels: map[string]string{ + "zone": "zone-a", + }, + levels: defaultOneLevel, + requests: 
resources.Requests{ + corev1.ResourceCPU: 1000, + }, + count: 1, + wantAssignment: nil, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { diff --git a/pkg/cache/tas_flavor.go b/pkg/cache/tas_flavor.go index 7f4d832fa5..b29a091c62 100644 --- a/pkg/cache/tas_flavor.go +++ b/pkg/cache/tas_flavor.go @@ -108,12 +108,21 @@ func (c *TASFlavorCache) snapshotForNodes(log logr.Logger, nodes []corev1.Node, snapshot := newTASFlavorSnapshot(log, c.Levels) nodeToDomain := make(map[string]utiltas.TopologyDomainID) for _, node := range nodes { - levelValues := utiltas.LevelValues(c.Levels, node.Labels) - capacity := resources.NewRequests(node.Status.Allocatable) - domainID := utiltas.DomainID(levelValues) - snapshot.levelValuesPerDomain[domainID] = levelValues - snapshot.addCapacity(domainID, capacity) - nodeToDomain[node.Name] = domainID + ready := false + for _, cond := range node.Status.Conditions { + // Only healthy and ready to accept pods nodes are considered for scheduling calculation + ready = (cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue) + } + if ready { + levelValues := utiltas.LevelValues(c.Levels, node.Labels) + capacity := resources.NewRequests(node.Status.Allocatable) + domainID := utiltas.DomainID(levelValues) + snapshot.levelValuesPerDomain[domainID] = levelValues + snapshot.addCapacity(domainID, capacity) + nodeToDomain[node.Name] = domainID + } else { + log.V(3).Info("Node was excluded from TAS Flavor snapshot", "nodeName", node.Name, "nodeStatusConditions", node.Status.Conditions) + } } snapshot.initialize() for domainID, usage := range c.usage { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 36a4816cf6..f8842b6746 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -3881,6 +3881,12 @@ func TestScheduleForTAS(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } @@ -4202,6 +4208,12 @@ func TestScheduleForTAS(t *testing.T) { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, }, diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index 0bf50b1fdc..db84c45070 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -2208,6 +2208,12 @@ var _ = ginkgo.Describe("Job controller when TopologyAwareScheduling enabled", g corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } diff --git a/test/integration/tas/tas_test.go b/test/integration/tas/tas_test.go index 4f5af7efbc..e4fa7b61c1 100644 --- a/test/integration/tas/tas_test.go +++ b/test/integration/tas/tas_test.go @@ -230,6 +230,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -246,6 +252,12 @@ var _ = ginkgo.Describe("Topology Aware 
Scheduling", ginkgo.Ordered, func() { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -262,6 +274,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -278,6 +296,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } @@ -572,6 +596,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } @@ -621,6 +651,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { Allocatable: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("5"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, { @@ -635,6 +671,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() { Allocatable: corev1.ResourceList{ gpuResName: resource.MustParse("4"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } From 85b9be4102126920f6a927deff98dde0e68ce6a5 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 4 Nov 2024 09:25:15 +0100 Subject: [PATCH 2/6] Update after code review --- pkg/cache/tas_flavor.go | 22 +++++++++------------- pkg/util/tas/tas.go | 11 +++++++++++ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/cache/tas_flavor.go b/pkg/cache/tas_flavor.go index b29a091c62..dea250ad0d 100644 --- a/pkg/cache/tas_flavor.go +++ b/pkg/cache/tas_flavor.go @@ -108,21 +108,17 @@ func (c *TASFlavorCache) snapshotForNodes(log logr.Logger, nodes []corev1.Node, snapshot := newTASFlavorSnapshot(log, c.Levels) nodeToDomain := make(map[string]utiltas.TopologyDomainID) for _, node := range nodes { - ready := false - for _, cond := range node.Status.Conditions { + if ready := utiltas.IsNodeStatusConditionTrue(node.Status.Conditions, corev1.NodeReady, corev1.ConditionTrue); !ready { // Only healthy and ready to accept pods nodes are considered for scheduling calculation - ready = (cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue) - } - if ready { - levelValues := utiltas.LevelValues(c.Levels, node.Labels) - capacity := resources.NewRequests(node.Status.Allocatable) - domainID := utiltas.DomainID(levelValues) - snapshot.levelValuesPerDomain[domainID] = levelValues - snapshot.addCapacity(domainID, capacity) - nodeToDomain[node.Name] = domainID - } else { - log.V(3).Info("Node was excluded from TAS Flavor snapshot", "nodeName", node.Name, "nodeStatusConditions", node.Status.Conditions) + log.V(3).Info("Node was excluded from TAS Flavor snapshot", "ready", ready) + continue } + levelValues := utiltas.LevelValues(c.Levels, node.Labels) + capacity := resources.NewRequests(node.Status.Allocatable) + domainID := 
utiltas.DomainID(levelValues) + snapshot.levelValuesPerDomain[domainID] = levelValues + snapshot.addCapacity(domainID, capacity) + nodeToDomain[node.Name] = domainID } snapshot.initialize() for domainID, usage := range c.usage { diff --git a/pkg/util/tas/tas.go b/pkg/util/tas/tas.go index 8f0bc994c9..50c44140f5 100644 --- a/pkg/util/tas/tas.go +++ b/pkg/util/tas/tas.go @@ -19,6 +19,8 @@ package tas import ( "strings" + corev1 "k8s.io/api/core/v1" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" ) @@ -51,3 +53,12 @@ func Levels(topology *kueuealpha.Topology) []string { } return result } + +func IsNodeStatusConditionTrue(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType, conditionStatus corev1.ConditionStatus) bool { + for _, cond := range conditions { + if cond.Type == conditionType { + return cond.Status == conditionStatus + } + } + return false +} From ec294ca8746d619f1439f0882f00295bdf6518d6 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 4 Nov 2024 13:54:59 +0100 Subject: [PATCH 3/6] Move TAS indexer to subpackage --- cmd/kueue/main.go | 3 +- pkg/controller/tas/indexer.go | 44 ------------- pkg/controller/tas/indexer/indexer.go | 70 +++++++++++++++++++++ pkg/controller/tas/topology_ungater.go | 3 +- pkg/controller/tas/topology_ungater_test.go | 3 +- pkg/scheduler/scheduler_test.go | 2 + 6 files changed, 78 insertions(+), 47 deletions(-) delete mode 100644 pkg/controller/tas/indexer.go create mode 100644 pkg/controller/tas/indexer/indexer.go diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 69c47ca51d..483dfca644 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -52,6 +52,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/tas" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/debugger" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" @@ -218,7 +219,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur } if features.Enabled(features.TopologyAwareScheduling) { - if err := tas.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil { + if err := tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil { setupLog.Error(err, "Could not setup TAS indexer") os.Exit(1) } diff --git a/pkg/controller/tas/indexer.go b/pkg/controller/tas/indexer.go deleted file mode 100644 index 0bf6ad1159..0000000000 --- a/pkg/controller/tas/indexer.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package tas - -import ( - "context" - - corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" -) - -const ( - workloadNameKey = "metadata.workload" -) - -func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { - return indexer.IndexField(ctx, &corev1.Pod{}, workloadNameKey, func(o client.Object) []string { - pod, ok := o.(*corev1.Pod) - if !ok { - return nil - } - value, found := pod.Annotations[kueuealpha.WorkloadAnnotation] - if !found { - return nil - } - return []string{value} - }) -} diff --git a/pkg/controller/tas/indexer/indexer.go b/pkg/controller/tas/indexer/indexer.go new file mode 100644 index 0000000000..1efbd5bafa --- /dev/null +++ b/pkg/controller/tas/indexer/indexer.go @@ -0,0 +1,70 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package indexer + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + utiltas "sigs.k8s.io/kueue/pkg/util/tas" +) + +const ( + WorkloadNameKey = "metadata.workload" + ReadyNode = "metadata.ready" +) + +func indexPodWorkload(o client.Object) []string { + pod, ok := o.(*corev1.Pod) + if !ok { + return nil + } + value, found := pod.Annotations[kueuealpha.WorkloadAnnotation] + if !found { + return nil + } + return []string{value} +} + +func indexReadyNode(o client.Object) []string { + node, ok := o.(*corev1.Node) + if !ok || len(node.Status.Conditions) == 0 { + return nil + } + + if !utiltas.IsNodeStatusConditionTrue(node.Status.Conditions, corev1.NodeReady, corev1.ConditionTrue) { + return nil + } + + return []string{"true"} +} + +func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { + if err := indexer.IndexField(ctx, &corev1.Pod{}, WorkloadNameKey, indexPodWorkload); err != nil { + return fmt.Errorf("setting index pod workload: %w", err) + } + + if err := indexer.IndexField(ctx, &corev1.Node{}, ReadyNode, indexReadyNode); err != nil { + return fmt.Errorf("setting index node ready: %w", err) + } + + return nil +} diff --git a/pkg/controller/tas/topology_ungater.go b/pkg/controller/tas/topology_ungater.go index aaac87e181..78c162db48 100644 --- a/pkg/controller/tas/topology_ungater.go +++ b/pkg/controller/tas/topology_ungater.go @@ -40,6 +40,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/tas/indexer" utilclient "sigs.k8s.io/kueue/pkg/util/client" "sigs.k8s.io/kueue/pkg/util/expectations" "sigs.k8s.io/kueue/pkg/util/parallelize" @@ -311,7 +312,7 @@ func (r *topologyUngater) podsForDomain(ctx context.Context, ns, wlName, psName if err := r.client.List(ctx, &pods, client.InNamespace(ns), client.MatchingLabels{ kueuealpha.PodSetLabel: psName, }, client.MatchingFields{ - workloadNameKey: wlName, + indexer.WorkloadNameKey: wlName, }); err != nil { return nil, 
err } diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go index 4683ebdfe1..93be3c1a75 100644 --- a/pkg/controller/tas/topology_ungater_test.go +++ b/pkg/controller/tas/topology_ungater_test.go @@ -38,6 +38,7 @@ import ( _ "sigs.k8s.io/kueue/pkg/controller/jobs/job" _ "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster" + "sigs.k8s.io/kueue/pkg/controller/tas/indexer" ) const ( @@ -875,7 +876,7 @@ func TestReconcile(t *testing.T) { t.Run(name, func(t *testing.T) { ctx, log := utiltesting.ContextWithLog(t) clientBuilder := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) - if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { + if err := indexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { t.Fatalf("Could not setup indexes: %v", err) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index f8842b6746..f5e6733ec2 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -45,6 +45,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/queue" @@ -4492,6 +4493,7 @@ func TestScheduleForTAS(t *testing.T) { WithObjects( &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, ) + _ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)) cl := clientBuilder.Build() recorder := &utiltesting.EventRecorder{} cqCache := cache.New(cl) From a70c16c5c4574f6a592e5ed1238a22e30393d7d6 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 4 Nov 2024 13:55:53 +0100 Subject: [PATCH 4/6] Use TAS indexer to filter out non ready nodes --- pkg/cache/tas_cache_test.go | 2 ++ pkg/cache/tas_flavor.go | 8 ++------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/cache/tas_cache_test.go b/pkg/cache/tas_cache_test.go index 4db40ce601..b1e7edfaab 100644 --- a/pkg/cache/tas_cache_test.go +++ b/pkg/cache/tas_cache_test.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/kueue/pkg/resources" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" ) func TestFindTopologyAssignment(t *testing.T) { @@ -1102,6 +1103,7 @@ func TestFindTopologyAssignment(t *testing.T) { } clientBuilder := utiltesting.NewClientBuilder() clientBuilder.WithObjects(initialObjects...) + _ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)) client := clientBuilder.Build() tasCache := NewTASCache(client) diff --git a/pkg/cache/tas_flavor.go b/pkg/cache/tas_flavor.go index dea250ad0d..7dcbc07d00 100644 --- a/pkg/cache/tas_flavor.go +++ b/pkg/cache/tas_flavor.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/util/limitrange" utiltas "sigs.k8s.io/kueue/pkg/util/tas" @@ -80,7 +81,7 @@ func (c *TASFlavorCache) snapshot(ctx context.Context) (*TASFlavorSnapshot, erro } requiredLabelKeys := client.HasLabels{} requiredLabelKeys = append(requiredLabelKeys, c.Levels...) 
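// A minimal sketch (not from the series) of the lookup that the -/+ pair below
// introduces, assuming the "metadata.ready" field index from PATCH 3 is already
// registered with the client; listReadyNodes, its imports (context, fmt, corev1,
// client, indexer) and the error text are illustrative only.
func listReadyNodes(ctx context.Context, c client.Client, levels []string) (*corev1.NodeList, error) {
	nodes := &corev1.NodeList{}
	opts := []client.ListOption{
		client.HasLabels(levels),                         // node must carry every topology level label
		client.MatchingFields{indexer.ReadyNode: "true"}, // served by the field index, not filtered in Go
	}
	if err := c.List(ctx, nodes, opts...); err != nil {
		return nil, fmt.Errorf("failed to list ready nodes: %w", err)
	}
	return nodes, nil
}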
- err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys) + err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys, client.MatchingFields{indexer.ReadyNode: "true"}) if err != nil { return nil, fmt.Errorf("failed to list nodes for TAS: %w", err) } @@ -108,11 +109,6 @@ func (c *TASFlavorCache) snapshotForNodes(log logr.Logger, nodes []corev1.Node, snapshot := newTASFlavorSnapshot(log, c.Levels) nodeToDomain := make(map[string]utiltas.TopologyDomainID) for _, node := range nodes { - if ready := utiltas.IsNodeStatusConditionTrue(node.Status.Conditions, corev1.NodeReady, corev1.ConditionTrue); !ready { - // Only healthy and ready to accept pods nodes are considered for scheduling calculation - log.V(3).Info("Node was excluded from TAS Flavor snapshot", "ready", ready) - continue - } levelValues := utiltas.LevelValues(c.Levels, node.Labels) capacity := resources.NewRequests(node.Status.Allocatable) domainID := utiltas.DomainID(levelValues) From 8b2f0b1ec4b66e9269dfc94bc623a531b23916fc Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 4 Nov 2024 14:22:54 +0100 Subject: [PATCH 5/6] Update integration tests --- test/integration/controller/jobs/job/suite_test.go | 4 ++++ .../controller/jobs/jobset/jobset_controller_test.go | 6 ++++++ test/integration/controller/jobs/jobset/suite_test.go | 4 ++++ test/integration/controller/jobs/pod/pod_controller_test.go | 6 ++++++ test/integration/controller/jobs/pod/suite_test.go | 4 ++++ test/integration/tas/suite_test.go | 4 ++++ 6 files changed, 28 insertions(+) diff --git a/test/integration/controller/jobs/job/suite_test.go b/test/integration/controller/jobs/job/suite_test.go index 0dff9d4d26..6c455a5b2e 100644 --- a/test/integration/controller/jobs/job/suite_test.go +++ b/test/integration/controller/jobs/job/suite_test.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/job" "sigs.k8s.io/kueue/pkg/controller/tas" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/test/integration/framework" @@ -108,6 +109,9 @@ func managerAndControllersSetup( if setupTASControllers { failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl) + + err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } if enableScheduler { diff --git a/test/integration/controller/jobs/jobset/jobset_controller_test.go b/test/integration/controller/jobs/jobset/jobset_controller_test.go index 0e92b9580f..aa44ca5055 100644 --- a/test/integration/controller/jobs/jobset/jobset_controller_test.go +++ b/test/integration/controller/jobs/jobset/jobset_controller_test.go @@ -1164,6 +1164,12 @@ var _ = ginkgo.Describe("JobSet controller when TopologyAwareScheduling enabled" corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } diff --git a/test/integration/controller/jobs/jobset/suite_test.go b/test/integration/controller/jobs/jobset/suite_test.go index 8b54fe850d..2ba01e5c68 100644 --- a/test/integration/controller/jobs/jobset/suite_test.go +++ b/test/integration/controller/jobs/jobset/suite_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" 
"sigs.k8s.io/kueue/pkg/controller/jobs/jobset" "sigs.k8s.io/kueue/pkg/controller/tas" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/test/integration/framework" @@ -100,6 +101,9 @@ func managerAndSchedulerSetup(setupTASControllers bool, opts ...jobframework.Opt if setupTASControllers { failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl) + + err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go index 32f7a47733..cc0d01d372 100644 --- a/test/integration/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -1915,6 +1915,12 @@ var _ = ginkgo.Describe("Pod controller when TopologyAwareScheduling enabled", g corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi"), }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, }, }, } diff --git a/test/integration/controller/jobs/pod/suite_test.go b/test/integration/controller/jobs/pod/suite_test.go index 76d14ac1c5..70aba2ffa6 100644 --- a/test/integration/controller/jobs/pod/suite_test.go +++ b/test/integration/controller/jobs/pod/suite_test.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobs/job" "sigs.k8s.io/kueue/pkg/controller/jobs/pod" "sigs.k8s.io/kueue/pkg/controller/tas" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/pkg/util/kubeversion" @@ -132,6 +133,9 @@ func managerSetup( if setupTASControllers { failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl) + + err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } if enableScheduler { diff --git a/test/integration/tas/suite_test.go b/test/integration/tas/suite_test.go index afff6d6c2e..d61909fff4 100644 --- a/test/integration/tas/suite_test.go +++ b/test/integration/tas/suite_test.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/tas" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/pkg/webhooks" @@ -86,6 +87,9 @@ func managerSetup(ctx context.Context, mgr manager.Manager) { failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, controllersCfg) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl) + err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) err = sched.Start(ctx) gomega.Expect(err).NotTo(gomega.HaveOccurred()) From dc4e2ef80122dda31b1221a653a5aa09f3981de0 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 4 Nov 2024 14:46:34 +0100 Subject: [PATCH 6/6] Update to match verify --- 
pkg/cache/tas_cache_test.go | 2 +- pkg/controller/tas/topology_ungater_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/tas_cache_test.go b/pkg/cache/tas_cache_test.go index b1e7edfaab..a16035ea30 100644 --- a/pkg/cache/tas_cache_test.go +++ b/pkg/cache/tas_cache_test.go @@ -28,10 +28,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" "sigs.k8s.io/kueue/pkg/resources" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" - tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" ) func TestFindTopologyAssignment(t *testing.T) { diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go index 93be3c1a75..2ce2c0adbb 100644 --- a/pkg/controller/tas/topology_ungater_test.go +++ b/pkg/controller/tas/topology_ungater_test.go @@ -32,13 +32,13 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/tas/indexer" utilpod "sigs.k8s.io/kueue/pkg/util/pod" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" _ "sigs.k8s.io/kueue/pkg/controller/jobs/job" _ "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster" - "sigs.k8s.io/kueue/pkg/controller/tas/indexer" ) const (
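For reference, a minimal self-contained sketch (not part of the series) of the mechanism the patches rely on: a "metadata.ready" field index over Nodes, so that a cache-backed List with client.MatchingFields only ever returns Ready nodes. The index function mirrors indexer.indexReadyNode and the field key mirrors indexer.ReadyNode; the package, node names, and test name are illustrative.

package sketch

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

const readyNodeKey = "metadata.ready" // mirrors indexer.ReadyNode

func TestListOnlyReadyNodes(t *testing.T) {
	readyNode := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ready"},
		Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
			{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
		}},
	}
	notReadyNode := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "not-ready"},
		Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
			{Type: corev1.NodeReady, Status: corev1.ConditionFalse},
		}},
	}
	cl := fake.NewClientBuilder().
		WithIndex(&corev1.Node{}, readyNodeKey, func(o client.Object) []string {
			// Only Nodes feed this index, so the assertion is safe here.
			node := o.(*corev1.Node)
			for _, cond := range node.Status.Conditions {
				if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
					return []string{"true"}
				}
			}
			return nil
		}).
		WithObjects(readyNode, notReadyNode).
		Build()

	nodes := &corev1.NodeList{}
	if err := cl.List(context.Background(), nodes, client.MatchingFields{readyNodeKey: "true"}); err != nil {
		t.Fatalf("listing nodes: %v", err)
	}
	if len(nodes.Items) != 1 || nodes.Items[0].Name != "ready" {
		t.Fatalf("expected only the Ready node, got %v", nodes.Items)
	}
}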