diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 219a36f9b7..4ca31bade1 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -19,7 +19,6 @@ package main import ( "context" "flag" - "fmt" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) @@ -30,7 +29,6 @@ import ( "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -40,7 +38,6 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -55,7 +52,6 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" - "sigs.k8s.io/kueue/pkg/controller/jobs/noop" "sigs.k8s.io/kueue/pkg/debugger" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" @@ -216,23 +212,16 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur } } - err = jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error { - if isFrameworkEnabled(cfg, name) { - if err := cb.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil { - return fmt.Errorf("integration %s: %w", name, err) - } - } - return nil - }) - return err + opts := []jobframework.Option{ + jobframework.WithEnabledFrameworks(cfg.Integrations), + } + return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...) } func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) { // The controllers won't work until the webhooks are operating, and the webhook won't work until the // certs are all in place. 
- setupLog.Info("Waiting for certificate generation to complete") - <-certsReady - setupLog.Info("Certs ready") + cert.WaitForCertsReady(setupLog, certsReady) if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg); err != nil { setupLog.Error(err, "Unable to create controller", "controller", failedCtrl) @@ -261,61 +250,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag } } - manageJobsWithoutQueueName := cfg.ManageJobsWithoutQueueName - if failedWebhook, err := webhooks.Setup(mgr); err != nil { setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook) os.Exit(1) } opts := []jobframework.Option{ - jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName), - jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)), + jobframework.WithManageJobsWithoutQueueName(cfg.ManageJobsWithoutQueueName), + jobframework.WithWaitForPodsReady(cfg.WaitForPodsReady), jobframework.WithKubeServerVersion(serverVersionFetcher), jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions), + jobframework.WithEnabledFrameworks(cfg.Integrations), + jobframework.WithManagerName(constants.KueueName), } - err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error { - log := setupLog.WithValues("jobFrameworkName", name) - if isFrameworkEnabled(cfg, name) { - if cb.CanSupportIntegration != nil { - if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil { - setupLog.Error(err, "Failed to configure reconcilers") - os.Exit(1) - } - } - gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme()) - if err != nil { - return err - } - if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { - if !meta.IsNoMatchError(err) { - return err - } - log.Info("No matching API in the server for job framework, skipped setup of controller and webhook") - } else { - if err = cb.NewReconciler( - mgr.GetClient(), - mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)), - opts..., - ).SetupWithManager(mgr); err != nil { - log.Error(err, "Unable to create controller") - return err - } - if err = cb.SetupWebhook(mgr, opts...); err != nil { - log.Error(err, "Unable to create webhook") - return err - } - log.Info("Set up controller and webhook for job framework") - return nil - } - } - if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { - log.Error(err, "Unable to create noop webhook") - return err - } - return nil - }) - if err != nil { + if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil { + setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion()) os.Exit(1) } // +kubebuilder:scaffold:builder @@ -415,12 +364,3 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) { return options, cfg, nil } - -func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool { - for _, framework := range cfg.Integrations.Frameworks { - if framework == name { - return true - } - } - return false -} diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index 750381f5e7..507e219078 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -62,7 +62,7 @@ func TestWlReconcileJobset(t 
*testing.T) { managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -83,7 +83,7 @@ func TestWlReconcileJobset(t *testing.T) { State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -110,7 +110,7 @@ func TestWlReconcileJobset(t *testing.T) { State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -139,7 +139,7 @@ func TestWlReconcileJobset(t *testing.T) { State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). Obj(), @@ -171,7 +171,7 @@ func TestWlReconcileJobset(t *testing.T) { State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). Obj(), @@ -203,7 +203,7 @@ func TestWlReconcileJobset(t *testing.T) { State: kueue.CheckStateReady, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true). + OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). Obj(), diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index 481646168a..e9d1784beb 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -90,7 +90,7 @@ func TestWlReconcile(t *testing.T) { managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). 
Obj(), }, worker1Workloads: []kueue.Workload{ @@ -99,7 +99,7 @@ func TestWlReconcile(t *testing.T) { wantManagersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). Obj(), }, }, @@ -108,14 +108,14 @@ func TestWlReconcile(t *testing.T) { managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, wantManagersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -128,7 +128,7 @@ func TestWlReconcile(t *testing.T) { managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -149,7 +149,7 @@ func TestWlReconcile(t *testing.T) { State: kueue.CheckStatePending, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -177,7 +177,7 @@ func TestWlReconcile(t *testing.T) { State: kueue.CheckStatePending, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, @@ -206,7 +206,7 @@ func TestWlReconcile(t *testing.T) { State: kueue.CheckStatePending, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). Obj(), @@ -239,7 +239,7 @@ func TestWlReconcile(t *testing.T) { State: kueue.CheckStatePending, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). 
Obj(), @@ -271,7 +271,7 @@ func TestWlReconcile(t *testing.T) { State: kueue.CheckStatePending, Message: `The workload got reservation on "worker1"`, }). - OwnerReference("batch/v1", "Job", "job1", "uid1", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}). Obj(), diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index ae19b22f71..091088d707 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -382,7 +382,7 @@ func TestReconcile(t *testing.T) { Type: "Finished", Status: "True", }). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true). DeletionTimestamp(testStartTime). Obj(), wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). @@ -390,20 +390,20 @@ func TestReconcile(t *testing.T) { Type: "Finished", Status: "True", }). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true). DeletionTimestamp(testStartTime). Obj(), }, "unadmitted workload with rejected checks": { workload: utiltesting.MakeWorkload("wl", "ns"). - OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true). AdmissionCheck(kueue.AdmissionCheckState{ Name: "check", State: kueue.CheckStateRejected, }). Obj(), wantWorkload: utiltesting.MakeWorkload("wl", "ns"). - OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true). AdmissionCheck(kueue.AdmissionCheckState{ Name: "check", State: kueue.CheckStateRejected, @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) { workload: utiltesting.MakeWorkload("wl", "ns"). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Admitted(true). - OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true). AdmissionCheck(kueue.AdmissionCheckState{ Name: "check", State: kueue.CheckStateRejected, @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) { wantWorkload: utiltesting.MakeWorkload("wl", "ns"). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Admitted(true). - OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true). 
AdmissionCheck(kueue.AdmissionCheckState{ Name: "check", State: kueue.CheckStateRejected, diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 4c3d009a10..772abc50f5 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -24,6 +24,7 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -32,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/constants" controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" @@ -71,6 +73,8 @@ type Options struct { KubeServerVersion *kubeversion.ServerVersionFetcher // IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND". IntegrationOptions map[string]any + EnabledFrameworks sets.Set[string] + ManagerName string } // Option configures the reconciler. @@ -95,9 +99,9 @@ func WithManageJobsWithoutQueueName(f bool) Option { // WithWaitForPodsReady indicates if the controller should add the PodsReady // condition to the workload when the corresponding job has all pods ready // or succeeded. -func WithWaitForPodsReady(f bool) Option { +func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option { return func(o *Options) { - o.WaitForPodsReady = f + o.WaitForPodsReady = w != nil && w.Enable } } @@ -118,6 +122,23 @@ func WithIntegrationOptions(integrationName string, opts any) Option { } } +// WithEnabledFrameworks adds the framework names enabled in the ConfigAPI. +func WithEnabledFrameworks(i *configapi.Integrations) Option { + return func(o *Options) { + if i == nil || len(i.Frameworks) == 0 { + return + } + o.EnabledFrameworks = sets.New(i.Frameworks...) + } +} + +// WithManagerName sets the name of the Kueue manager. +func WithManagerName(n string) Option { + return func(o *Options) { + o.ManagerName = n + } +} + var defaultOptions = Options{} func NewReconciler( diff --git a/pkg/controller/jobframework/reconciler_test.go b/pkg/controller/jobframework/reconciler_test.go index 6a6644828b..da658d70d9 100644 --- a/pkg/controller/jobframework/reconciler_test.go +++ b/pkg/controller/jobframework/reconciler_test.go @@ -117,7 +117,7 @@ func TestProcessOptions(t *testing.T) { "all options are passed": { inputOpts: []Option{ WithManageJobsWithoutQueueName(true), - WithWaitForPodsReady(true), + WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}), WithKubeServerVersion(&kubeversion.ServerVersionFetcher{}), WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), &configapi.PodIntegrationOptions{ PodSelector: &metav1.LabelSelector{}, diff --git a/pkg/controller/jobframework/setup.go b/pkg/controller/jobframework/setup.go new file mode 100644 index 0000000000..39541ad3c4 --- /dev/null +++ b/pkg/controller/jobframework/setup.go @@ -0,0 +1,107 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jobframework + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "sigs.k8s.io/kueue/pkg/controller/jobs/noop" +) + +var ( + errFailedMappingResource = errors.New("restMapper failed mapping resource") +) + +// SetupControllers sets up all controllers and webhooks for integrations. +// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, +// they can easily set up controllers and webhooks for the in-house custom jobs. + +// +// Note that the first argument, "mgr", must be initialized outside this function. +// In addition, if the manager uses Kueue's internal cert management for the webhooks, +// this function needs to be called after the certs are ready, because the controllers won't work +// until the webhooks are operating, and the webhooks won't work until the +// certs are all in place. +func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error { + options := ProcessOptions(opts...) + + return ForEachIntegration(func(name string, cb IntegrationCallbacks) error { + logger := log.WithValues("jobFrameworkName", name) + fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name) + + if options.EnabledFrameworks.Has(name) { + if cb.CanSupportIntegration != nil { + if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil { + log.Error(err, "Failed to configure reconcilers") + os.Exit(1) + } + } + gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme()) + if err != nil { + return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err) + } + if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + if !meta.IsNoMatchError(err) { + return fmt.Errorf("%s: %w", fwkNamePrefix, err) + } + logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook") + } else { + if err = cb.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)), + opts..., + ).SetupWithManager(mgr); err != nil { + return fmt.Errorf("%s: %w", fwkNamePrefix, err) + } + if err = cb.SetupWebhook(mgr, opts...); err != nil { + return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err) + } + logger.Info("Set up controller and webhook for job framework") + return nil + } + } + if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { + return fmt.Errorf("%s: unable to create noop webhook: %w", fwkNamePrefix, err) + } + return nil + }) +} + +// SetupIndexes sets up the indexers for integrations. +// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, +// they can easily set up indexers for the in-house custom jobs. + +// +// Note that the second argument, "indexer", needs to be the FieldIndexer obtained from the Manager.
+func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error { + options := ProcessOptions(opts...) + return ForEachIntegration(func(name string, cb IntegrationCallbacks) error { + if options.EnabledFrameworks.Has(name) { + if err := cb.SetupIndexes(ctx, indexer); err != nil { + return fmt.Errorf("jobFrameworkName %q: %w", name, err) + } + } + return nil + }) +} diff --git a/pkg/controller/jobframework/setup_test.go b/pkg/controller/jobframework/setup_test.go new file mode 100644 index 0000000000..832074f536 --- /dev/null +++ b/pkg/controller/jobframework/setup_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jobframework + +import ( + "context" + "net/http" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/util/slices" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +func TestSetupControllers(t *testing.T) { + cases := map[string]struct { + opts []Option + mapperGVKs []schema.GroupVersionKind + wantError error + }{ + "setup controllers succeed": { + opts: []Option{ + WithEnabledFrameworks(&configapi.Integrations{ + Frameworks: []string{"batch/job", "kubeflow.org/mpijob"}, + }), + }, + mapperGVKs: []schema.GroupVersionKind{ + batchv1.SchemeGroupVersion.WithKind("Job"), + kubeflow.SchemeGroupVersionKind, + }, + }, + "mapper doesn't have kubeflow.org/mpijob, but no error occurs": { + opts: []Option{ + WithEnabledFrameworks(&configapi.Integrations{ + Frameworks: []string{"batch/job", "kubeflow.org/mpijob"}, + }), + }, + mapperGVKs: []schema.GroupVersionKind{ + batchv1.SchemeGroupVersion.WithKind("Job"), + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + _, logger := utiltesting.ContextWithLog(t) + k8sClient := utiltesting.NewClientBuilder(jobset.AddToScheme, kubeflow.AddToScheme, rayjobapi.AddToScheme, kftraining.AddToScheme).Build() + + mgrOpts := ctrlmgr.Options{ + Scheme: k8sClient.Scheme(), + NewClient: func(*rest.Config, client.Options) (client.Client, error) { + return k8sClient, nil + }, + MapperProvider: func(*rest.Config, *http.Client) (apimeta.RESTMapper, error) { + gvs := make([]schema.GroupVersion, 0, len(tc.mapperGVKs)) + for _, gvk := range
tc.mapperGVKs { + gvs = append(gvs, gvk.GroupVersion()) + } + mapper := apimeta.NewDefaultRESTMapper(gvs) + for _, gvk := range tc.mapperGVKs { + mapper.Add(gvk, apimeta.RESTScopeNamespace) + } + return mapper, nil + }, + } + mgr, err := ctrlmgr.New(&rest.Config{}, mgrOpts) + if err != nil { + t.Fatalf("Failed to setup manager: %v", err) + } + + gotError := SetupControllers(mgr, logger, tc.opts...) + if diff := cmp.Diff(tc.wantError, gotError, cmpopts.EquateErrors()); len(diff) != 0 { + t.Errorf("Unexpected error from SetupControllers (-want,+got):\n%s", diff) + } + }) + } +} + +func TestSetupIndexes(t *testing.T) { + testNamespace := "test" + + cases := map[string]struct { + opts []Option + workloads []kueue.Workload + filter client.ListOption + wantError error + wantFieldMatcherError bool + wantWorkloads []string + }{ + "proper indexes are set": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("alpha-wl", testNamespace). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "alpha", "job", true, true). + Obj(), + *utiltesting.MakeWorkload("beta-wl", testNamespace). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "beta", "job", true, true). + Obj(), + }, + opts: []Option{ + WithEnabledFrameworks(&configapi.Integrations{ + Frameworks: []string{"batch/job"}, + }), + }, + filter: client.MatchingFields{GetOwnerKey(batchv1.SchemeGroupVersion.WithKind("Job")): "alpha"}, + wantWorkloads: []string{"alpha-wl"}, + }, + "kubeflow.org/mpijob is disabled in the configAPI": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("alpha-wl", testNamespace). + OwnerReference(kubeflow.SchemeGroupVersionKind, "alpha", "mpijob", true, true). + Obj(), + *utiltesting.MakeWorkload("beta-wl", testNamespace). + OwnerReference(kubeflow.SchemeGroupVersionKind, "beta", "mpijob", true, true). + Obj(), + }, + opts: []Option{ + WithEnabledFrameworks(&configapi.Integrations{ + Frameworks: []string{"batch/job"}, + }), + }, + filter: client.MatchingFields{GetOwnerKey(kubeflow.SchemeGroupVersionKind): "alpha"}, + wantFieldMatcherError: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := utiltesting.NewClientBuilder().WithObjects(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}) + gotIndexerErr := SetupIndexes(ctx, utiltesting.AsIndexer(builder), tc.opts...) + if diff := cmp.Diff(tc.wantError, gotIndexerErr, cmpopts.EquateErrors()); len(diff) != 0 { + t.Fatalf("Unexpected setupIndexer error (-want,+got):\n%s", diff) + } + k8sClient := builder.Build() + for _, wl := range tc.workloads { + if err := k8sClient.Create(ctx, &wl); err != nil { + t.Fatalf("Unable to create workload, %q: %v", klog.KObj(&wl), err) + } + } + + // In any case, a list operation without fieldMatcher should succeed. + gotWls := &kueue.WorkloadList{} + if gotListErr := k8sClient.List(ctx, gotWls, client.InNamespace(testNamespace)); gotListErr != nil { + t.Fatalf("Failed to list workloads without a fieldMatcher: %v", gotListErr) + } + deployedWlNames := slices.Map(tc.workloads, func(j *kueue.Workload) string { return j.Name }) + gotWlNames := slices.Map(gotWls.Items, func(j *kueue.Workload) string { return j.Name }) + if diff := cmp.Diff(deployedWlNames, gotWlNames, cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b string) bool { return a < b })); len(diff) != 0 { + t.Errorf("Unexpected list workloads (-want,+got):\n%s", diff) + } + + // List workloads with fieldMatcher. 
+ gotListErr := k8sClient.List(ctx, gotWls, client.InNamespace(testNamespace), tc.filter) + if (gotListErr != nil) != tc.wantFieldMatcherError { + t.Errorf("Unexpected list error\nwant: %v\ngot: %v", tc.wantFieldMatcherError, gotListErr) + } + + if !tc.wantFieldMatcherError { + gotWlNames = slices.Map(gotWls.Items, func(j *kueue.Workload) string { return j.Name }) + if diff := cmp.Diff(tc.wantWorkloads, gotWlNames, cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b string) bool { return a < b })); len(diff) != 0 { + t.Errorf("Unexpected list workloads (-want,+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index d2bc8fe7a1..5096c22a01 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -1922,7 +1922,7 @@ func TestReconciler(t *testing.T) { Labels(map[string]string{ controllerconsts.JobUIDLabel: "test-uid", }). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true). Obj(), }, wantEvents: []utiltesting.EventRecord{ @@ -1953,7 +1953,7 @@ func TestReconciler(t *testing.T) { PriorityClass("test-wpc"). Priority(100). PriorityClassSource(constants.WorkloadPriorityClassSource). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "other-job", "other-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", true, true). Obj(), }, wantWorkloads: []kueue.Workload{ @@ -1963,7 +1963,7 @@ func TestReconciler(t *testing.T) { PriorityClass("test-wpc"). Priority(100). PriorityClassSource(constants.WorkloadPriorityClassSource). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "other-job", "other-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", true, true). Obj(), }, wantEvents: []utiltesting.EventRecord{ @@ -2006,7 +2006,7 @@ func TestReconciler(t *testing.T) { Labels(map[string]string{ controllerconsts.JobUIDLabel: "test-uid", }). - OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true). Condition(metav1.Condition{ Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 22f55a2a0d..81b57962b3 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -468,6 +468,7 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { func CanSupportIntegration(opts ...jobframework.Option) (bool, error) { options := jobframework.ProcessOptions(opts...) 
+ v := options.KubeServerVersion.GetServerVersion() if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) { return false, fmt.Errorf("kubernetesVersion %q: %w", v.String(), errPodNoSupportKubeVersion) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index 2a48751089..c8bd121bf1 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -2360,13 +2360,13 @@ func TestReconciler(t *testing.T) { reconcileKey: &types.NamespacedName{Namespace: "ns", Name: "deleted_pod"}, workloads: []kueue.Workload{ *utiltesting.MakeWorkload("test-group", "ns"). - OwnerReference("v1", "Pod", "deleted_pod", "", true, true). + OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "deleted_pod", "", true, true). Finalizers(kueue.ResourceInUseFinalizerName). Obj(), }, wantWorkloads: []kueue.Workload{ *utiltesting.MakeWorkload("test-group", "ns"). - OwnerReference("v1", "Pod", "deleted_pod", "", true, true). + OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "deleted_pod", "", true, true). Obj(), }, workloadCmpOpts: defaultWorkloadCmpOpts, diff --git a/pkg/util/cert/cert.go b/pkg/util/cert/cert.go index 1032bfa30c..d63a3d6529 100644 --- a/pkg/util/cert/cert.go +++ b/pkg/util/cert/cert.go @@ -19,6 +19,7 @@ package cert import ( "fmt" + "github.com/go-logr/logr" cert "github.com/open-policy-agent/cert-controller/pkg/rotator" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -65,3 +66,10 @@ func ManageCerts(mgr ctrl.Manager, cfg config.Configuration, setupFinished chan RequireLeaderElection: false, }) } + +// WaitForCertsReady blocks until the webhook certificates have been generated and are ready to use. +func WaitForCertsReady(log logr.Logger, certsReady chan struct{}) { + log.Info("Waiting for certificate generation to complete") + <-certsReady + log.Info("Certs ready") +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index e653c08bae..1079934cf4 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -26,6 +26,7 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" @@ -216,11 +217,11 @@ func (w *WorkloadWrapper) AdmissionChecks(checks ...kueue.AdmissionCheckState) * return w } -func (w *WorkloadWrapper) OwnerReference(apiVersion, kind, name, uid string, controller, blockDeletion bool) *WorkloadWrapper { +func (w *WorkloadWrapper) OwnerReference(gvk schema.GroupVersionKind, name, uid string, controller, blockDeletion bool) *WorkloadWrapper { w.OwnerReferences = []metav1.OwnerReference{ { - APIVersion: apiVersion, - Kind: kind, + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, Name: name, UID: types.UID(uid), Controller: &controller, diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index e824f84b96..a5dafef43f 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -34,6 +34,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -812,7 +813,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady
enabled", ginkgo.O CRDPath: crdPath, } cfg = fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) }) diff --git a/test/integration/controller/jobs/jobset/jobset_controller_test.go b/test/integration/controller/jobs/jobset/jobset_controller_test.go index 69cc55c357..88e4a36620 100644 --- a/test/integration/controller/jobs/jobset/jobset_controller_test.go +++ b/test/integration/controller/jobs/jobset/jobset_controller_test.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -587,7 +588,7 @@ var _ = ginkgo.Describe("JobSet controller when waitForPodsReady enabled", ginkg DepCRDPaths: []string{jobsetCrdPath}, } cfg = fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/mpijob/mpijob_controller_test.go b/test/integration/controller/jobs/mpijob/mpijob_controller_test.go index a116d409f5..797004f6ae 100644 --- a/test/integration/controller/jobs/mpijob/mpijob_controller_test.go +++ b/test/integration/controller/jobs/mpijob/mpijob_controller_test.go @@ -33,6 +33,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -615,7 +616,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{mpiCrdPath}, } cfg = fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(false, jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(false, jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go index e410b384f3..c832d41524 100644 --- a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go +++ b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadmxjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/mxjob" @@ -105,7 +106,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{mxnetCrdPath}, } cfg := fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, 
managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go index 5daa4a10a2..9c783078d2 100644 --- a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go +++ b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" @@ -103,7 +104,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{paddleCrdPath}, } cfg := fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go index e7a62f67b3..d27a57a5eb 100644 --- a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go +++ b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go @@ -25,9 +25,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" - "sigs.k8s.io/controller-runtime/pkg/client" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -337,7 +337,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{pytorchCrdPath}, } cfg := fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go index a5fd61e3d4..12b00d30be 100644 --- a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go +++ b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go @@ -33,6 +33,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -347,7 +348,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{rayCrdPath}, } cfg = fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, 
managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go index bc11cdc2ee..97f1ee5851 100644 --- a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go +++ b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" @@ -106,7 +107,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{tensorflowCrdPath}, } cfg := fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go index f3cb8f2677..8aa01844d3 100644 --- a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go +++ b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob" @@ -101,7 +102,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O DepCRDPaths: []string{xgbCrdPath}, } cfg := fwk.Init() - ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(&configapi.WaitForPodsReady{Enable: true}))) ginkgo.By("Create a resource flavor") gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed())
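
Taken together, the exported jobframework.SetupIndexes and jobframework.SetupControllers let an out-of-tree manager reuse the same wiring as cmd/kueue/main.go. The following is a minimal sketch of such a manager, not part of this diff: the scheme registration, the enabled framework list, and the manager name "my-kueue-manager" are illustrative assumptions, and the sketch assumes externally managed webhook certs (otherwise cert.WaitForCertsReady would have to run before SetupControllers).

package main

import (
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"

	configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
	"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

func main() {
	setupLog := ctrl.Log.WithName("setup")

	scheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	// Register Kueue's APIs and any in-house custom job APIs on the scheme here.

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		setupLog.Error(err, "Unable to create manager")
		os.Exit(1)
	}

	opts := []jobframework.Option{
		// Illustrative framework selection; any registered integration names work.
		jobframework.WithEnabledFrameworks(&configapi.Integrations{
			Frameworks: []string{"batch/job"},
		}),
		jobframework.WithManagerName("my-kueue-manager"), // assumed name
	}

	ctx := ctrl.SetupSignalHandler()

	// Indexes must be registered before the manager's cache is started.
	if err := jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...); err != nil {
		setupLog.Error(err, "Unable to set up indexes")
		os.Exit(1)
	}

	// Sets up a reconciler and webhook per enabled framework whose API is
	// served by the cluster, and a noop webhook for the rest.
	if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
		setupLog.Error(err, "Unable to set up controllers")
		os.Exit(1)
	}

	if err := mgr.Start(ctx); err != nil {
		setupLog.Error(err, "Manager exited")
		os.Exit(1)
	}
}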