Skip to content

Commit

Permalink
Utilize jobframework setups
Browse files Browse the repository at this point in the history
Signed-off-by: tenzen-y <[email protected]>
  • Loading branch information
tenzen-y committed Jan 22, 2024
1 parent e3c74b6 commit b60d7ed
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 98 deletions.
97 changes: 22 additions & 75 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"flag"
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -30,7 +29,6 @@ import (
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -40,7 +38,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -55,7 +52,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/debugger"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
Expand Down Expand Up @@ -217,15 +213,10 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

err = jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
if isFrameworkEnabled(cfg, name) {
if err := cb.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return fmt.Errorf("integration %s: %w", name, err)
}
}
return nil
})
return err
opts := []jobframework.Option{
jobframework.WithEnabledFrameworks(config.EnabledFrameworks(cfg)),
}
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
Expand Down Expand Up @@ -262,69 +253,34 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
}
}

manageJobsWithoutQueueName := cfg.ManageJobsWithoutQueueName

if failedWebhook, err := webhooks.Setup(mgr); err != nil {
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
os.Exit(1)
}

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithManageJobsWithoutQueueName(cfg.ManageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(config.IsWaitForPodsReadyEnable(cfg)),
jobframework.WithKubeServerVersion(serverVersionFetcher),
jobframework.WithEnabledFrameworks(config.EnabledFrameworks(cfg)),
jobframework.WithManagerName(constants.KueueName),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) {
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
return err
}
log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if name == "pod" {
v := serverVersionFetcher.GetServerVersion()
if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) {
setupLog.Error(errPodIntegration,
"Failed to configure reconcilers",
"kubernetesVersion", v)
os.Exit(1)
}

opts = append(
opts,
jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector),
jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector),
)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
log.Info("Set up controller and webhook for job framework")
return nil
addPodIntegrationOptsFunc := func(fwkName string, opts ...jobframework.Option) ([]jobframework.Option, error) {
if fwkName == "pod" {
v := serverVersionFetcher.GetServerVersion()
if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) {
return opts, errPodIntegration
}
opts = append(
opts,
jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector),
jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector),
)
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
return opts, nil
}
if err := jobframework.SetupControllers(mgr, setupLog, addPodIntegrationOptsFunc, certsReady, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
// +kubebuilder:scaffold:builder
Expand Down Expand Up @@ -424,12 +380,3 @@ func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {

return options, cfg, nil
}

// isFrameworkEnabled reports whether name appears in cfg.Integrations.Frameworks.
// It returns false when cfg.Integrations is nil, instead of dereferencing a nil
// pointer.
func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
	if cfg.Integrations == nil {
		return false
	}
	for _, framework := range cfg.Integrations.Frameworks {
		if framework == name {
			return true
		}
	}
	return false
}
12 changes: 12 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"

Expand Down Expand Up @@ -168,3 +169,14 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
addTo(&options, &cfg)
return options, cfg, err
}

// EnabledFrameworks returns the framework names listed in
// cfg.Integrations.Frameworks as a set. It returns nil when cfg.Integrations
// is nil or when no frameworks are listed.
func EnabledFrameworks(cfg *configapi.Configuration) sets.Set[string] {
	if integrations := cfg.Integrations; integrations != nil && len(integrations.Frameworks) > 0 {
		return sets.New(integrations.Frameworks...)
	}
	return nil
}

// IsWaitForPodsReadyEnable reports whether cfg has a WaitForPodsReady section
// whose Enable field is true. A missing (nil) section counts as disabled.
func IsWaitForPodsReadyEnable(cfg *configapi.Configuration) bool {
	waitForPodsReady := cfg.WaitForPodsReady
	if waitForPodsReady == nil {
		return false
	}
	return waitForPodsReady.Enable
}
16 changes: 8 additions & 8 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand All @@ -99,7 +99,7 @@ func TestWlReconcile(t *testing.T) {
wantManagersWorkloads: []kueue.Workload{
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
Obj(),
},
},
Expand All @@ -108,14 +108,14 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -128,7 +128,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -149,7 +149,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("batch/v1", "Job", "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,28 @@ func TestReconcile(t *testing.T) {
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference("ownerapi", "ownerkind", "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand Down
93 changes: 93 additions & 0 deletions pkg/controller/jobframework/jobframework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package jobframework

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/util/cert"
)

// kubeVersionFunc is a hook that SetupControllers calls with a framework name
// and a list of options. The options it returns are the ones handed to that
// framework's SetupWebhook; a non-nil error aborts the setup.
type kubeVersionFunc func(fwkName string, opts ...Option) ([]Option, error)

var (
	// errFailedMappingResource is wrapped into the error returned by
	// SetupControllers when the GroupVersionKind of an integration's job type
	// cannot be resolved from the manager's scheme.
	errFailedMappingResource = errors.New("restMapper failed mapping resource")
)

// SetupControllers sets up a reconciler and a webhook for every registered
// integration whose name is in the enabled frameworks option and whose job
// type can be mapped by the manager's REST mapper. Every other integration
// (not enabled, or with no matching API on the server) gets a noop webhook.
//
// It first waits on certsReady via cert.WaitForCertsReady. For each enabled
// and mappable integration the reconciler is created with opts; then, if
// kubeVersionFunc is non-nil, it is called with the framework name and a copy
// of opts, and the options it returns are used for that framework's webhook
// only.
//
// The returned error is prefixed with the framework name. It wraps
// errFailedMappingResource when the job type's GroupVersionKind cannot be
// resolved from the manager's scheme.
func SetupControllers(
	mgr ctrl.Manager,
	setupLog logr.Logger,
	kubeVersionFunc kubeVersionFunc,
	certsReady chan struct{},
	opts ...Option,
) error {
	// The controllers won't work until the webhooks are operating, and the webhook won't work until the
	// certs are all in place.
	cert.WaitForCertsReady(setupLog, certsReady)
	options := DefaultOptions
	for _, opt := range opts {
		opt(&options)
	}

	return ForEachIntegration(func(name string, cb IntegrationCallbacks) error {
		log := setupLog.WithValues("jobFrameworkName", name)
		fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name)

		if options.EnabledFrameworks.Has(name) {
			gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
			if err != nil {
				return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
			}
			if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
				if !meta.IsNoMatchError(err) {
					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
				}
				// Fall through to the noop webhook below.
				log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
			} else {
				if err = cb.NewReconciler(
					mgr.GetClient(),
					mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
					opts...,
				).SetupWithManager(mgr); err != nil {
					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
				}
				// Work on a per-framework view of opts. Reassigning the captured
				// opts here would leak options added for this framework into
				// every integration visited afterwards. Capacity is clipped so an
				// append inside kubeVersionFunc cannot write into the shared
				// backing array.
				fwkOpts := opts[:len(opts):len(opts)]
				if kubeVersionFunc != nil {
					if fwkOpts, err = kubeVersionFunc(name, fwkOpts...); err != nil {
						return fmt.Errorf("%s: failed to configure reconcilers: %w", fwkNamePrefix, err)
					}
				}
				if err = cb.SetupWebhook(mgr, fwkOpts...); err != nil {
					return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
				}
				log.Info("Set up controller and webhook for job framework")
				return nil
			}
		}
		if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
			return fmt.Errorf("%s: unable to create noop webhook: %w", fwkNamePrefix, err)
		}
		return nil
	})
}

// SetupIndexes calls the SetupIndexes callback, with ctx and indexer, of every
// registered integration whose name is in the enabled frameworks option
// resolved from opts. Integrations that are not enabled are left untouched.
// An error from a callback is returned wrapped with the framework name.
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error {
	resolved := DefaultOptions
	for i := range opts {
		opts[i](&resolved)
	}

	registerIndexes := func(fwkName string, callbacks IntegrationCallbacks) error {
		if !resolved.EnabledFrameworks.Has(fwkName) {
			return nil
		}
		err := callbacks.SetupIndexes(ctx, indexer)
		if err == nil {
			return nil
		}
		return fmt.Errorf("jobFrameworkName %q: %w", fwkName, err)
	}
	return ForEachIntegration(registerIndexes)
}
Loading

0 comments on commit b60d7ed

Please sign in to comment.