generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 288
/
Copy pathjobframework.go
93 lines (84 loc) · 2.82 KB
/
jobframework.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package jobframework
import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/util/cert"
)
// modifyOptions is a callback that receives the name of a job framework together
// with the current options and returns the options to use from then on, or an
// error. SetupControllers invokes it (when non-nil) once per framework whose
// controller is being set up, before the framework's webhook is registered.
type modifyOptions func(fwkName string, opts ...Option) ([]Option, error)
var (
// errFailedMappingResource is a sentinel error that SetupControllers wraps
// (together with the underlying cause) when the GroupVersionKind of an
// integration's job type cannot be determined from the manager's scheme.
// Callers can detect it with errors.Is.
errFailedMappingResource = errors.New("restMapper failed mapping resource")
)
// SetupControllers registers, for every integration visited by
// ForEachIntegration, either the integration's reconciler and webhook or a noop
// webhook.
//
// It first calls cert.WaitForCertsReady with certsReady and then computes the
// effective Options by applying opts on top of DefaultOptions. For each
// integration:
//   - If the framework is enabled and the REST mapper knows the job type, the
//     reconciler returned by cb.NewReconciler is set up with mgr, modifyOpts (when
//     non-nil) is asked for the options to use, and cb.SetupWebhook is called
//     with those options.
//   - If the framework is not enabled, or the REST mapper reports a no-match
//     error for the job type, noop.SetupWebhook is called for the job type.
//
// modifyOpts may be nil. The options it returns are scoped to the framework it
// was called for: they are never carried over to other frameworks, so the
// outcome does not depend on the order in which integrations are visited.
//
// Every error is returned wrapped with the framework name; a failure to resolve
// the job type's GroupVersionKind additionally wraps errFailedMappingResource.
func SetupControllers(
	mgr ctrl.Manager,
	setupLog logr.Logger,
	modifyOpts modifyOptions,
	certsReady chan struct{},
	opts ...Option,
) error {
	// The controllers won't work until the webhooks are operating, and the webhook won't work until the
	// certs are all in place.
	cert.WaitForCertsReady(setupLog, certsReady)

	options := DefaultOptions
	for _, opt := range opts {
		opt(&options)
	}

	return ForEachIntegration(func(name string, cb IntegrationCallbacks) error {
		log := setupLog.WithValues("jobFrameworkName", name)
		fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name)

		if options.EnabledFrameworks.Has(name) {
			gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
			if err != nil {
				return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
			}
			if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
				if !meta.IsNoMatchError(err) {
					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
				}
				// The API is not installed on the server: fall through to the noop webhook.
				log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
			} else {
				if err = cb.NewReconciler(
					mgr.GetClient(),
					mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
					opts...,
				).SetupWithManager(mgr); err != nil {
					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
				}

				// Keep the modified options in a per-framework local. Assigning the
				// result back to the captured opts would leak one framework's
				// options into every integration visited afterwards.
				fwkOpts := opts
				if modifyOpts != nil {
					if fwkOpts, err = modifyOpts(name, opts...); err != nil {
						return fmt.Errorf("%s: failed to configure reconcilers: %w", fwkNamePrefix, err)
					}
				}
				if err = cb.SetupWebhook(mgr, fwkOpts...); err != nil {
					return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
				}
				log.Info("Set up controller and webhook for job framework")
				return nil
			}
		}

		// Framework disabled or its API is absent: register the noop webhook instead.
		if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
			return fmt.Errorf("%s: unable to create noop webhook: %w", fwkNamePrefix, err)
		}
		return nil
	})
}
// SetupIndexes invokes the SetupIndexes callback of every enabled integration
// with the given context and field indexer.
//
// The set of enabled frameworks is read from the Options obtained by applying
// opts on top of DefaultOptions. Integrations whose name is not in that set are
// skipped. An error returned by an integration's callback is wrapped with the
// framework name and handed back to ForEachIntegration, whose result is
// returned unchanged.
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error {
	cfg := DefaultOptions
	for i := range opts {
		opts[i](&cfg)
	}

	// indexIntegration registers the indexes of a single integration, if enabled.
	indexIntegration := func(fwkName string, callbacks IntegrationCallbacks) error {
		if !cfg.EnabledFrameworks.Has(fwkName) {
			return nil
		}
		err := callbacks.SetupIndexes(ctx, indexer)
		if err == nil {
			return nil
		}
		return fmt.Errorf("jobFrameworkName %q: %w", fwkName, err)
	}

	return ForEachIntegration(indexIntegration)
}