From 462d20cd66687e4bc84b0862d330316d47244602 Mon Sep 17 00:00:00 2001 From: Jonathan Innis <joinnis@amazon.com> Date: Mon, 3 Jun 2024 00:34:50 -0700 Subject: [PATCH] Convert singleton reconcilers to operatorpkg --- go.mod | 9 ++++---- go.sum | 14 +++++++------ hack/code/prices_gen/main.go | 4 +--- pkg/controllers/controllers.go | 2 +- pkg/controllers/interruption/controller.go | 16 ++++++++------ pkg/controllers/interruption/suite_test.go | 21 +++++++++---------- .../nodeclaim/garbagecollection/controller.go | 13 ++++++++---- .../nodeclaim/garbagecollection/suite_test.go | 20 ++++++++---------- .../providers/instancetype/controller.go | 13 ++++++++---- .../providers/instancetype/suite_test.go | 9 ++++---- .../providers/pricing/controller.go | 14 ++++++++----- .../providers/pricing/suite_test.go | 19 ++++++++--------- test/pkg/debug/monitor.go | 3 +-- .../suites/integration/kubelet_config_test.go | 2 +- 14 files changed, 86 insertions(+), 73 deletions(-) diff --git a/go.mod b/go.mod index 9130b9a49903..47f3ef03bd3b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/aws/karpenter-provider-aws -go 1.22.3 +go 1.22.4 require ( github.com/Pallinder/go-randomdata v1.2.0 @@ -9,7 +9,7 @@ require ( github.com/aws/aws-sdk-go v1.53.14 github.com/aws/karpenter-provider-aws/tools/kompat v0.0.0-20240410220356-6b868db24881 github.com/awslabs/amazon-eks-ami/nodeadm v0.0.0-20240229193347-cfab22a10647 - github.com/awslabs/operatorpkg v0.0.0-20240518001059-1e35978ba21b + github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4 github.com/go-logr/zapr v1.3.0 github.com/imdario/mergo v0.3.16 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -30,7 +30,7 @@ require ( k8s.io/utils v0.0.0-20240102154912-e7106e64919e knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd sigs.k8s.io/controller-runtime v0.18.3 - sigs.k8s.io/karpenter v0.37.0 + sigs.k8s.io/karpenter v0.37.1-0.20240605225346-c7c5068db687 sigs.k8s.io/yaml v1.4.0 ) @@ -51,7 +51,7 @@ require ( github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.4 // indirect @@ -113,6 +113,7 @@ require ( k8s.io/component-base v0.30.1 // indirect k8s.io/csi-translation-lib v0.30.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect + k8s.io/kubernetes v1.30.1 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) diff --git a/go.sum b/go.sum index 598d2ae19b53..9c9981f2964c 100644 --- a/go.sum +++ b/go.sum @@ -60,8 +60,8 @@ github.com/aws/karpenter-provider-aws/tools/kompat v0.0.0-20240410220356-6b868db github.com/aws/karpenter-provider-aws/tools/kompat v0.0.0-20240410220356-6b868db24881/go.mod h1:+Mk5k0b6HpKobxNq+B56DOhZ+I/NiPhd5MIBhQMSTSs= github.com/awslabs/amazon-eks-ami/nodeadm v0.0.0-20240229193347-cfab22a10647 h1:8yRBVsjGmI7qQsPWtIrbWP+XfwHO9Wq7gdLVzjqiZFs= github.com/awslabs/amazon-eks-ami/nodeadm v0.0.0-20240229193347-cfab22a10647/go.mod h1:9NafTAUHL0FlMeL6Cu5PXnMZ1q/LnC9X2emLXHsVbM8= -github.com/awslabs/operatorpkg v0.0.0-20240518001059-1e35978ba21b h1:bmlbw6EjSDoZEWbGE2rnXDsCgbTsxMyufM4NRRHaLVk= -github.com/awslabs/operatorpkg v0.0.0-20240518001059-1e35978ba21b/go.mod h1:YcidmUg8Pjk349+jd+sRCdo6h3jzxqAY1VDNgVJKbSA= +github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4 h1:EVFVrteX0PQuofO9Ah4rf4aGyUkBM3lLuKgzwilAEAg= +github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4/go.mod h1:OR0NDOTl6XUXKgcksUab5d7mCnpaZf7Ko4eWEbheJTY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -115,8 +115,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= @@ -750,6 +750,8 @@ k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= +k8s.io/kubernetes v1.30.1 h1:XlqS6KslLEA5mQzLK2AJrhr4Z1m8oJfkhHiWJ5lue+I= +k8s.io/kubernetes v1.30.1/go.mod h1:yPbIk3MhmhGigX62FLJm+CphNtjxqCvAIFQXup6RKS0= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd h1:KJXBX9dOmRTUWduHg1gnWtPGIEl+GMh8UHdrBEZgOXE= @@ -761,8 +763,8 @@ sigs.k8s.io/controller-runtime v0.18.3 h1:B5Wmmo8WMWK7izei+2LlXLVDGzMwAHBNLX68lw sigs.k8s.io/controller-runtime v0.18.3/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= -sigs.k8s.io/karpenter v0.37.0 h1:eUFD9hJ2mpZrw31OUYhpbxLWEDmbXT05wX27dZB2E5o= -sigs.k8s.io/karpenter v0.37.0/go.mod h1:5XYrIz9Bi7HgQyaUsx7O08ft+TJjrH+htlnPq8Sz9J8= +sigs.k8s.io/karpenter v0.37.1-0.20240605225346-c7c5068db687 h1:I4JBstyyyAwKDXJsb86HAQRPvR2VowsqF81F734L2G8= +sigs.k8s.io/karpenter v0.37.1-0.20240605225346-c7c5068db687/go.mod h1:SO2WU8Li+bwkLiiEYrRmuhP0QUseZCyYgo6OoXgd9uk= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/hack/code/prices_gen/main.go b/hack/code/prices_gen/main.go index 1bf74fbcd76f..837eeaedb360 100644 --- a/hack/code/prices_gen/main.go +++ b/hack/code/prices_gen/main.go @@ -31,8 +31,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" ec22 "github.com/aws/aws-sdk-go/service/ec2" "github.com/samber/lo" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/operator/options" @@ -110,7 +108,7 @@ func main() { log.Println("fetching for", region) pricingProvider := pricing.NewDefaultProvider(ctx, pricing.NewAPI(sess, region), ec2, region) controller := controllerspricing.NewController(pricingProvider) - _, err := controller.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{}}) + _, err := controller.Reconcile(ctx) if err != nil { log.Fatalf("failed to initialize pricing provider %s", err) } diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 282ad695f2c1..b270658e95ef 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -17,8 +17,8 @@ package controllers import ( "context" + "github.com/awslabs/operatorpkg/controller" "sigs.k8s.io/karpenter/pkg/cloudprovider" - "sigs.k8s.io/karpenter/pkg/operator/controller" nodeclasshash "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/hash" nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status" diff --git a/pkg/controllers/interruption/controller.go b/pkg/controllers/interruption/controller.go index e5addf69f5e3..cabf5960b0be 100644 --- a/pkg/controllers/interruption/controller.go +++ b/pkg/controllers/interruption/controller.go @@ -20,6 +20,7 @@ import ( "time" sqsapi "github.com/aws/aws-sdk-go/service/sqs" + "github.com/awslabs/operatorpkg/singleton" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/multierr" @@ -27,11 +28,13 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/karpenter/pkg/metrics" + "sigs.k8s.io/karpenter/pkg/operator/injection" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/utils/pretty" @@ -44,7 +47,6 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/utils" "sigs.k8s.io/karpenter/pkg/events" - corecontroller "sigs.k8s.io/karpenter/pkg/operator/controller" ) type Action string @@ -81,7 +83,8 @@ func NewController(kubeClient client.Client, clk clock.Clock, recorder events.Re } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "interruption") ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())) if c.cm.HasChanged(c.sqsProvider.Name(), nil) { log.FromContext(ctx).V(1).Info("watching interruption queue") @@ -91,7 +94,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", err) } if len(sqsMessages) == 0 { - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } nodeClaimInstanceIDMap, err := c.makeNodeClaimInstanceIDMap(ctx) if err != nil { @@ -119,13 +122,14 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc if err = multierr.Combine(errs...); err != nil { return reconcile.Result{}, err } - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return corecontroller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("interruption"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } // parseMessage parses the passed SQS message into an internal Message interface diff --git a/pkg/controllers/interruption/suite_test.go b/pkg/controllers/interruption/suite_test.go index 20d9ac2e30a1..ca786b360be3 100644 --- a/pkg/controllers/interruption/suite_test.go +++ b/pkg/controllers/interruption/suite_test.go @@ -27,7 +27,6 @@ import ( "github.com/samber/lo" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" @@ -119,7 +118,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -128,7 +127,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(scheduledChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -153,7 +152,7 @@ var _ = Describe("InterruptionHandling", func() { messages = append(messages, stateChangeMessage(instanceID, state)) } ExpectMessagesCreated(messages...) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(4)) @@ -183,7 +182,7 @@ var _ = Describe("InterruptionHandling", func() { messages = append(messages, spotInterruptionMessage(id)) } ExpectMessagesCreated(messages...) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(100)) @@ -199,7 +198,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(badMessage) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) }) @@ -207,7 +206,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(stateChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)), "creating")) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectExists(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -221,7 +220,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -235,15 +234,15 @@ var _ = Describe("InterruptionHandling", func() { var _ = Describe("Error Handling", func() { It("should send an error on polling when QueueNotExists", func() { sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode(servicesqs.ErrCodeQueueDoesNotExist), fake.MaxCalls(0)) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) }) It("should send an error on polling when AccessDenied", func() { sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode("AccessDenied"), fake.MaxCalls(0)) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) }) It("should not return an error when deleting a nodeClaim that is already deleted", func() { ExpectMessagesCreated(spotInterruptionMessage(fake.InstanceID())) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) }) }) diff --git a/pkg/controllers/nodeclaim/garbagecollection/controller.go b/pkg/controllers/nodeclaim/garbagecollection/controller.go index 71748eaaa02c..04b27aa88af5 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/controller.go +++ b/pkg/controllers/nodeclaim/garbagecollection/controller.go @@ -19,20 +19,22 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/operator/injection" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/operator/controller" ) type Controller struct { @@ -49,7 +51,9 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "nodeclaim.garbagecollection") + // We LIST machines on the CloudProvider BEFORE we grab Machines/Nodes on the cluster so that we make sure that, if // LISTing instances takes a long time, our information is more updated by the time we get to Machine and Node LIST // This works since our CloudProvider instances are deleted based on whether the Machine exists or not, not vise-versa @@ -105,7 +109,8 @@ func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *v1beta1.Node } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("nodeclaim.garbagecollection"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go index 0446d774da30..c8d37bcf5a48 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go +++ b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go @@ -22,11 +22,9 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/aws/aws-sdk-go/service/ec2" "k8s.io/client-go/tools/record" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider" @@ -136,7 +134,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute)) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).To(HaveOccurred()) Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) @@ -151,7 +149,7 @@ var _ = Describe("GarbageCollection", func() { }) ExpectApplied(ctx, env.Client, node) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).To(HaveOccurred()) Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) @@ -199,7 +197,7 @@ var _ = Describe("GarbageCollection", func() { ) ids = append(ids, instanceID) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for _, id := range ids { @@ -257,7 +255,7 @@ var _ = Describe("GarbageCollection", func() { nodeClaims = append(nodeClaims, nodeClaim) ids = append(ids, instanceID) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for _, id := range ids { @@ -281,7 +279,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now()) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).NotTo(HaveOccurred()) }) @@ -295,7 +293,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute)) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).NotTo(HaveOccurred()) }) @@ -319,7 +317,7 @@ var _ = Describe("GarbageCollection", func() { }) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).ToNot(HaveOccurred()) ExpectExists(ctx, env.Client, node) @@ -377,7 +375,7 @@ var _ = Describe("GarbageCollection", func() { ids = append(ids, instanceID) nodes = append(nodes, node) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for i := range ids { diff --git a/pkg/controllers/providers/instancetype/controller.go b/pkg/controllers/providers/instancetype/controller.go index 0768c81d1abb..38537ab88007 100644 --- a/pkg/controllers/providers/instancetype/controller.go +++ b/pkg/controllers/providers/instancetype/controller.go @@ -19,11 +19,13 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" lop "github.com/samber/lo/parallel" "go.uber.org/multierr" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" ) @@ -38,7 +40,9 @@ func NewController(instancetypeProvider instancetype.Provider) *Controller { } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "providers.instancetype") + work := []func(ctx context.Context) error{ c.instancetypeProvider.UpdateInstanceTypes, c.instancetypeProvider.UpdateInstanceTypeOfferings, @@ -57,7 +61,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc func (c *Controller) Register(_ context.Context, m manager.Manager) error { // Includes a default exponential failure rate limiter of base: time.Millisecond, and max: 1000*time.Second - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("providers.instancetype"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/providers/instancetype/suite_test.go b/pkg/controllers/providers/instancetype/suite_test.go index 4a156e5d9ae6..e520d6dce547 100644 --- a/pkg/controllers/providers/instancetype/suite_test.go +++ b/pkg/controllers/providers/instancetype/suite_test.go @@ -19,7 +19,6 @@ import ( "testing" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/operator/scheme" @@ -89,7 +88,7 @@ var _ = Describe("InstanceType", func() { InstanceTypeOfferings: ec2Offerings, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{ Status: v1beta1.EC2NodeClassStatus{ Subnets: []v1beta1.Subnet{ @@ -123,7 +122,7 @@ var _ = Describe("InstanceType", func() { InstanceTypeOfferings: ec2Offerings, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{ Status: v1beta1.EC2NodeClassStatus{ Subnets: []v1beta1.Subnet{ @@ -158,14 +157,14 @@ var _ = Describe("InstanceType", func() { It("should not update instance type date with response from the DescribeInstanceTypes API is empty", func() { awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) Expect(err).ToNot(BeNil()) }) It("should not update instance type offering date with response from the DescribeInstanceTypesOfferings API", func() { awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) Expect(err).ToNot(BeNil()) }) diff --git a/pkg/controllers/providers/pricing/controller.go b/pkg/controllers/providers/pricing/controller.go index 8cd42a8fd489..24daae768bc6 100644 --- a/pkg/controllers/providers/pricing/controller.go +++ b/pkg/controllers/providers/pricing/controller.go @@ -19,12 +19,13 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" lop "github.com/samber/lo/parallel" "go.uber.org/multierr" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" ) @@ -39,7 +40,9 @@ func NewController(pricingProvider pricing.Provider) *Controller { } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "providers.pricing") + work := []func(ctx context.Context) error{ c.pricingProvider.UpdateSpotPricing, c.pricingProvider.UpdateOnDemandPricing, @@ -57,7 +60,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("providers.pricing"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/providers/pricing/suite_test.go b/pkg/controllers/providers/pricing/suite_test.go index 8941d0f85cac..ba48bccda745 100644 --- a/pkg/controllers/providers/pricing/suite_test.go +++ b/pkg/controllers/providers/pricing/suite_test.go @@ -24,7 +24,6 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" awspricing "github.com/aws/aws-sdk-go/service/pricing" "github.com/samber/lo" - "k8s.io/apimachinery/pkg/types" coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/operator/scheme" coretest "sigs.k8s.io/karpenter/pkg/test" @@ -98,14 +97,14 @@ var _ = Describe("Pricing", func() { ) It("should return static on-demand data if pricing API fails", func() { awsEnv.PricingAPI.NextError.Set(fmt.Errorf("failed")) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c5.large") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically(">", 0)) }) It("should return static spot data if EC2 describeSpotPriceHistory API fails", func() { awsEnv.PricingAPI.NextError.Set(fmt.Errorf("failed")) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c5.large", "test-zone-1a") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically(">", 0)) @@ -119,7 +118,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c98.large") Expect(ok).To(BeTrue()) @@ -165,7 +164,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c98.large", "test-zone-1b") Expect(ok).To(BeTrue()) @@ -199,7 +198,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c98.large", "test-zone-1a") Expect(ok).To(BeTrue()) @@ -226,7 +225,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, ok := awsEnv.PricingProvider.SpotPrice("c99.large", "test-zone-1b") Expect(ok).To(BeFalse()) @@ -253,7 +252,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) inp := awsEnv.EC2API.DescribeSpotPriceHistoryInput.Clone() Expect(lo.Map(inp.ProductDescriptions, func(x *string, _ int) string { return *x })). To(ContainElements("Linux/UNIX", "Linux/UNIX (Amazon VPC)")) @@ -288,7 +287,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c5.xlarge", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c3.2xlarge") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically("==", 0.420000)) @@ -318,7 +317,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPriceWithCurrency("c99.large", 1.23, "CNY"), }, }) - ExpectReconcileSucceeded(ctx, tmpController, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, tmpController) price, ok := tmpPricingProvider.OnDemandPrice("c98.large") Expect(ok).To(BeTrue()) diff --git a/test/pkg/debug/monitor.go b/test/pkg/debug/monitor.go index c0f8588b8654..d0234726364b 100644 --- a/test/pkg/debug/monitor.go +++ b/test/pkg/debug/monitor.go @@ -18,6 +18,7 @@ import ( "context" "sync" + "github.com/awslabs/operatorpkg/controller" "github.com/samber/lo" "k8s.io/client-go/rest" controllerruntime "sigs.k8s.io/controller-runtime" @@ -25,8 +26,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/karpenter/pkg/operator/controller" - "sigs.k8s.io/karpenter/pkg/operator/scheme" ) diff --git a/test/suites/integration/kubelet_config_test.go b/test/suites/integration/kubelet_config_test.go index 7f59366245ba..5030c9496394 100644 --- a/test/suites/integration/kubelet_config_test.go +++ b/test/suites/integration/kubelet_config_test.go @@ -157,7 +157,7 @@ var _ = Describe("KubeletConfiguration Overrides", func() { // Get the DS pod count and use it to calculate the DS pod overhead dsCount := env.GetDaemonSetCount(nodePool) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ - MaxPods: lo.ToPtr(int32(1 + int32(dsCount))), + MaxPods: lo.ToPtr(1 + int32(dsCount)), } numPods := 3