diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 69c47ca51d..483dfca644 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -52,6 +52,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 	"sigs.k8s.io/kueue/pkg/controller/tas"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/debugger"
 	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
@@ -218,7 +219,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
 	}

 	if features.Enabled(features.TopologyAwareScheduling) {
-		if err := tas.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
+		if err := tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
 			setupLog.Error(err, "Could not setup TAS indexer")
 			os.Exit(1)
 		}
diff --git a/pkg/cache/tas_cache_test.go b/pkg/cache/tas_cache_test.go
index 2b5a062d9f..a16035ea30 100644
--- a/pkg/cache/tas_cache_test.go
+++ b/pkg/cache/tas_cache_test.go
@@ -28,6 +28,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/resources"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
@@ -55,6 +56,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -71,6 +78,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -87,6 +100,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -103,6 +122,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -119,6 +144,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -135,6 +166,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("2"),
 					corev1.ResourceMemory: resource.MustParse("4Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
@@ -185,6 +222,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("2"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -200,6 +243,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("2"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -215,6 +264,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("1"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -230,6 +285,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("1"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -245,6 +306,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("1"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -260,6 +327,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("1"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	},
@@ -656,6 +729,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -698,6 +777,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -726,6 +811,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -768,6 +859,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -816,6 +913,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -850,6 +953,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -885,6 +994,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 			{
@@ -899,6 +1014,12 @@ func TestFindTopologyAssignment(t *testing.T) {
 						corev1.ResourceCPU:    resource.MustParse("1"),
 						corev1.ResourceMemory: resource.MustParse("1Gi"),
 					},
+					Conditions: []corev1.NodeCondition{
+						{
+							Type:   corev1.NodeReady,
+							Status: corev1.ConditionTrue,
+						},
+					},
 				},
 			},
 		},
@@ -927,6 +1048,47 @@ func TestFindTopologyAssignment(t *testing.T) {
 				},
 			},
 		},
+		"no assignment as node is not ready": {
+			nodes: []corev1.Node{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "b1-r1-x1",
+						Labels: map[string]string{
+							"zone":       "zone-a",
+							tasHostLabel: "x1",
+						},
+					},
+					Status: corev1.NodeStatus{
+						Allocatable: corev1.ResourceList{
+							corev1.ResourceCPU:    resource.MustParse("1"),
+							corev1.ResourceMemory: resource.MustParse("1Gi"),
+						},
+						Conditions: []corev1.NodeCondition{
+							{
+								Type:   corev1.NodeReady,
+								Status: corev1.ConditionFalse,
+							},
+							{
+								Type:   corev1.NodeNetworkUnavailable,
+								Status: corev1.ConditionTrue,
+							},
+						},
+					},
+				},
+			},
+			request: kueue.PodSetTopologyRequest{
+				Required: ptr.To(tasHostLabel),
+			},
+			nodeLabels: map[string]string{
+				"zone": "zone-a",
+			},
+			levels: defaultOneLevel,
+			requests: resources.Requests{
+				corev1.ResourceCPU: 1000,
+			},
+			count:          1,
+			wantAssignment: nil,
+		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
@@ -941,6 +1103,7 @@ func TestFindTopologyAssignment(t *testing.T) {
 			}
 			clientBuilder := utiltesting.NewClientBuilder()
 			clientBuilder.WithObjects(initialObjects...)
+			_ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder))
 			client := clientBuilder.Build()

 			tasCache := NewTASCache(client)
diff --git a/pkg/cache/tas_flavor.go b/pkg/cache/tas_flavor.go
index 7f4d832fa5..7dcbc07d00 100644
--- a/pkg/cache/tas_flavor.go
+++ b/pkg/cache/tas_flavor.go
@@ -31,6 +31,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/resources"
 	"sigs.k8s.io/kueue/pkg/util/limitrange"
 	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
@@ -80,7 +81,7 @@ func (c *TASFlavorCache) snapshot(ctx context.Context) (*TASFlavorSnapshot, erro
 	}
 	requiredLabelKeys := client.HasLabels{}
 	requiredLabelKeys = append(requiredLabelKeys, c.Levels...)
-	err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys)
+	err := c.client.List(ctx, nodes, requiredLabels, requiredLabelKeys, client.MatchingFields{indexer.ReadyNode: "true"})
 	if err != nil {
 		return nil, fmt.Errorf("failed to list nodes for TAS: %w", err)
 	}
diff --git a/pkg/controller/tas/indexer.go b/pkg/controller/tas/indexer.go
deleted file mode 100644
index 0bf6ad1159..0000000000
--- a/pkg/controller/tas/indexer.go
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Copyright 2024 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package tas
-
-import (
-	"context"
-
-	corev1 "k8s.io/api/core/v1"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
-)
-
-const (
-	workloadNameKey = "metadata.workload"
-)
-
-func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
-	return indexer.IndexField(ctx, &corev1.Pod{}, workloadNameKey, func(o client.Object) []string {
-		pod, ok := o.(*corev1.Pod)
-		if !ok {
-			return nil
-		}
-		value, found := pod.Annotations[kueuealpha.WorkloadAnnotation]
-		if !found {
-			return nil
-		}
-		return []string{value}
-	})
-}
diff --git a/pkg/controller/tas/indexer/indexer.go b/pkg/controller/tas/indexer/indexer.go
new file mode 100644
index 0000000000..1efbd5bafa
--- /dev/null
+++ b/pkg/controller/tas/indexer/indexer.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package indexer
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
+)
+
+const (
+	WorkloadNameKey = "metadata.workload"
+	ReadyNode       = "metadata.ready"
+)
+
+func indexPodWorkload(o client.Object) []string {
+	pod, ok := o.(*corev1.Pod)
+	if !ok {
+		return nil
+	}
+	value, found := pod.Annotations[kueuealpha.WorkloadAnnotation]
+	if !found {
+		return nil
+	}
+	return []string{value}
+}
+
+func indexReadyNode(o client.Object) []string {
+	node, ok := o.(*corev1.Node)
+	if !ok || len(node.Status.Conditions) == 0 {
+		return nil
+	}
+
+	if !utiltas.IsNodeStatusConditionTrue(node.Status.Conditions, corev1.NodeReady, corev1.ConditionTrue) {
+		return nil
+	}
+
+	return []string{"true"}
+}
+
+func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
+	if err := indexer.IndexField(ctx, &corev1.Pod{}, WorkloadNameKey, indexPodWorkload); err != nil {
+		return fmt.Errorf("setting index pod workload: %w", err)
+	}
+
+	if err := indexer.IndexField(ctx, &corev1.Node{}, ReadyNode, indexReadyNode); err != nil {
+		return fmt.Errorf("setting index node ready: %w", err)
+	}
+
+	return nil
+}
diff --git a/pkg/controller/tas/topology_ungater.go b/pkg/controller/tas/topology_ungater.go
index aaac87e181..78c162db48 100644
--- a/pkg/controller/tas/topology_ungater.go
+++ b/pkg/controller/tas/topology_ungater.go
@@ -40,6 +40,7 @@ import (
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/controller/core"
+	"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	utilclient "sigs.k8s.io/kueue/pkg/util/client"
 	"sigs.k8s.io/kueue/pkg/util/expectations"
 	"sigs.k8s.io/kueue/pkg/util/parallelize"
@@ -311,7 +312,7 @@ func (r *topologyUngater) podsForDomain(ctx context.Context, ns, wlName, psName
 	if err := r.client.List(ctx, &pods, client.InNamespace(ns), client.MatchingLabels{
 		kueuealpha.PodSetLabel: psName,
 	}, client.MatchingFields{
-		workloadNameKey: wlName,
+		indexer.WorkloadNameKey: wlName,
 	}); err != nil {
 		return nil, err
 	}
diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go
index 4683ebdfe1..2ce2c0adbb 100644
--- a/pkg/controller/tas/topology_ungater_test.go
+++ b/pkg/controller/tas/topology_ungater_test.go
@@ -32,6 +32,7 @@ import (
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	utilpod "sigs.k8s.io/kueue/pkg/util/pod"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
@@ -875,7 +876,7 @@ func TestReconcile(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			ctx, log := utiltesting.ContextWithLog(t)
 			clientBuilder := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge})
-			if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil {
+			if err := indexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil {
 				t.Fatalf("Could not setup indexes: %v", err)
 			}

diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 36a4816cf6..f5e6733ec2 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -45,6 +45,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/constants"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/queue"
@@ -3881,6 +3882,12 @@ func TestScheduleForTAS(t *testing.T) {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
@@ -4202,6 +4209,12 @@ func TestScheduleForTAS(t *testing.T) {
 						Allocatable: corev1.ResourceList{
 							corev1.ResourceCPU: resource.MustParse("1"),
 						},
+						Conditions: []corev1.NodeCondition{
+							{
+								Type:   corev1.NodeReady,
+								Status: corev1.ConditionTrue,
+							},
+						},
 					},
 				},
 			},
@@ -4480,6 +4493,7 @@ func TestScheduleForTAS(t *testing.T) {
 				WithObjects(
 					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
 				)
+			_ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder))
 			cl := clientBuilder.Build()
 			recorder := &utiltesting.EventRecorder{}
 			cqCache := cache.New(cl)
diff --git a/pkg/util/tas/tas.go b/pkg/util/tas/tas.go
index 8f0bc994c9..50c44140f5 100644
--- a/pkg/util/tas/tas.go
+++ b/pkg/util/tas/tas.go
@@ -19,6 +19,8 @@ package tas
 import (
 	"strings"

+	corev1 "k8s.io/api/core/v1"
+
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 )
@@ -51,3 +53,12 @@ func Levels(topology *kueuealpha.Topology) []string {
 	}
 	return result
 }
+
+func IsNodeStatusConditionTrue(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType, conditionStatus corev1.ConditionStatus) bool {
+	for _, cond := range conditions {
+		if cond.Type == conditionType {
+			return cond.Status == conditionStatus
+		}
+	}
+	return false
+}
diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go
index 0bf50b1fdc..db84c45070 100644
--- a/test/integration/controller/jobs/job/job_controller_test.go
+++ b/test/integration/controller/jobs/job/job_controller_test.go
@@ -2208,6 +2208,12 @@ var _ = ginkgo.Describe("Job controller when TopologyAwareScheduling enabled", g
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
diff --git a/test/integration/controller/jobs/job/suite_test.go b/test/integration/controller/jobs/job/suite_test.go
index 0dff9d4d26..6c455a5b2e 100644
--- a/test/integration/controller/jobs/job/suite_test.go
+++ b/test/integration/controller/jobs/job/suite_test.go
@@ -35,6 +35,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 	"sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	"sigs.k8s.io/kueue/pkg/controller/tas"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/scheduler"
 	"sigs.k8s.io/kueue/test/integration/framework"
@@ -108,6 +109,9 @@ func managerAndControllersSetup(
 	if setupTASControllers {
 		failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)
+
+		err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}

 	if enableScheduler {
diff --git a/test/integration/controller/jobs/jobset/jobset_controller_test.go b/test/integration/controller/jobs/jobset/jobset_controller_test.go
index 0e92b9580f..aa44ca5055 100644
--- a/test/integration/controller/jobs/jobset/jobset_controller_test.go
+++ b/test/integration/controller/jobs/jobset/jobset_controller_test.go
@@ -1164,6 +1164,12 @@ var _ = ginkgo.Describe("JobSet controller when TopologyAwareScheduling enabled"
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
diff --git a/test/integration/controller/jobs/jobset/suite_test.go b/test/integration/controller/jobs/jobset/suite_test.go
index 8b54fe850d..2ba01e5c68 100644
--- a/test/integration/controller/jobs/jobset/suite_test.go
+++ b/test/integration/controller/jobs/jobset/suite_test.go
@@ -34,6 +34,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 	"sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
 	"sigs.k8s.io/kueue/pkg/controller/tas"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/scheduler"
 	"sigs.k8s.io/kueue/test/integration/framework"
@@ -100,6 +101,9 @@ func managerAndSchedulerSetup(setupTASControllers bool, opts ...jobframework.Opt
 	if setupTASControllers {
 		failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)
+
+		err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}

 	sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))
diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go
index 32f7a47733..cc0d01d372 100644
--- a/test/integration/controller/jobs/pod/pod_controller_test.go
+++ b/test/integration/controller/jobs/pod/pod_controller_test.go
@@ -1915,6 +1915,12 @@ var _ = ginkgo.Describe("Pod controller when TopologyAwareScheduling enabled", g
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
diff --git a/test/integration/controller/jobs/pod/suite_test.go b/test/integration/controller/jobs/pod/suite_test.go
index 76d14ac1c5..70aba2ffa6 100644
--- a/test/integration/controller/jobs/pod/suite_test.go
+++ b/test/integration/controller/jobs/pod/suite_test.go
@@ -36,6 +36,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
 	"sigs.k8s.io/kueue/pkg/controller/tas"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/scheduler"
 	"sigs.k8s.io/kueue/pkg/util/kubeversion"
@@ -132,6 +133,9 @@ func managerSetup(
 	if setupTASControllers {
 		failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)
+
+		err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}

 	if enableScheduler {
diff --git a/test/integration/tas/suite_test.go b/test/integration/tas/suite_test.go
index afff6d6c2e..d61909fff4 100644
--- a/test/integration/tas/suite_test.go
+++ b/test/integration/tas/suite_test.go
@@ -33,6 +33,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/core"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/controller/tas"
+	tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/scheduler"
 	"sigs.k8s.io/kueue/pkg/webhooks"
@@ -86,6 +87,9 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
 	failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, controllersCfg)
 	gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)

+	err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
 	sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))
 	err = sched.Start(ctx)
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
diff --git a/test/integration/tas/tas_test.go b/test/integration/tas/tas_test.go
index 4f5af7efbc..e4fa7b61c1 100644
--- a/test/integration/tas/tas_test.go
+++ b/test/integration/tas/tas_test.go
@@ -230,6 +230,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -246,6 +252,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -262,6 +274,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -278,6 +296,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
@@ -572,6 +596,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 					corev1.ResourceCPU:    resource.MustParse("1"),
 					corev1.ResourceMemory: resource.MustParse("1Gi"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}
@@ -621,6 +651,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				Allocatable: corev1.ResourceList{
 					corev1.ResourceCPU: resource.MustParse("5"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 		{
@@ -635,6 +671,12 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				Allocatable: corev1.ResourceList{
 					gpuResName: resource.MustParse("4"),
 				},
+				Conditions: []corev1.NodeCondition{
+					{
+						Type:   corev1.NodeReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
 			},
 		},
 	}