Skip to content

Commit

Permalink
Utilize jobframework setups
Browse files Browse the repository at this point in the history
Signed-off-by: tenzen-y <[email protected]>
  • Loading branch information
tenzen-y committed Jan 24, 2024
1 parent a66ee8a commit 78acd53
Show file tree
Hide file tree
Showing 22 changed files with 390 additions and 112 deletions.
78 changes: 10 additions & 68 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"context"
"flag"
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -30,7 +29,6 @@ import (
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -40,7 +38,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -55,7 +52,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/debugger"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
Expand Down Expand Up @@ -216,15 +212,10 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

err = jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
if isFrameworkEnabled(cfg, name) {
if err := cb.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return fmt.Errorf("integration %s: %w", name, err)
}
}
return nil
})
return err
opts := []jobframework.Option{
jobframework.WithEnabledFrameworks(cfg.Integrations),
}
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
Expand Down Expand Up @@ -261,61 +252,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
}

manageJobsWithoutQueueName := cfg.ManageJobsWithoutQueueName

if failedWebhook, err := webhooks.Setup(mgr); err != nil {
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
os.Exit(1)
}

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithManageJobsWithoutQueueName(cfg.ManageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(cfg.WaitForPodsReady),
jobframework.WithKubeServerVersion(serverVersionFetcher),
jobframework.WithIntegrationOptions(corev1.SchemeGroupVersion.WithKind("Pod").String(), cfg.Integrations.PodOptions),
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithManagerName(constants.KueueName),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) {
if cb.CanSupportIntegration != nil {
if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil {
setupLog.Error(err, "Failed to configure reconcilers")
os.Exit(1)
}
}
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
return err
}
log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
log.Info("Set up controller and webhook for job framework")
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
if err := jobframework.SetupControllers(mgr, setupLog, certsReady, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
// +kubebuilder:scaffold:builder
Expand Down Expand Up @@ -415,12 +366,3 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {

return options, cfg, nil
}

// isFrameworkEnabled reports whether name is listed in
// cfg.Integrations.Frameworks. Entries are compared with an exact string
// match, and the result is false when no entry matches.
func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
for _, framework := range cfg.Integrations.Frameworks {
if framework == name {
return true
}
}
return false
}
12 changes: 6 additions & 6 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWlReconcileJobset(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -83,7 +83,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -110,7 +110,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand All @@ -99,7 +99,7 @@ func TestWlReconcile(t *testing.T) {
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
},
Expand All @@ -108,14 +108,14 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -128,7 +128,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -149,7 +149,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,28 @@ func TestReconcile(t *testing.T) {
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand Down
Loading

0 comments on commit 78acd53

Please sign in to comment.