Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Utilize jobframework setups #1630

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 11 additions & 71 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"context"
"flag"
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -40,7 +38,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -55,7 +52,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/debugger"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
Expand Down Expand Up @@ -216,23 +212,16 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

err = jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
if isFrameworkEnabled(cfg, name) {
if err := cb.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return fmt.Errorf("integration %s: %w", name, err)
}
}
return nil
})
return err
opts := []jobframework.Option{
jobframework.WithEnabledFrameworks(cfg.Integrations),
}
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
<-certsReady
setupLog.Info("Certs ready")
cert.WaitForCertsReady(setupLog, certsReady)

if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", failedCtrl)
Expand Down Expand Up @@ -261,61 +250,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
}

manageJobsWithoutQueueName := cfg.ManageJobsWithoutQueueName

if failedWebhook, err := webhooks.Setup(mgr); err != nil {
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
os.Exit(1)
}

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithManageJobsWithoutQueueName(cfg.ManageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(cfg.WaitForPodsReady),
jobframework.WithKubeServerVersion(serverVersionFetcher),
jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions),
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithManagerName(constants.KueueName),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) {
if cb.CanSupportIntegration != nil {
if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil {
setupLog.Error(err, "Failed to configure reconcilers")
os.Exit(1)
}
}
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
return err
}
log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
log.Info("Set up controller and webhook for job framework")
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
// +kubebuilder:scaffold:builder
Expand Down Expand Up @@ -415,12 +364,3 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {

return options, cfg, nil
}

// isFrameworkEnabled reports whether name is listed in
// cfg.Integrations.Frameworks.
//
// A nil cfg or a nil cfg.Integrations is treated as "no frameworks enabled"
// and yields false instead of panicking on a nil-pointer dereference.
func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
	if cfg == nil || cfg.Integrations == nil {
		return false
	}
	for _, framework := range cfg.Integrations.Frameworks {
		if framework == name {
			return true
		}
	}
	return false
}
12 changes: 6 additions & 6 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWlReconcileJobset(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -83,7 +83,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -110,7 +110,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand All @@ -99,7 +99,7 @@ func TestWlReconcile(t *testing.T) {
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
},
Expand All @@ -108,14 +108,14 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -128,7 +128,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -149,7 +149,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,28 @@ func TestReconcile(t *testing.T) {
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -32,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Options struct {
KubeServerVersion *kubeversion.ServerVersionFetcher
// IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND".
IntegrationOptions map[string]any
EnabledFrameworks sets.Set[string]
ManagerName string
}

// Option configures the reconciler.
Expand All @@ -95,9 +99,9 @@ func WithManageJobsWithoutQueueName(f bool) Option {
// WithWaitForPodsReady indicates if the controller should add the PodsReady
// condition to the workload when the corresponding job has all pods ready
// or succeeded.
func WithWaitForPodsReady(f bool) Option {
func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option {
return func(o *Options) {
o.WaitForPodsReady = f
o.WaitForPodsReady = w != nil && w.Enable
}
}

Expand All @@ -118,6 +122,23 @@ func WithIntegrationOptions(integrationName string, opts any) Option {
}
}

// WithEnabledFrameworks records the framework names enabled in the ConfigAPI
// as a set in Options.EnabledFrameworks.
//
// When i is nil or lists no frameworks, the returned Option leaves
// Options.EnabledFrameworks untouched.
func WithEnabledFrameworks(i *configapi.Integrations) Option {
	return func(opts *Options) {
		if i != nil && len(i.Frameworks) > 0 {
			opts.EnabledFrameworks = sets.New(i.Frameworks...)
		}
	}
}

// WithManagerName sets the Kueue manager name stored in Options.ManagerName.
func WithManagerName(n string) Option {
	return func(opts *Options) {
		opts.ManagerName = n
	}
}

var defaultOptions = Options{}

func NewReconciler(
Expand Down
Loading