Skip to content

Commit

Permalink
Utilize jobframework setups (kubernetes-sigs#1630)
Browse files Browse the repository at this point in the history
* Utilize jobframework setups

Signed-off-by: tenzen-y <[email protected]>

* Use WaitForCertsReady function in main setupControllers

Signed-off-by: tenzen-y <[email protected]>

* Add a comment on how/when to use the utilized functions

Signed-off-by: tenzen-y <[email protected]>

* Rename jobframework.go to setup.go

Signed-off-by: tenzen-y <[email protected]>

* Do not wait for certs to get ready inside SetupControllers

Signed-off-by: tenzen-y <[email protected]>

---------

Signed-off-by: tenzen-y <[email protected]>
  • Loading branch information
tenzen-y authored and kannon92 committed Nov 19, 2024
1 parent 9fdbc00 commit 5623386
Show file tree
Hide file tree
Showing 22 changed files with 399 additions and 115 deletions.
82 changes: 11 additions & 71 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"context"
"flag"
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -40,7 +38,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -55,7 +52,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/debugger"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
Expand Down Expand Up @@ -216,23 +212,16 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

err = jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
if isFrameworkEnabled(cfg, name) {
if err := cb.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return fmt.Errorf("integration %s: %w", name, err)
}
}
return nil
})
return err
opts := []jobframework.Option{
jobframework.WithEnabledFrameworks(cfg.Integrations),
}
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
<-certsReady
setupLog.Info("Certs ready")
cert.WaitForCertsReady(setupLog, certsReady)

if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", failedCtrl)
Expand Down Expand Up @@ -261,61 +250,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
}

manageJobsWithoutQueueName := cfg.ManageJobsWithoutQueueName

if failedWebhook, err := webhooks.Setup(mgr); err != nil {
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
os.Exit(1)
}

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithManageJobsWithoutQueueName(cfg.ManageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(cfg.WaitForPodsReady),
jobframework.WithKubeServerVersion(serverVersionFetcher),
jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions),
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithManagerName(constants.KueueName),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) {
if cb.CanSupportIntegration != nil {
if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil {
setupLog.Error(err, "Failed to configure reconcilers")
os.Exit(1)
}
}
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
return err
}
log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
log.Info("Set up controller and webhook for job framework")
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
// +kubebuilder:scaffold:builder
Expand Down Expand Up @@ -415,12 +364,3 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {

return options, cfg, nil
}

// isFrameworkEnabled reports whether name is listed in
// cfg.Integrations.Frameworks.
//
// It returns false, rather than panicking, when cfg or cfg.Integrations is
// nil: Integrations is a pointer field and may be unset.
func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
	// Guard the pointer chain before dereferencing it.
	if cfg == nil || cfg.Integrations == nil {
		return false
	}
	for _, framework := range cfg.Integrations.Frameworks {
		if framework == name {
			return true
		}
	}
	return false
}
12 changes: 6 additions & 6 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWlReconcileJobset(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -83,7 +83,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -110,7 +110,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand All @@ -99,7 +99,7 @@ func TestWlReconcile(t *testing.T) {
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
},
Expand All @@ -108,14 +108,14 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -128,7 +128,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -149,7 +149,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,28 @@ func TestReconcile(t *testing.T) {
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -32,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
Expand Down Expand Up @@ -71,6 +73,8 @@ type Options struct {
KubeServerVersion *kubeversion.ServerVersionFetcher
// IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND".
IntegrationOptions map[string]any
EnabledFrameworks sets.Set[string]
ManagerName string
}

// Option configures the reconciler.
Expand All @@ -95,9 +99,9 @@ func WithManageJobsWithoutQueueName(f bool) Option {
// WithWaitForPodsReady indicates if the controller should add the PodsReady
// condition to the workload when the corresponding job has all pods ready
// or succeeded.
func WithWaitForPodsReady(f bool) Option {
func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option {
return func(o *Options) {
o.WaitForPodsReady = f
o.WaitForPodsReady = w != nil && w.Enable
}
}

Expand All @@ -118,6 +122,23 @@ func WithIntegrationOptions(integrationName string, opts any) Option {
}
}

// WithEnabledFrameworks returns an Option that stores the framework names
// listed in the ConfigAPI integrations as the set of enabled frameworks.
// When i is nil or lists no frameworks, the Options are left unchanged.
func WithEnabledFrameworks(i *configapi.Integrations) Option {
	return func(o *Options) {
		if i != nil && len(i.Frameworks) > 0 {
			o.EnabledFrameworks = sets.New(i.Frameworks...)
		}
	}
}

// WithManagerName returns an Option that records n as the kueue manager name
// in Options.ManagerName.
func WithManagerName(n string) Option {
	setManagerName := func(o *Options) {
		o.ManagerName = n
	}
	return setManagerName
}

// defaultOptions is the zero-valued Options.
// NOTE(review): presumably the baseline that Option functions are applied on
// top of — confirm against its use site, which is not visible here.
var defaultOptions = Options{}

func NewReconciler(
Expand Down
Loading

0 comments on commit 5623386

Please sign in to comment.