From 40c6b5ce05afa1cb4254e92ae09044f44e67e319 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 26 Apr 2024 11:52:39 +0200 Subject: [PATCH] :warning: Add TypedReconciler This change adds a TypedReconciler which allows to customize the type being used in the workqueue. There is a number of situations where a custom type might be better than the default `reconcile.Request`: * Multi-Cluster controllers might want to put the clusters in there * Some controllers do not reconcile individual resources of a given type but all of them at once, for example IngressControllers might do this * Some controllers do not operate on Kubernetes resources at all --- .golangci.yml | 4 - examples/typed/main.go | 60 ++++++++++ pkg/builder/controller.go | 106 +++++++++++------ pkg/builder/controller_test.go | 71 ++++++------ pkg/builder/options.go | 34 +++--- pkg/builder/webhook_test.go | 2 - pkg/controller/controller.go | 47 +++++--- pkg/controller/controller_test.go | 33 +++--- pkg/controller/controllertest/testing.go | 13 ++- pkg/handler/enqueue.go | 10 +- pkg/handler/enqueue_mapped.go | 50 +++++--- pkg/handler/enqueue_owner.go | 10 +- pkg/handler/eventhandler.go | 33 +++--- pkg/handler/eventhandler_test.go | 50 +++----- pkg/handler/example_test.go | 10 +- pkg/internal/controller/controller.go | 49 +++----- pkg/internal/controller/controller_test.go | 95 +++++++-------- pkg/internal/source/event_handler.go | 38 +++--- pkg/internal/source/internal_test.go | 30 ++--- pkg/internal/source/kind.go | 14 +-- pkg/ratelimiter/doc.go | 22 ---- pkg/ratelimiter/ratelimiter.go | 30 ----- pkg/reconcile/reconcile.go | 18 ++- pkg/source/source.go | 107 ++++++++++++----- pkg/source/source_integration_test.go | 57 +++++---- pkg/source/source_test.go | 129 +++++++++++++-------- 26 files changed, 633 insertions(+), 489 deletions(-) create mode 100644 examples/typed/main.go delete mode 100644 pkg/ratelimiter/doc.go delete mode 100644 pkg/ratelimiter/ratelimiter.go diff --git a/.golangci.yml b/.golangci.yml index 696a52ebb0..4c43665e2b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -122,10 +122,6 @@ issues: - linters: - staticcheck text: "SA1019: .*The component config package has been deprecated and will be removed in a future release." - - linters: - - staticcheck - # Will be addressed separately. - text: "SA1019: workqueue.(RateLimitingInterface|DefaultControllerRateLimiter|New|NewItemExponentialFailureRateLimiter|NewRateLimitingQueueWithConfig|DefaultItemBasedRateLimiter|RateLimitingQueueConfig) is deprecated:" # With Go 1.16, the new embed directive can be used with an un-named import, # revive (previously, golint) only allows these to be imported in a main.go, which wouldn't work for us. # This directive allows the embed package to be imported with an underscore everywhere. diff --git a/examples/typed/main.go b/examples/typed/main.go new file mode 100644 index 0000000000..a1a4f203e7 --- /dev/null +++ b/examples/typed/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "fmt" + "os" + + networkingv1 "k8s.io/api/networking/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func main() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run() error { + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + if err != nil { + return fmt.Errorf("failed to construct manager: %w", err) + } + + // Use a request type that is always equal to itself so the workqueue + // de-duplicates all events. + // This can for example be useful for an ingress-controller that + // generates a config from all ingresses, rather than individual ones. + type request struct{} + + r := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) { + ingressList := &networkingv1.IngressList{} + if err := mgr.GetClient().List(ctx, ingressList); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err) + } + + buildIngressConfig(ingressList) + return reconcile.Result{}, nil + }) + if err := builder.TypedControllerManagedBy[request](mgr). + WatchesRawSource(source.TypedKind( + mgr.GetCache(), + &networkingv1.Ingress{}, + handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request { + return []request{{}} + })), + ). + Named("ingress_controller"). + Complete(r); err != nil { + return fmt.Errorf("failed to construct ingress-controller: %w", err) + } + + return nil +} + +func buildIngressConfig(*networkingv1.IngressList) {} diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 2c0063a837..8721f9be05 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -19,6 +19,7 @@ package builder import ( "errors" "fmt" + "reflect" "strings" "github.com/go-logr/logr" @@ -37,7 +38,6 @@ import ( ) // Supporting mocking out functions for testing. -var newController = controller.New var getGvk = apiutil.GVKForObject // project represents other forms that we can use to @@ -52,21 +52,32 @@ const ( ) // Builder builds a Controller. -type Builder struct { +type Builder = TypedBuilder[reconcile.Request] + +// TypedBuilder builds a Controller. The request is the request type +// that is passed to the workqueue and then to the Reconciler. +// The workqueue de-duplicates identical requests. +type TypedBuilder[request comparable] struct { forInput ForInput ownsInput []OwnsInput - rawSources []source.Source - watchesInput []WatchesInput + rawSources []source.TypedSource[request] + watchesInput []WatchesInput[request] mgr manager.Manager globalPredicates []predicate.Predicate - ctrl controller.Controller - ctrlOptions controller.Options + ctrl controller.TypedController[request] + ctrlOptions controller.TypedOptions[request] name string + newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error) } // ControllerManagedBy returns a new controller builder that will be started by the provided Manager. -func ControllerManagedBy(m manager.Manager) *Builder { - return &Builder{mgr: m} +func ControllerManagedBy(m manager.Manager) *TypedBuilder[reconcile.Request] { + return TypedControllerManagedBy[reconcile.Request](m) +} + +// TypedControllerManagedBy returns a new tyepd controller builder that will be started by the provided Manager. +func TypedControllerManagedBy[request comparable](m manager.Manager) *TypedBuilder[request] { + return &TypedBuilder[request]{mgr: m} } // ForInput represents the information set by the For method. @@ -81,7 +92,7 @@ type ForInput struct { // update events by *reconciling the object*. // This is the equivalent of calling // Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}). -func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { +func (blder *TypedBuilder[request]) For(object client.Object, opts ...ForOption) *TypedBuilder[request] { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder @@ -111,7 +122,7 @@ type OwnsInput struct { // // By default, this is the equivalent of calling // Watches(object, handler.EnqueueRequestForOwner([...], ownerType, OnlyControllerOwner())). -func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { +func (blder *TypedBuilder[request]) Owns(object client.Object, opts ...OwnsOption) *TypedBuilder[request] { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) @@ -122,9 +133,9 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { } // WatchesInput represents the information set by Watches method. -type WatchesInput struct { +type WatchesInput[request comparable] struct { obj client.Object - handler handler.EventHandler + handler handler.TypedEventHandler[client.Object, request] predicates []predicate.Predicate objectProjection objectProjection } @@ -134,8 +145,12 @@ type WatchesInput struct { // // This is the equivalent of calling // WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)). -func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { - input := WatchesInput{ +func (blder *TypedBuilder[request]) Watches( + object client.Object, + eventHandler handler.TypedEventHandler[client.Object, request], + opts ...WatchesOption[request], +) *TypedBuilder[request] { + input := WatchesInput[request]{ obj: object, handler: eventHandler, } @@ -175,8 +190,12 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa // In the first case, controller-runtime will create another cache for the // concrete type on top of the metadata cache; this increases memory // consumption and leads to race conditions as caches are not in sync. -func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { - opts = append(opts, OnlyMetadata) +func (blder *TypedBuilder[request]) WatchesMetadata( + object client.Object, + eventHandler handler.TypedEventHandler[client.Object, request], + opts ...WatchesOption[request], +) *TypedBuilder[request] { + opts = append(opts, projectAs[request](projectAsMetadata)) return blder.Watches(object, eventHandler, opts...) } @@ -187,7 +206,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler // This method is only exposed for more advanced use cases, most users should use one of the higher level functions. // // WatchesRawSource does not respect predicates configured through WithEventFilter. -func (blder *Builder) WatchesRawSource(src source.Source) *Builder { +func (blder *TypedBuilder[request]) WatchesRawSource(src source.TypedSource[request]) *TypedBuilder[request] { blder.rawSources = append(blder.rawSources, src) return blder @@ -197,19 +216,19 @@ func (blder *Builder) WatchesRawSource(src source.Source) *Builder { // trigger reconciliations. For example, filtering on whether the resource version has changed. // Given predicate is added for all watched objects. // Defaults to the empty list. -func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder { +func (blder *TypedBuilder[request]) WithEventFilter(p predicate.Predicate) *TypedBuilder[request] { blder.globalPredicates = append(blder.globalPredicates, p) return blder } // WithOptions overrides the controller options used in doController. Defaults to empty. -func (blder *Builder) WithOptions(options controller.Options) *Builder { +func (blder *TypedBuilder[request]) WithOptions(options controller.TypedOptions[request]) *TypedBuilder[request] { blder.ctrlOptions = options return blder } // WithLogConstructor overrides the controller options's LogConstructor. -func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder { +func (blder *TypedBuilder[request]) WithLogConstructor(logConstructor func(*request) logr.Logger) *TypedBuilder[request] { blder.ctrlOptions.LogConstructor = logConstructor return blder } @@ -219,19 +238,19 @@ func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) // (underscores and alphanumeric characters only). // // By default, controllers are named using the lowercase version of their kind. -func (blder *Builder) Named(name string) *Builder { +func (blder *TypedBuilder[request]) Named(name string) *TypedBuilder[request] { blder.name = name return blder } // Complete builds the Application Controller. -func (blder *Builder) Complete(r reconcile.Reconciler) error { +func (blder *TypedBuilder[request]) Complete(r reconcile.TypedReconciler[request]) error { _, err := blder.Build(r) return err } // Build builds the Application Controller and returns the Controller it created. -func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { +func (blder *TypedBuilder[request]) Build(r reconcile.TypedReconciler[request]) (controller.TypedController[request], error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } @@ -255,7 +274,7 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro return blder.ctrl, nil } -func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) { +func (blder *TypedBuilder[request]) project(obj client.Object, proj objectProjection) (client.Object, error) { switch proj { case projectAsNormal: return obj, nil @@ -272,17 +291,23 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. } } -func (blder *Builder) doWatch() error { +func (blder *TypedBuilder[request]) doWatch() error { // Reconcile type if blder.forInput.object != nil { obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } - hdler := &handler.EnqueueRequestForObject{} + + var hdler handler.TypedEventHandler[client.Object, request] + if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) { + return errors.New("For() is not supported for TypedBuilder") + } + + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{})) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) - src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...) + src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) if err := blder.ctrl.Watch(src); err != nil { return err } @@ -301,14 +326,18 @@ func (blder *Builder) doWatch() error { if !own.matchEveryOwner { opts = append(opts, handler.OnlyControllerOwner()) } - hdler := handler.EnqueueRequestForOwner( + var hdler handler.TypedEventHandler[client.Object, request] + if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) { + return errors.New("Owns() is not supported for TypedBuilder") + } + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner( blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), blder.forInput.object, opts..., - ) + ))) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) - src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...) + src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) if err := blder.ctrl.Watch(src); err != nil { return err } @@ -325,7 +354,7 @@ func (blder *Builder) doWatch() error { } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) - if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil { + if err := blder.ctrl.Watch(source.TypedKind[client.Object, request](blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil { return err } } @@ -337,7 +366,7 @@ func (blder *Builder) doWatch() error { return nil } -func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) { +func (blder *TypedBuilder[request]) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) { if blder.name != "" { return blder.name, nil } @@ -347,7 +376,7 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool return strings.ToLower(gvk.Kind), nil } -func (blder *Builder) doController(r reconcile.Reconciler) error { +func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[request]) error { globalOpts := blder.mgr.GetControllerOptions() ctrlOptions := blder.ctrlOptions @@ -401,9 +430,10 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { ) } - ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger { + ctrlOptions.LogConstructor = func(in *request) logr.Logger { log := log - if req != nil { + + if req, ok := any(in).(*reconcile.Request); ok && req != nil { if hasGVK { log = log.WithValues(gvk.Kind, klog.KRef(req.Namespace, req.Name)) } @@ -415,7 +445,11 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { } } + if blder.newController == nil { + blder.newController = controller.NewTyped[request] + } + // Build the controller and return. - blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions) + blder.ctrl, err = blder.newController(controllerName, blder.mgr, ctrlOptions) return err } diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 4ff576edad..84cd4bae1c 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -76,10 +76,6 @@ func (l *testLogger) WithName(name string) logr.LogSink { } var _ = Describe("application", func() { - BeforeEach(func() { - newController = controller.New - }) - noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { return reconcile.Result{}, nil }) @@ -182,19 +178,19 @@ var _ = Describe("application", func() { }) It("should return an error if it cannot create the controller", func() { - newController = func(name string, mgr manager.Manager, options controller.Options) ( - controller.Controller, error) { - return nil, fmt.Errorf("expected error") - } By("creating a controller manager") m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). - Owns(&appsv1.ReplicaSet{}). - Build(noop) + Owns(&appsv1.ReplicaSet{}) + builder.newController = func(name string, mgr manager.Manager, options controller.Options) ( + controller.Controller, error) { + return nil, fmt.Errorf("expected error") + } + instance, err := builder.Build(noop) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) Expect(instance).To(BeNil()) @@ -202,7 +198,7 @@ var _ = Describe("application", func() { It("should override max concurrent reconcilers during creation of controller", func() { const maxConcurrentReconciles = 5 - newController = func(name string, mgr manager.Manager, options controller.Options) ( + newController := func(name string, mgr manager.Manager, options controller.Options) ( controller.Controller, error) { if options.MaxConcurrentReconciles == maxConcurrentReconciles { return controller.New(name, mgr, options) @@ -214,18 +210,20 @@ var _ = Describe("application", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). - WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}). - Build(noop) + WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}) + builder.newController = newController + + instance, err := builder.Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) }) It("should override max concurrent reconcilers during creation of controller, when using", func() { const maxConcurrentReconciles = 10 - newController = func(name string, mgr manager.Manager, options controller.Options) ( + newController := func(name string, mgr manager.Manager, options controller.Options) ( controller.Controller, error) { if options.MaxConcurrentReconciles == maxConcurrentReconciles { return controller.New(name, mgr, options) @@ -243,17 +241,19 @@ var _ = Describe("application", func() { }) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). - Owns(&appsv1.ReplicaSet{}). - Build(noop) + Owns(&appsv1.ReplicaSet{}) + builder.newController = newController + + instance, err := builder.Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) }) It("should override rate limiter during creation of controller", func() { - rateLimiter := workqueue.DefaultItemBasedRateLimiter() - newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { + rateLimiter := workqueue.DefaultTypedItemBasedRateLimiter[reconcile.Request]() + newController := func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { if options.RateLimiter == rateLimiter { return controller.New(name, mgr, options) } @@ -264,19 +264,20 @@ var _ = Describe("application", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). - WithOptions(controller.Options{RateLimiter: rateLimiter}). - Build(noop) + WithOptions(controller.Options{RateLimiter: rateLimiter}) + builder.newController = newController + + instance, err := builder.Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) }) It("should override logger during creation of controller", func() { - logger := &testLogger{} - newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { + newController := func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { if options.LogConstructor(nil).GetSink() == logger { return controller.New(name, mgr, options) } @@ -287,19 +288,20 @@ var _ = Describe("application", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). WithLogConstructor(func(request *reconcile.Request) logr.Logger { return logr.New(logger) - }). - Build(noop) + }) + builder.newController = newController + instance, err := builder.Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) }) It("should not allow multiple reconcilers during creation of controller", func() { - newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { + newController := func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { if options.Reconciler != (typedNoop{}) { return nil, fmt.Errorf("Custom reconciler expected %T but found %T", typedNoop{}, options.Reconciler) } @@ -310,11 +312,12 @@ var _ = Describe("application", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - instance, err := ControllerManagedBy(m). + builder := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). - WithOptions(controller.Options{Reconciler: typedNoop{}}). - Build(noop) + WithOptions(controller.Options{Reconciler: typedNoop{}}) + builder.newController = newController + instance, err := builder.Build(noop) Expect(err).To(HaveOccurred()) Expect(instance).To(BeNil()) }) @@ -590,7 +593,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G // TODO(directxman12): this function has too many arguments, and the whole // "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time. -func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) { +func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*TypedBuilder[reconcile.Request]) { deployName := "deploy-name-" + nameSuffix rsName := "rs-name-" + nameSuffix diff --git a/pkg/builder/options.go b/pkg/builder/options.go index 15f66b2a82..d2e7f311cc 100644 --- a/pkg/builder/options.go +++ b/pkg/builder/options.go @@ -18,6 +18,7 @@ package builder import ( "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // {{{ "Functional" Option Interfaces @@ -35,9 +36,9 @@ type OwnsOption interface { } // WatchesOption is some configuration that modifies options for a watches request. -type WatchesOption interface { +type WatchesOption[request comparable] interface { // ApplyToWatches applies this configuration to the given watches options. - ApplyToWatches(*WatchesInput) + ApplyToWatches(*WatchesInput[request]) } // }}} @@ -52,28 +53,31 @@ func WithPredicates(predicates ...predicate.Predicate) Predicates { } // Predicates filters events before enqueuing the keys. -type Predicates struct { +type Predicates = TypedPredicates[reconcile.Request] + +// TypedPredicates filters events before enqueuing the keys. +type TypedPredicates[request comparable] struct { predicates []predicate.Predicate } // ApplyToFor applies this configuration to the given ForInput options. -func (w Predicates) ApplyToFor(opts *ForInput) { +func (w TypedPredicates[request]) ApplyToFor(opts *ForInput) { opts.predicates = w.predicates } // ApplyToOwns applies this configuration to the given OwnsInput options. -func (w Predicates) ApplyToOwns(opts *OwnsInput) { +func (w TypedPredicates[request]) ApplyToOwns(opts *OwnsInput) { opts.predicates = w.predicates } // ApplyToWatches applies this configuration to the given WatchesInput options. -func (w Predicates) ApplyToWatches(opts *WatchesInput) { +func (w TypedPredicates[request]) ApplyToWatches(opts *WatchesInput[request]) { opts.predicates = w.predicates } var _ ForOption = &Predicates{} var _ OwnsOption = &Predicates{} -var _ WatchesOption = &Predicates{} +var _ WatchesOption[reconcile.Request] = &Predicates{} // }}} @@ -82,20 +86,20 @@ var _ WatchesOption = &Predicates{} // projectAs configures the projection on the input. // Currently only OnlyMetadata is supported. We might want to expand // this to arbitrary non-special local projections in the future. -type projectAs objectProjection +type projectAs[request comparable] objectProjection // ApplyToFor applies this configuration to the given ForInput options. -func (p projectAs) ApplyToFor(opts *ForInput) { +func (p projectAs[request]) ApplyToFor(opts *ForInput) { opts.objectProjection = objectProjection(p) } // ApplyToOwns applies this configuration to the given OwnsInput options. -func (p projectAs) ApplyToOwns(opts *OwnsInput) { +func (p projectAs[request]) ApplyToOwns(opts *OwnsInput) { opts.objectProjection = objectProjection(p) } // ApplyToWatches applies this configuration to the given WatchesInput options. -func (p projectAs) ApplyToWatches(opts *WatchesInput) { +func (p projectAs[request]) ApplyToWatches(opts *WatchesInput[request]) { opts.objectProjection = objectProjection(p) } @@ -130,11 +134,11 @@ var ( // In the first case, controller-runtime will create another cache for the // concrete type on top of the metadata cache; this increases memory // consumption and leads to race conditions as caches are not in sync. - OnlyMetadata = projectAs(projectAsMetadata) + OnlyMetadata = projectAs[reconcile.Request](projectAsMetadata) - _ ForOption = OnlyMetadata - _ OwnsOption = OnlyMetadata - _ WatchesOption = OnlyMetadata + _ ForOption = OnlyMetadata + _ OwnsOption = OnlyMetadata + _ WatchesOption[reconcile.Request] = OnlyMetadata ) // }}} diff --git a/pkg/builder/webhook_test.go b/pkg/builder/webhook_test.go index 4028b549a0..abb11bf957 100644 --- a/pkg/builder/webhook_test.go +++ b/pkg/builder/webhook_test.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/controller" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -70,7 +69,6 @@ func runTests(admissionReviewVersion string) { BeforeEach(func() { stop = make(chan struct{}) - newController = controller.New logBuffer = gbytes.NewBuffer() testingLogger = zap.New(zap.JSONEncoder(), zap.WriteTo(io.MultiWriter(logBuffer, GinkgoWriter))) }) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5c9e48beae..6a709bee40 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -27,13 +27,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) // Options are the arguments for creating a new Controller. -type Options struct { +type Options = TypedOptions[reconcile.Request] + +// TypedOptions are the arguments for creating a new Controller. +type TypedOptions[T comparable] struct { // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int @@ -50,12 +52,12 @@ type Options struct { NeedLeaderElection *bool // Reconciler reconciles an object - Reconciler reconcile.Reconciler + Reconciler reconcile.TypedReconciler[T] // RateLimiter is used to limit how frequently requests may be queued. // Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting. // The overall is a token bucket and the per-item is exponential. - RateLimiter ratelimiter.RateLimiter + RateLimiter workqueue.TypedRateLimiter[T] // NewQueue constructs the queue for this controller once the controller is ready to start. // With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which @@ -67,23 +69,26 @@ type Options struct { // // NOTE: LOW LEVEL PRIMITIVE! // Only use a custom NewQueue if you know what you are doing. - NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface + NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] // LogConstructor is used to construct a logger used for this controller and passed // to each reconciliation via the context field. - LogConstructor func(request *reconcile.Request) logr.Logger + LogConstructor func(request *T) logr.Logger } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests // from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item. // Work typically is reads and writes Kubernetes objects to make the system state match the state specified // in the object Spec. -type Controller interface { +type Controller = TypedController[reconcile.Request] + +// TypedController implements an API. +type TypedController[T comparable] interface { // Reconciler is called to reconcile an object by Namespace/Name - reconcile.Reconciler + reconcile.TypedReconciler[T] // Watch watches the provided Source. - Watch(src source.Source) error + Watch(src source.TypedSource[T]) error // Start starts the controller. Start blocks until the context is closed or a // controller has an error starting. @@ -96,7 +101,12 @@ type Controller interface { // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have // been synced before the Controller is Started. func New(name string, mgr manager.Manager, options Options) (Controller, error) { - c, err := NewUnmanaged(name, mgr, options) + return NewTyped(name, mgr, options) +} + +// NewTyped returns a new typed controller registered with the Manager, +func NewTyped[T comparable](name string, mgr manager.Manager, options TypedOptions[T]) (TypedController[T], error) { + c, err := NewTypedUnmanaged(name, mgr, options) if err != nil { return nil, err } @@ -108,6 +118,11 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // NewUnmanaged returns a new controller without adding it to the manager. The // caller is responsible for starting the returned controller. func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) { + return NewTyped(name, mgr, options) +} + +// NewTypedUnmanaged returns a new typed controller without adding it to the manager. +func NewTypedUnmanaged[T comparable](name string, mgr manager.Manager, options TypedOptions[T]) (TypedController[T], error) { if options.Reconciler == nil { return nil, fmt.Errorf("must specify Reconciler") } @@ -120,9 +135,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller log := mgr.GetLogger().WithValues( "controller", name, ) - options.LogConstructor = func(req *reconcile.Request) logr.Logger { + options.LogConstructor = func(in *T) logr.Logger { log := log - if req != nil { + if req, ok := any(in).(*reconcile.Request); ok && req != nil { log = log.WithValues( "object", klog.KRef(req.Namespace, req.Name), "namespace", req.Namespace, "name", req.Name, @@ -149,12 +164,12 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller } if options.RateLimiter == nil { - options.RateLimiter = workqueue.DefaultControllerRateLimiter() + options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[T]() } if options.NewQueue == nil { - options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { - return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{ + options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] { + return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[T]{ Name: controllerName, }) } @@ -169,7 +184,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller } // Create controller with dependencies set - return &controller.Controller{ + return &controller.Controller[T]{ Do: options.Reconciler, RateLimiter: options.RateLimiter, NewQueue: options.NewQueue, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 0454cb4b90..c27181a0ef 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -33,7 +33,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -144,7 +143,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.RateLimiter).NotTo(BeNil()) @@ -155,9 +154,9 @@ var _ = Describe("controller.Controller", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second) + customRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Millisecond, 1000*time.Second) customNewQueueCalled := false - customNewQueue := func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { + customNewQueue := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { customNewQueueCalled = true return nil } @@ -169,7 +168,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter)) @@ -186,7 +185,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.RecoverPanic).NotTo(BeNil()) @@ -203,7 +202,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.RecoverPanic).NotTo(BeNil()) @@ -219,7 +218,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.NeedLeaderElection()).To(BeTrue()) @@ -235,7 +234,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.NeedLeaderElection()).To(BeFalse()) @@ -250,7 +249,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.MaxConcurrentReconciles).To(BeEquivalentTo(5)) @@ -265,7 +264,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.MaxConcurrentReconciles).To(BeEquivalentTo(1)) @@ -281,7 +280,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.MaxConcurrentReconciles).To(BeEquivalentTo(5)) @@ -296,7 +295,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.CacheSyncTimeout).To(BeEquivalentTo(5)) @@ -311,7 +310,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.CacheSyncTimeout).To(BeEquivalentTo(2 * time.Minute)) @@ -327,7 +326,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.CacheSyncTimeout).To(BeEquivalentTo(5)) @@ -342,7 +341,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.NeedLeaderElection()).To(BeTrue()) @@ -358,7 +357,7 @@ var _ = Describe("controller.Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - ctrl, ok := c.(*internalcontroller.Controller) + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) Expect(ctrl.NeedLeaderElection()).To(BeFalse()) diff --git a/pkg/controller/controllertest/testing.go b/pkg/controller/controllertest/testing.go index 627915f94b..014b86dd39 100644 --- a/pkg/controller/controllertest/testing.go +++ b/pkg/controller/controllertest/testing.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) var _ runtime.Object = &ErrorType{} @@ -36,23 +37,23 @@ func (ErrorType) GetObjectKind() schema.ObjectKind { return nil } // DeepCopyObject implements runtime.Object. func (ErrorType) DeepCopyObject() runtime.Object { return nil } -var _ workqueue.RateLimitingInterface = &Queue{} +var _ workqueue.TypedRateLimitingInterface[reconcile.Request] = &Queue{} // Queue implements a RateLimiting queue as a non-ratelimited queue for testing. // This helps testing by having functions that use a RateLimiting queue synchronously add items to the queue. type Queue struct { - workqueue.Interface + workqueue.TypedInterface[reconcile.Request] AddedRateLimitedLock sync.Mutex AddedRatelimited []any } // AddAfter implements RateLimitingInterface. -func (q *Queue) AddAfter(item interface{}, duration time.Duration) { +func (q *Queue) AddAfter(item reconcile.Request, duration time.Duration) { q.Add(item) } // AddRateLimited implements RateLimitingInterface. TODO(community): Implement this. -func (q *Queue) AddRateLimited(item interface{}) { +func (q *Queue) AddRateLimited(item reconcile.Request) { q.AddedRateLimitedLock.Lock() q.AddedRatelimited = append(q.AddedRatelimited, item) q.AddedRateLimitedLock.Unlock() @@ -60,9 +61,9 @@ func (q *Queue) AddRateLimited(item interface{}) { } // Forget implements RateLimitingInterface. TODO(community): Implement this. -func (q *Queue) Forget(item interface{}) {} +func (q *Queue) Forget(item reconcile.Request) {} // NumRequeues implements RateLimitingInterface. TODO(community): Implement this. -func (q *Queue) NumRequeues(item interface{}) int { +func (q *Queue) NumRequeues(item reconcile.Request) int { return 0 } diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index c9c7693854..1a1d1ab2f4 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -44,10 +44,10 @@ type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object] // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. // // TypedEnqueueRequestForObject is experimental and subject to future change. -type TypedEnqueueRequestForObject[T client.Object] struct{} +type TypedEnqueueRequestForObject[object client.Object] struct{} // Create implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { +func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { if isNil(evt.Object) { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return @@ -59,7 +59,7 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event. } // Update implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { +func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { switch { case !isNil(evt.ObjectNew): q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ @@ -77,7 +77,7 @@ func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event. } // Delete implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { +func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { if isNil(evt.Object) { enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return @@ -89,7 +89,7 @@ func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event. } // Generic implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { +func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { if isNil(evt.Object) { enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index 6e34e2ae45..491bc40c42 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -27,13 +27,13 @@ import ( // MapFunc is the signature required for enqueueing requests from a generic function. // This type is usually used with EnqueueRequestsFromMapFunc when registering an event handler. -type MapFunc = TypedMapFunc[client.Object] +type MapFunc = TypedMapFunc[client.Object, reconcile.Request] // TypedMapFunc is the signature required for enqueueing requests from a generic function. // This type is usually used with EnqueueRequestsFromTypedMapFunc when registering an event handler. // // TypedMapFunc is experimental and subject to future change. -type TypedMapFunc[T any] func(context.Context, T) []reconcile.Request +type TypedMapFunc[object any, request comparable] func(context.Context, object) []request // EnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection // of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects @@ -61,46 +61,62 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { // objects and both sets of Requests are enqueue. // // TypedEnqueueRequestsFromMapFunc is experimental and subject to future change. -func TypedEnqueueRequestsFromMapFunc[T any](fn TypedMapFunc[T]) TypedEventHandler[T] { - return &enqueueRequestsFromMapFunc[T]{ +func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] { + return &enqueueRequestsFromMapFunc[object, request]{ toRequests: fn, } } -var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object]{} +var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Request]{} -type enqueueRequestsFromMapFunc[T any] struct { +type enqueueRequestsFromMapFunc[object any, request comparable] struct { // Mapper transforms the argument into a slice of keys to be reconciled - toRequests TypedMapFunc[T] + toRequests TypedMapFunc[object, request] } // Create implements EventHandler. -func (e *enqueueRequestsFromMapFunc[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { - reqs := map[reconcile.Request]empty{} +func (e *enqueueRequestsFromMapFunc[object, request]) Create( + ctx context.Context, + evt event.TypedCreateEvent[object], + q workqueue.TypedRateLimitingInterface[request], +) { + reqs := map[request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Update implements EventHandler. -func (e *enqueueRequestsFromMapFunc[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { - reqs := map[reconcile.Request]empty{} +func (e *enqueueRequestsFromMapFunc[object, request]) Update( + ctx context.Context, + evt event.TypedUpdateEvent[object], + q workqueue.TypedRateLimitingInterface[request], +) { + reqs := map[request]empty{} e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) } // Delete implements EventHandler. -func (e *enqueueRequestsFromMapFunc[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { - reqs := map[reconcile.Request]empty{} +func (e *enqueueRequestsFromMapFunc[object, request]) Delete( + ctx context.Context, + evt event.TypedDeleteEvent[object], + q workqueue.TypedRateLimitingInterface[request], +) { + reqs := map[request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } // Generic implements EventHandler. -func (e *enqueueRequestsFromMapFunc[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { - reqs := map[reconcile.Request]empty{} +func (e *enqueueRequestsFromMapFunc[object, request]) Generic( + ctx context.Context, + evt event.TypedGenericEvent[object], + q workqueue.TypedRateLimitingInterface[request], +) { + reqs := map[request]empty{} e.mapAndEnqueue(ctx, q, evt.Object, reqs) } -func (e *enqueueRequestsFromMapFunc[T]) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object T, reqs map[reconcile.Request]empty) { - for _, req := range e.toRequests(ctx, object) { +func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) { + for _, req := range e.toRequests(ctx, o) { _, ok := reqs[req] if !ok { q.Add(req) diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 052a3140e1..b0df7d1f5d 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -61,7 +61,7 @@ func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, owne // - a handler.typedEnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. // // TypedEnqueueRequestForOwner is experimental and subject to future change. -func TypedEnqueueRequestForOwner[T client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[T] { +func TypedEnqueueRequestForOwner[T client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[T, reconcile.Request] { e := &enqueueRequestForOwner[T]{ ownerType: ownerType, mapper: mapper, @@ -105,7 +105,7 @@ func (e *enqueueRequestForOwner[T]) setIsController(isController bool) { } // Create implements EventHandler. -func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -114,7 +114,7 @@ func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedC } // Update implements EventHandler. -func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs) @@ -124,7 +124,7 @@ func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedU } // Delete implements EventHandler. -func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -133,7 +133,7 @@ func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedD } // Generic implements EventHandler. -func (e *enqueueRequestForOwner[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 1756ffefa3..07715fe8b4 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -22,6 +22,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // EventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). EventHandlers map an Event @@ -42,7 +43,7 @@ import ( // // Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface. // Most users shouldn't need to implement their own EventHandler. -type EventHandler TypedEventHandler[client.Object] +type EventHandler TypedEventHandler[client.Object, reconcile.Request] // TypedEventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). TypedEventHandlers map an Event // for one object to trigger Reconciles for either the same object or different objects - e.g. if there is an @@ -64,70 +65,70 @@ type EventHandler TypedEventHandler[client.Object] // Most users shouldn't need to implement their own TypedEventHandler. // // TypedEventHandler is experimental and subject to future change. -type TypedEventHandler[T any] interface { +type TypedEventHandler[object any, request comparable] interface { // Create is called in response to a create event - e.g. Pod Creation. - Create(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) + Create(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request]) // Update is called in response to an update event - e.g. Pod Updated. - Update(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) + Update(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request]) // Delete is called in response to a delete event - e.g. Pod Deleted. - Delete(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) + Delete(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request]) // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. - Generic(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) + Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request]) } var _ EventHandler = Funcs{} // Funcs implements eventhandler. -type Funcs = TypedFuncs[client.Object] +type Funcs = TypedFuncs[client.Object, reconcile.Request] // TypedFuncs implements eventhandler. // // TypedFuncs is experimental and subject to future change. -type TypedFuncs[T any] struct { +type TypedFuncs[object any, request comparable] struct { // Create is called in response to an add event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - CreateFunc func(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) + CreateFunc func(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request]) // Update is called in response to an update event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - UpdateFunc func(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) + UpdateFunc func(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request]) // Delete is called in response to a delete event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - DeleteFunc func(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) + DeleteFunc func(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request]) // GenericFunc is called in response to a generic event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. - GenericFunc func(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) + GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request]) } // Create implements EventHandler. -func (h TypedFuncs[T]) Create(ctx context.Context, e event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { +func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.CreateFunc != nil { h.CreateFunc(ctx, e, q) } } // Delete implements EventHandler. -func (h TypedFuncs[T]) Delete(ctx context.Context, e event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { +func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.DeleteFunc != nil { h.DeleteFunc(ctx, e, q) } } // Update implements EventHandler. -func (h TypedFuncs[T]) Update(ctx context.Context, e event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { +func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.UpdateFunc != nil { h.UpdateFunc(ctx, e, q) } } // Generic implements EventHandler. -func (h TypedFuncs[T]) Generic(ctx context.Context, e event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { +func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.GenericFunc != nil { h.GenericFunc(ctx, e, q) } diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 0df77c70d0..38b5040971 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -41,12 +41,12 @@ import ( var _ = Describe("Eventhandler", func() { var ctx = context.Background() - var q workqueue.RateLimitingInterface + var q workqueue.TypedRateLimitingInterface[reconcile.Request] var instance handler.EnqueueRequestForObject var pod *corev1.Pod var mapper meta.RESTMapper BeforeEach(func() { - q = &controllertest.Queue{Interface: workqueue.New()} + q = &controllertest.Queue{TypedInterface: workqueue.NewTyped[reconcile.Request]()} pod = &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "biz", Name: "baz"}, } @@ -66,10 +66,7 @@ var _ = Describe("Eventhandler", func() { instance.Create(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ := q.Get() - Expect(i).NotTo(BeNil()) - req, ok := i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ := q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"})) }) @@ -80,10 +77,7 @@ var _ = Describe("Eventhandler", func() { instance.Delete(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ := q.Get() - Expect(i).NotTo(BeNil()) - req, ok := i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ := q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"})) }) @@ -100,10 +94,7 @@ var _ = Describe("Eventhandler", func() { instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ := q.Get() - Expect(i).NotTo(BeNil()) - req, ok := i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ := q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz2", Name: "baz2"})) }) @@ -113,10 +104,7 @@ var _ = Describe("Eventhandler", func() { } instance.Generic(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ := q.Get() - Expect(i).NotTo(BeNil()) - req, ok := i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ := q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"})) }) @@ -140,20 +128,14 @@ var _ = Describe("Eventhandler", func() { } instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ := q.Get() - Expect(i).NotTo(BeNil()) - req, ok := i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ := q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz2", Name: "baz2"})) evt.ObjectNew = nil evt.ObjectOld = pod instance.Update(ctx, evt, q) Expect(q.Len()).To(Equal(1)) - i, _ = q.Get() - Expect(i).NotTo(BeNil()) - req, ok = i.(reconcile.Request) - Expect(ok).To(BeTrue()) + req, _ = q.Get() Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"})) }) @@ -677,19 +659,19 @@ var _ = Describe("Eventhandler", func() { Describe("Funcs", func() { failingFuncs := handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect CreateEvent to be called.") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect DeleteEvent to be called.") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect UpdateEvent to be called.") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect GenericEvent to be called.") }, @@ -700,7 +682,7 @@ var _ = Describe("Eventhandler", func() { evt := event.CreateEvent{ Object: pod, } - instance.CreateFunc = func(ctx context.Context, evt2 event.CreateEvent, q2 workqueue.RateLimitingInterface) { + instance.CreateFunc = func(ctx context.Context, evt2 event.CreateEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) @@ -727,7 +709,7 @@ var _ = Describe("Eventhandler", func() { } instance := failingFuncs - instance.UpdateFunc = func(ctx context.Context, evt2 event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + instance.UpdateFunc = func(ctx context.Context, evt2 event.UpdateEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) @@ -752,7 +734,7 @@ var _ = Describe("Eventhandler", func() { evt := event.DeleteEvent{ Object: pod, } - instance.DeleteFunc = func(ctx context.Context, evt2 event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + instance.DeleteFunc = func(ctx context.Context, evt2 event.DeleteEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) @@ -774,7 +756,7 @@ var _ = Describe("Eventhandler", func() { evt := event.GenericEvent{ Object: pod, } - instance.GenericFunc = func(ctx context.Context, evt2 event.GenericEvent, q2 workqueue.RateLimitingInterface) { + instance.GenericFunc = func(ctx context.Context, evt2 event.GenericEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt2).To(Equal(evt)) diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index 9252e6bd42..ad87e4be63 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -93,26 +93,26 @@ func ExampleFuncs() { // controller is a controller.controller err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Pod{}, - handler.TypedFuncs[*corev1.Pod]{ - CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { + handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.Name, Namespace: e.Object.Namespace, }}) }, - UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.ObjectNew.Name, Namespace: e.ObjectNew.Namespace, }}) }, - DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.Name, Namespace: e.Object.Namespace, }}) }, - GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*corev1.Pod], q workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: e.Object.Name, Namespace: e.Object.Namespace, diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index da8552f647..81768f783e 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -31,13 +31,12 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) // Controller implements controller.Controller. -type Controller struct { +type Controller[T comparable] struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. Name string @@ -47,19 +46,19 @@ type Controller struct { // Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. - Do reconcile.Reconciler + Do reconcile.TypedReconciler[T] // RateLimiter is used to limit how frequently requests may be queued into the work queue. - RateLimiter ratelimiter.RateLimiter + RateLimiter workqueue.TypedRateLimiter[T] // NewQueue constructs the queue for this controller once the controller is ready to start. // This is a func because the standard Kubernetes work queues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. - NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface + NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[T]) workqueue.TypedRateLimitingInterface[T] // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing - Queue workqueue.RateLimitingInterface + Queue workqueue.TypedRateLimitingInterface[T] // mu is used to synchronize Controller setup mu sync.Mutex @@ -79,13 +78,13 @@ type Controller struct { CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. - startWatches []source.Source + startWatches []source.TypedSource[T] // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it // outside the context of a reconciliation. - LogConstructor func(request *reconcile.Request) logr.Logger + LogConstructor func(request *T) logr.Logger // RecoverPanic indicates whether the panic caused by reconcile should be recovered. RecoverPanic *bool @@ -95,7 +94,7 @@ type Controller struct { } // Reconcile implements reconcile.Reconciler. -func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { +func (c *Controller[T]) Reconcile(ctx context.Context, req T) (_ reconcile.Result, err error) { defer func() { if r := recover(); r != nil { if c.RecoverPanic != nil && *c.RecoverPanic { @@ -115,7 +114,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re } // Watch implements controller.Controller. -func (c *Controller) Watch(src source.Source) error { +func (c *Controller[T]) Watch(src source.TypedSource[T]) error { c.mu.Lock() defer c.mu.Unlock() @@ -132,7 +131,7 @@ func (c *Controller) Watch(src source.Source) error { } // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. -func (c *Controller) NeedLeaderElection() bool { +func (c *Controller[T]) NeedLeaderElection() bool { if c.LeaderElected == nil { return true } @@ -140,7 +139,7 @@ func (c *Controller) NeedLeaderElection() bool { } // Start implements controller.Controller. -func (c *Controller) Start(ctx context.Context) error { +func (c *Controller[T]) Start(ctx context.Context) error { // use an IIFE to get proper lock handling // but lock outside to get proper handling of the queue shutdown c.mu.Lock() @@ -240,7 +239,7 @@ func (c *Controller) Start(ctx context.Context) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. -func (c *Controller) processNextWorkItem(ctx context.Context) bool { +func (c *Controller[T]) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working @@ -269,7 +268,7 @@ const ( labelSuccess = "success" ) -func (c *Controller) initMetrics() { +func (c *Controller[T]) initMetrics() { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0) @@ -279,25 +278,13 @@ func (c *Controller) initMetrics() { ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles)) } -func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { +func (c *Controller[T]) reconcileHandler(ctx context.Context, req T) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { c.updateMetrics(time.Since(reconcileStartTS)) }() - // Make sure that the object is a valid request. - req, ok := obj.(reconcile.Request) - if !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.Queue.Forget(obj) - c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) - // Return true, don't take a break - return - } - log := c.LogConstructor(&req) reconcileID := uuid.NewUUID() @@ -328,7 +315,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // along with a non-nil error. But this is intended as // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter - c.Queue.Forget(obj) + c.Queue.Forget(req) c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: @@ -339,18 +326,18 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { log.V(5).Info("Reconcile successful") // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. - c.Queue.Forget(obj) + c.Queue.Forget(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc() } } // GetLogger returns this controller's logger. -func (c *Controller) GetLogger() logr.Logger { +func (c *Controller[T]) GetLogger() logr.Logger { return c.LogConstructor(nil) } // updateMetrics updates prometheus metrics within the controller. -func (c *Controller) updateMetrics(reconcileTime time.Duration) { +func (c *Controller[T]) updateMetrics(reconcileTime time.Duration) { ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 2e1842d907..12dc0e745e 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -42,14 +42,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" "sigs.k8s.io/controller-runtime/pkg/internal/log" - "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) var _ = Describe("controller", func() { var fakeReconcile *fakeReconciler - var ctrl *Controller + var ctrl *Controller[reconcile.Request] var queue *controllertest.Queue var reconciled chan reconcile.Request var request = reconcile.Request{ @@ -63,12 +62,14 @@ var _ = Describe("controller", func() { results: make(chan fakeReconcileResultPair, 10 /* chosen by the completely scientific approach of guessing */), } queue = &controllertest.Queue{ - Interface: workqueue.New(), + TypedInterface: workqueue.NewTyped[reconcile.Request](), } - ctrl = &Controller{ + ctrl = &Controller[reconcile.Request]{ MaxConcurrentReconciles: 1, Do: fakeReconcile, - NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue }, + NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return queue + }, LogConstructor: func(_ *reconcile.Request) logr.Logger { return log.RuntimeLog.WithName("controller").WithName("test") }, @@ -126,7 +127,7 @@ var _ = Describe("controller", func() { Describe("Start", func() { It("should return an error if there is an error waiting for the informers", func() { f := false - ctrl.startWatches = []source.Source{ + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}), } ctrl.Name = "foo" @@ -144,7 +145,7 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []source.Source{ + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}), } ctrl.Name = "testcontroller" @@ -161,7 +162,7 @@ var _ = Describe("controller", func() { c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) c = &cacheWithIndefinitelyBlockingGetInformer{c} - ctrl.startWatches = []source.Source{ + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ &singnallingSourceWrapper{ SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}), cacheSyncDone: sourceSynced, @@ -189,7 +190,7 @@ var _ = Describe("controller", func() { sourceSynced := make(chan struct{}) c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []source.Source{ + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ &singnallingSourceWrapper{ SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}), cacheSyncDone: sourceSynced, @@ -229,7 +230,7 @@ var _ = Describe("controller", func() { ins := source.Channel( ch, handler.Funcs{ - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() close(processed) }, @@ -239,7 +240,7 @@ var _ = Describe("controller", func() { // send the event to the channel ch <- evt - ctrl.startWatches = []source.Source{ins} + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ins} go func() { defer GinkgoRecover() @@ -253,7 +254,7 @@ var _ = Describe("controller", func() { defer cancel() ins := source.Channel[string](nil, nil) - ctrl.startWatches = []source.Source{ins} + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ins} e := ctrl.Start(ctx) Expect(e).To(HaveOccurred()) @@ -262,7 +263,7 @@ var _ = Describe("controller", func() { It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { started := false - src := source.Func(func(ctx context.Context, q workqueue.RateLimitingInterface) error { + src := source.Func[reconcile.Request](func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { defer GinkgoRecover() Expect(q).To(Equal(ctrl.Queue)) @@ -280,8 +281,8 @@ var _ = Describe("controller", func() { It("should return an error if there is an error starting sources", func() { err := fmt.Errorf("Expected Error: could not start source") - src := source.Func(func(context.Context, - workqueue.RateLimitingInterface, + src := source.Func[reconcile.Request](func(context.Context, + workqueue.TypedRateLimitingInterface[reconcile.Request], ) error { defer GinkgoRecover() return err @@ -324,28 +325,6 @@ var _ = Describe("controller", func() { Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) }) - It("should continue to process additional queue items after the first", func() { - ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { - defer GinkgoRecover() - Fail("Reconciler should not have been called") - return reconcile.Result{}, nil - }) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - defer GinkgoRecover() - Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) - }() - - By("adding two bad items to the queue") - queue.Add("foo/bar1") - queue.Add("foo/bar2") - - By("expecting both of them to be skipped") - Eventually(queue.Len).Should(Equal(0)) - Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) - }) - PIt("should forget an item if it is not a Request and continue processing items", func() { // TODO(community): write this test }) @@ -400,8 +379,10 @@ var _ = Describe("controller", func() { // TODO(directxman12): we should ensure that backoff occurrs with error requeue It("should not reset backoff until there's a non-error result", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} - ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return dq + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -436,8 +417,10 @@ var _ = Describe("controller", func() { }) It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} - ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return dq + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -466,8 +449,10 @@ var _ = Describe("controller", func() { }) It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} - ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return dq + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -496,8 +481,10 @@ var _ = Describe("controller", func() { }) It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} - ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return dq + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -764,7 +751,7 @@ var _ = Describe("ReconcileIDFromContext function", func() { }) type DelegatingQueue struct { - workqueue.RateLimitingInterface + workqueue.TypedRateLimitingInterface[reconcile.Request] mu sync.Mutex countAddRateLimited int @@ -772,36 +759,36 @@ type DelegatingQueue struct { countAddAfter int } -func (q *DelegatingQueue) AddRateLimited(item interface{}) { +func (q *DelegatingQueue) AddRateLimited(item reconcile.Request) { q.mu.Lock() defer q.mu.Unlock() q.countAddRateLimited++ - q.RateLimitingInterface.AddRateLimited(item) + q.TypedRateLimitingInterface.AddRateLimited(item) } -func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) { +func (q *DelegatingQueue) AddAfter(item reconcile.Request, d time.Duration) { q.mu.Lock() defer q.mu.Unlock() q.countAddAfter++ - q.RateLimitingInterface.AddAfter(item, d) + q.TypedRateLimitingInterface.AddAfter(item, d) } -func (q *DelegatingQueue) Add(item interface{}) { +func (q *DelegatingQueue) Add(item reconcile.Request) { q.mu.Lock() defer q.mu.Unlock() q.countAdd++ - q.RateLimitingInterface.Add(item) + q.TypedRateLimitingInterface.Add(item) } -func (q *DelegatingQueue) Forget(item interface{}) { +func (q *DelegatingQueue) Forget(item reconcile.Request) { q.mu.Lock() defer q.mu.Unlock() q.countAdd-- - q.RateLimitingInterface.Forget(item) + q.TypedRateLimitingInterface.Forget(item) } type countInfo struct { diff --git a/pkg/internal/source/event_handler.go b/pkg/internal/source/event_handler.go index 8651ea453e..38432a1a79 100644 --- a/pkg/internal/source/event_handler.go +++ b/pkg/internal/source/event_handler.go @@ -33,8 +33,12 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") // NewEventHandler creates a new EventHandler. -func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.TypedEventHandler[T], predicates []predicate.TypedPredicate[T]) *EventHandler[T] { - return &EventHandler[T]{ +func NewEventHandler[object client.Object, request comparable]( + ctx context.Context, + queue workqueue.TypedRateLimitingInterface[request], + handler handler.TypedEventHandler[object, request], + predicates []predicate.TypedPredicate[object]) *EventHandler[object, request] { + return &EventHandler[object, request]{ ctx: ctx, handler: handler, queue: queue, @@ -43,19 +47,19 @@ func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateL } // EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. -type EventHandler[T client.Object] struct { +type EventHandler[object client.Object, request comparable] struct { // ctx stores the context that created the event handler // that is used to propagate cancellation signals to each handler function. ctx context.Context - handler handler.TypedEventHandler[T] - queue workqueue.RateLimitingInterface - predicates []predicate.TypedPredicate[T] + handler handler.TypedEventHandler[object, request] + queue workqueue.TypedRateLimitingInterface[request] + predicates []predicate.TypedPredicate[object] } // HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs // TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 -func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { +func (e *EventHandler[object, request]) HandlerFuncs() cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: e.OnAdd, UpdateFunc: e.OnUpdate, @@ -64,11 +68,11 @@ func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { } // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e *EventHandler[T]) OnAdd(obj interface{}) { - c := event.TypedCreateEvent[T]{} +func (e *EventHandler[object, request]) OnAdd(obj interface{}) { + c := event.TypedCreateEvent[object]{} // Pull Object out of the object - if o, ok := obj.(T); ok { + if o, ok := obj.(object); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", @@ -89,10 +93,10 @@ func (e *EventHandler[T]) OnAdd(obj interface{}) { } // OnUpdate creates UpdateEvent and calls Update on EventHandler. -func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { - u := event.TypedUpdateEvent[T]{} +func (e *EventHandler[object, request]) OnUpdate(oldObj, newObj interface{}) { + u := event.TypedUpdateEvent[object]{} - if o, ok := oldObj.(T); ok { + if o, ok := oldObj.(object); ok { u.ObjectOld = o } else { log.Error(nil, "OnUpdate missing ObjectOld", @@ -101,7 +105,7 @@ func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { } // Pull Object out of the object - if o, ok := newObj.(T); ok { + if o, ok := newObj.(object); ok { u.ObjectNew = o } else { log.Error(nil, "OnUpdate missing ObjectNew", @@ -122,8 +126,8 @@ func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { } // OnDelete creates DeleteEvent and calls Delete on EventHandler. -func (e *EventHandler[T]) OnDelete(obj interface{}) { - d := event.TypedDeleteEvent[T]{} +func (e *EventHandler[object, request]) OnDelete(obj interface{}) { + d := event.TypedDeleteEvent[object]{} // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a // DeleteFinalStateUnknown struct, so the object needs to be pulled out. @@ -149,7 +153,7 @@ func (e *EventHandler[T]) OnDelete(obj interface{}) { } // Pull Object out of the object - if o, ok := obj.(T); ok { + if o, ok := obj.(object); ok { d.Object = o } else { log.Error(nil, "OnDelete missing Object", diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index e25315ffcc..4de8628ebf 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + "sigs.k8s.io/controller-runtime/pkg/reconcile" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,40 +39,40 @@ import ( var _ = Describe("Internal", func() { var ctx = context.Background() - var instance *internal.EventHandler[client.Object] + var instance *internal.EventHandler[client.Object, reconcile.Request] var funcs, setfuncs *handler.Funcs var set bool BeforeEach(func() { funcs = &handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect CreateEvent to be called.") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect DeleteEvent to be called.") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect UpdateEvent to be called.") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect GenericEvent to be called.") }, } setfuncs = &handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { set = true }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { set = true }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { set = true }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { set = true }, } @@ -92,7 +93,7 @@ var _ = Describe("Internal", func() { }) It("should create a CreateEvent", func() { - funcs.CreateFunc = func(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { + funcs.CreateFunc = func(ctx context.Context, evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(evt.Object).To(Equal(pod)) } @@ -148,7 +149,7 @@ var _ = Describe("Internal", func() { }) It("should create an UpdateEvent", func() { - funcs.UpdateFunc = func(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + funcs.UpdateFunc = func(ctx context.Context, evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(evt.ObjectOld).To(Equal(pod)) Expect(evt.ObjectNew).To(Equal(newPod)) @@ -207,7 +208,7 @@ var _ = Describe("Internal", func() { }) It("should create a DeleteEvent", func() { - funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(evt.Object).To(Equal(pod)) } @@ -263,11 +264,10 @@ var _ = Describe("Internal", func() { }) It("should create a DeleteEvent from a tombstone", func() { - tombstone := cache.DeletedFinalStateUnknown{ Obj: pod, } - funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + funcs.DeleteFunc = func(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(evt.Object).To(Equal(pod)) Expect(evt.DeleteStateUnknown).Should(BeTrue()) @@ -289,7 +289,7 @@ var _ = Describe("Internal", func() { Describe("Kind", func() { It("should return kind source type", func() { - kind := internal.Kind[*corev1.Pod]{ + kind := internal.Kind[*corev1.Pod, reconcile.Request]{ Type: &corev1.Pod{}, } Expect(kind.String()).Should(Equal("kind source: *v1.Pod")) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 3a8db96e3c..4999edc432 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -19,16 +19,16 @@ import ( ) // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind[T client.Object] struct { +type Kind[object client.Object, request comparable] struct { // Type is the type of object to watch. e.g. &v1.Pod{} - Type T + Type object // Cache used to watch APIs Cache cache.Cache - Handler handler.TypedEventHandler[T] + Handler handler.TypedEventHandler[object, request] - Predicates []predicate.TypedPredicate[T] + Predicates []predicate.TypedPredicate[object] // startedErr may contain an error if one was encountered during startup. If its closed and does not // contain an error, startup and syncing finished. @@ -38,7 +38,7 @@ type Kind[T client.Object] struct { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { +func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error { if isNil(ks.Type) { return fmt.Errorf("must create Kind with a non-nil object") } @@ -102,7 +102,7 @@ func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterf return nil } -func (ks *Kind[T]) String() string { +func (ks *Kind[object, request]) String() string { if !isNil(ks.Type) { return fmt.Sprintf("kind source: %T", ks.Type) } @@ -111,7 +111,7 @@ func (ks *Kind[T]) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind[T]) WaitForSync(ctx context.Context) error { +func (ks *Kind[object, request]) WaitForSync(ctx context.Context) error { select { case err := <-ks.startedErr: return err diff --git a/pkg/ratelimiter/doc.go b/pkg/ratelimiter/doc.go deleted file mode 100644 index a01d603fe5..0000000000 --- a/pkg/ratelimiter/doc.go +++ /dev/null @@ -1,22 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package ratelimiter defines rate limiters used by Controllers to limit how frequently requests may be queued. - -Typical rate limiters that can be used are implemented in client-go's workqueue package. -*/ -package ratelimiter diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go deleted file mode 100644 index 565a3a227f..0000000000 --- a/pkg/ratelimiter/ratelimiter.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package ratelimiter - -import "time" - -// RateLimiter is an identical interface of client-go workqueue RateLimiter. -type RateLimiter interface { - // When gets an item and gets to decide how long that item should wait - When(item interface{}) time.Duration - // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing - // or for success, we'll stop tracking it - Forget(item interface{}) - // NumRequeues returns back how many failures the item has had - NumRequeues(item interface{}) int -} diff --git a/pkg/reconcile/reconcile.go b/pkg/reconcile/reconcile.go index f1cce87c85..8ebfedc40a 100644 --- a/pkg/reconcile/reconcile.go +++ b/pkg/reconcile/reconcile.go @@ -89,7 +89,14 @@ driven by actual cluster state read from the apiserver or a local cache. For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted, instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing. */ -type Reconciler interface { +type Reconciler = TypedReconciler[Request] + +// TypedReconciler implements an API for a specific Resource by Creating, Updating or Deleting Kubernetes +// objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc). +// +// The request type is what event handlers put into the workqueue. The workqueue then de-duplicates identical +// requests. +type TypedReconciler[request comparable] interface { // Reconcile performs a full reconciliation for the object referred to by the Request. // // If the returned error is non-nil, the Result is ignored and the request will be @@ -101,16 +108,19 @@ type Reconciler interface { // // If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request // will be requeued using exponential backoff. - Reconcile(context.Context, Request) (Result, error) + Reconcile(context.Context, request) (Result, error) } // Func is a function that implements the reconcile interface. -type Func func(context.Context, Request) (Result, error) +type Func = TypedFunc[Request] + +// TypedFunc is a function that implements the reconcile interface. +type TypedFunc[T comparable] func(context.Context, T) (Result, error) var _ Reconciler = Func(nil) // Reconcile implements Reconciler. -func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) } +func (r TypedFunc[T]) Reconcile(ctx context.Context, o T) (Result, error) { return r(ctx, o) } // ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation // event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in diff --git a/pkg/source/source.go b/pkg/source/source.go index 26e53022bf..d8091ae5ca 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -41,22 +42,51 @@ import ( // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). // // Users may build their own Source implementations. -type Source interface { +type Source TypedSource[reconcile.Request] + +// TypedSource is a generic source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) +// which should be processed by event.EventHandlers to enqueue reconcile.Requests. +// +// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). +// +// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). +// +// Users may build their own Source implementations. +type TypedSource[T comparable] interface { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. - Start(context.Context, workqueue.RateLimitingInterface) error + Start(context.Context, workqueue.TypedRateLimitingInterface[T]) error } // SyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. -type SyncingSource interface { - Source +type SyncingSource = TypedSyncingSource[reconcile.Request] + +// TypedSyncingSource is a source that needs syncing prior to being usable. The controller +// will call its WaitForSync prior to starting workers. +type TypedSyncingSource[T comparable] interface { + TypedSource[T] WaitForSync(ctx context.Context) error } // Kind creates a KindSource with the given cache provider. -func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource { - return &internal.Kind[T]{ +func Kind[T client.Object]( + cache cache.Cache, + object T, + handler handler.TypedEventHandler[T, reconcile.Request], + predicates ...predicate.TypedPredicate[T], +) SyncingSource { + return TypedKind(cache, object, handler, predicates...) +} + +// TypedKind creates a KindSource with the given cache provider. +func TypedKind[objectType client.Object, request comparable]( + cache cache.Cache, + object objectType, + handler handler.TypedEventHandler[objectType, request], + predicates ...predicate.TypedPredicate[objectType], +) TypedSyncingSource[request] { + return &internal.Kind[objectType, request]{ Type: object, Cache: cache, Handler: handler, @@ -64,22 +94,22 @@ func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEve } } -var _ Source = &channel[string]{} +var _ Source = &channel[string, reconcile.Request]{} // ChannelOpt allows to configure a source.Channel. -type ChannelOpt[T any] func(*channel[T]) +type ChannelOpt[object any, request comparable] func(*channel[object, request]) // WithPredicates adds the configured predicates to a source.Channel. -func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] { - return func(c *channel[T]) { +func WithPredicates[object any, request comparable](p ...predicate.TypedPredicate[object]) ChannelOpt[object, request] { + return func(c *channel[object, request]) { c.predicates = append(c.predicates, p...) } } // WithBufferSize configures the buffer size for a source.Channel. By // default, the buffer size is 1024. -func WithBufferSize[T any](bufferSize int) ChannelOpt[T] { - return func(c *channel[T]) { +func WithBufferSize[object any, request comparable](bufferSize int) ChannelOpt[object, request] { + return func(c *channel[object, request]) { c.bufferSize = &bufferSize } } @@ -87,8 +117,23 @@ func WithBufferSize[T any](bufferSize int) ChannelOpt[T] { // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (e.g. http handler) to write GenericEvents to the underlying channel. -func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source { - c := &channel[T]{ +func Channel[object any]( + source <-chan event.TypedGenericEvent[object], + handler handler.TypedEventHandler[object, reconcile.Request], + opts ...ChannelOpt[object, reconcile.Request], +) Source { + return TypedChannel[object, reconcile.Request](source, handler, opts...) +} + +// TypedChannel is used to provide a source of events originating outside the cluster +// (e.g. GitHub Webhook callback). Channel requires the user to wire the external +// source (e.g. http handler) to write GenericEvents to the underlying channel. +func TypedChannel[object any, request comparable]( + source <-chan event.TypedGenericEvent[object], + handler handler.TypedEventHandler[object, request], + opts ...ChannelOpt[object, request], +) TypedSource[request] { + c := &channel[object, request]{ source: source, handler: handler, } @@ -99,34 +144,34 @@ func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.Ty return c } -type channel[T any] struct { +type channel[object any, request comparable] struct { // once ensures the event distribution goroutine will be performed only once once sync.Once // source is the source channel to fetch GenericEvents - source <-chan event.TypedGenericEvent[T] + source <-chan event.TypedGenericEvent[object] - handler handler.TypedEventHandler[T] + handler handler.TypedEventHandler[object, request] - predicates []predicate.TypedPredicate[T] + predicates []predicate.TypedPredicate[object] bufferSize *int // dest is the destination channels of the added event handlers - dest []chan event.TypedGenericEvent[T] + dest []chan event.TypedGenericEvent[object] // destLock is to ensure the destination channels are safely added/removed destLock sync.Mutex } -func (cs *channel[T]) String() string { +func (cs *channel[object, request]) String() string { return fmt.Sprintf("channel source: %p", cs) } // Start implements Source and should only be called by the Controller. -func (cs *channel[T]) Start( +func (cs *channel[object, request]) Start( ctx context.Context, - queue workqueue.RateLimitingInterface, + queue workqueue.TypedRateLimitingInterface[request], ) error { // Source should have been specified by the user. if cs.source == nil { @@ -140,7 +185,7 @@ func (cs *channel[T]) Start( cs.bufferSize = ptr.To(1024) } - dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize) + dst := make(chan event.TypedGenericEvent[object], *cs.bufferSize) cs.destLock.Lock() cs.dest = append(cs.dest, dst) @@ -174,7 +219,7 @@ func (cs *channel[T]) Start( return nil } -func (cs *channel[T]) doStop() { +func (cs *channel[object, request]) doStop() { cs.destLock.Lock() defer cs.destLock.Unlock() @@ -183,7 +228,7 @@ func (cs *channel[T]) doStop() { } } -func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) { +func (cs *channel[object, request]) distribute(evt event.TypedGenericEvent[object]) { cs.destLock.Lock() defer cs.destLock.Unlock() @@ -197,7 +242,7 @@ func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) { } } -func (cs *channel[T]) syncLoop(ctx context.Context) { +func (cs *channel[object, request]) syncLoop(ctx context.Context) { for { select { case <-ctx.Done(): @@ -228,7 +273,7 @@ var _ Source = &Informer{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { +func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { // Informer should have been specified by the user. if is.Informer == nil { return fmt.Errorf("must specify Informer.Informer") @@ -248,16 +293,16 @@ func (is *Informer) String() string { return fmt.Sprintf("informer source: %p", is.Informer) } -var _ Source = Func(nil) +var _ Source = Func[reconcile.Request](nil) // Func is a function that implements Source. -type Func func(context.Context, workqueue.RateLimitingInterface) error +type Func[request comparable] func(context.Context, workqueue.TypedRateLimitingInterface[request]) error // Start implements Source. -func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { +func (f Func[request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error { return f(ctx, queue) } -func (f Func) String() string { +func (f Func[request]) String() string { return fmt.Sprintf("func source: %p", f) } diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index f6b2948874..504a671c8a 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -24,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" . "github.com/onsi/ginkgo/v2" @@ -39,7 +40,7 @@ import ( var _ = Describe("Source", func() { var instance1, instance2 source.Source var obj client.Object - var q workqueue.RateLimitingInterface + var q workqueue.TypedRateLimitingInterface[reconcile.Request] var c1, c2 chan interface{} var ns string count := 0 @@ -53,7 +54,11 @@ var _ = Describe("Source", func() { }, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - q = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q = workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) c1 = make(chan interface{}) c2 = make(chan interface{}) }) @@ -98,17 +103,17 @@ var _ = Describe("Source", func() { // Create an event handler to verify the events newHandler := func(c chan interface{}) handler.Funcs { return handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt }, - UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt }, - DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(rli).To(Equal(q)) c <- evt @@ -237,11 +242,15 @@ var _ = Describe("Source", func() { It("should provide a ReplicaSet CreateEvent", func() { c := make(chan struct{}) - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) instance := &source.Informer{ Informer: depInformer, Handler: handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() var err error rs, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{}) @@ -251,15 +260,15 @@ var _ = Describe("Source", func() { Expect(evt.Object).To(Equal(rs)) close(c) }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -281,13 +290,17 @@ var _ = Describe("Source", func() { rs2 := rs.DeepCopy() rs2.SetLabels(map[string]string{"biz": "baz"}) - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) instance := &source.Informer{ Informer: depInformer, Handler: handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { }, - UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() var err error rs2, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{}) @@ -300,11 +313,11 @@ var _ = Describe("Source", func() { close(c) }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -321,21 +334,25 @@ var _ = Describe("Source", func() { It("should provide a ReplicaSet DeletedEvent", func() { c := make(chan struct{}) - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) instance := &source.Informer{ Informer: depInformer, Handler: handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { }, - DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt.Object.GetName()).To(Equal(rs.Name)) close(c) }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index d30d5ae5c7..6fae4902a5 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" corev1 "k8s.io/api/core/v1" @@ -65,23 +66,27 @@ var _ = Describe("Source", func() { }, } - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ - CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(Equal(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -102,13 +107,17 @@ var _ = Describe("Source", func() { p2.SetLabels(map[string]string{"biz": "baz"}) ic := &informertest.FakeInformers{} - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ - CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(ctx context.Context, evt event.TypedUpdateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, evt event.TypedUpdateEvent[*corev1.Pod], q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.ObjectOld).To(Equal(p)) @@ -117,11 +126,11 @@ var _ = Describe("Source", func() { close(c) }, - DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -147,23 +156,27 @@ var _ = Describe("Source", func() { }, } - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{ - CreateFunc: func(context.Context, event.TypedCreateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + CreateFunc: func(context.Context, event.TypedCreateEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(ctx context.Context, evt event.TypedDeleteEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) { + DeleteFunc: func(ctx context.Context, evt event.TypedDeleteEvent[*corev1.Pod], q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Expect(q2).To(BeIdenticalTo(q)) Expect(evt.Object).To(Equal(p)) close(c) }, - GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) { + GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected GenericEvent") }, @@ -213,12 +226,16 @@ var _ = Describe("Source", func() { Context("for a Kind not in the cache", func() { It("should return an error when WaitForSync is called", func() { ic.Error = fmt.Errorf("test error") - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{}) + instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{}) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Eventually(instance.WaitForSync).WithArguments(context.Background()).Should(HaveOccurred()) @@ -239,9 +256,9 @@ var _ = Describe("Source", func() { Describe("Func", func() { It("should be called from Start", func() { run := false - instance := source.Func(func( + instance := source.Func[reconcile.Request](func( context.Context, - workqueue.RateLimitingInterface) error { + workqueue.TypedRateLimitingInterface[reconcile.Request]) error { run = true return nil }) @@ -249,9 +266,9 @@ var _ = Describe("Source", func() { Expect(run).To(BeTrue()) expected := fmt.Errorf("expected error: Func") - instance = source.Func(func( + instance = source.Func[reconcile.Request](func( context.Context, - workqueue.RateLimitingInterface) error { + workqueue.TypedRateLimitingInterface[reconcile.Request]) error { return expected }) Expect(instance.Start(ctx, nil)).To(Equal(expected)) @@ -293,23 +310,27 @@ var _ = Describe("Source", func() { }, } - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) instance := source.Channel( ch, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() // The empty event should have been filtered out by the predicates, // and will not be passed to the handler. @@ -318,7 +339,7 @@ var _ = Describe("Source", func() { close(c) }, }, - source.WithPredicates(prct), + source.WithPredicates[client.Object, reconcile.Request](prct), ) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) @@ -334,24 +355,28 @@ var _ = Describe("Source", func() { evt := event.GenericEvent{} eventCount := 0 - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) // Add a handler to get distribution blocked instance := source.Channel( ch, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() // Block for the first time if eventCount == 0 { @@ -392,24 +417,28 @@ var _ = Describe("Source", func() { evt := event.GenericEvent{} ch <- evt - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) // Add a handler to get distribution blocked instance := source.Channel( ch, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() close(processed) @@ -423,7 +452,11 @@ var _ = Describe("Source", func() { <-processed }) It("should stop when the source channel is closed", func() { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) // if we didn't stop, we'd start spamming the queue with empty // messages as we "received" a zero-valued GenericEvent from // the source channel @@ -440,19 +473,19 @@ var _ = Describe("Source", func() { src := source.Channel( ch, handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected CreateEvent") }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected UpdateEvent") }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") }, - GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() processed <- struct{}{} @@ -468,7 +501,11 @@ var _ = Describe("Source", func() { Consistently(processed).ShouldNot(Receive()) }) It("should get error if no source specified", func() { - q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + q := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](), + workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{ + Name: "test", + }) instance := source.Channel[string](nil, nil /*no source specified*/) err := instance.Start(ctx, q) Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source")))