From 8a97f3f8234e630b12b10b61f1ff96af65724344 Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Fri, 29 Nov 2024 15:33:15 -0800 Subject: [PATCH] fix: hydration race for terminating nodes --- pkg/controllers/node/hydration/controller.go | 98 +++++++++++++++++++ .../hydration/suite_test.go} | 59 +++++++---- .../nodeclaim/hydration/controller.go | 89 +++++++++++++++++ .../nodeclaim/hydration/suite_test.go | 98 +++++++++++++++++++ .../nodeclaim/lifecycle/controller.go | 3 - .../nodeclaim/lifecycle/hydration.go | 69 ------------- pkg/utils/node/node.go | 18 ++++ 7 files changed, 344 insertions(+), 90 deletions(-) create mode 100644 pkg/controllers/node/hydration/controller.go rename pkg/controllers/{nodeclaim/lifecycle/hydration_test.go => node/hydration/suite_test.go} (56%) create mode 100644 pkg/controllers/nodeclaim/hydration/controller.go create mode 100644 pkg/controllers/nodeclaim/hydration/suite_test.go delete mode 100644 pkg/controllers/nodeclaim/lifecycle/hydration.go diff --git a/pkg/controllers/node/hydration/controller.go b/pkg/controllers/node/hydration/controller.go new file mode 100644 index 0000000000..f741a6e557 --- /dev/null +++ b/pkg/controllers/node/hydration/controller.go @@ -0,0 +1,98 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hydration + +import ( + "context" + "fmt" + + "github.com/awslabs/operatorpkg/reasonable" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/controller" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/operator/injection" + "sigs.k8s.io/karpenter/pkg/utils/node" + nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" +) + +// Controller hydrates information to the Node which is expected in newer versions of Karpenter, but would not exist on +// pre-existing nodes. +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, c.Name()) + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name))) + + nc, err := node.NodeClaimForNode(ctx, c.kubeClient, n) + if err != nil { + if node.IsDuplicateNodeClaimError(err) || node.IsNodeClaimNotFoundError(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, fmt.Errorf("hydrating node, %w", err) + } + if !nodeclaimutils.IsManaged(nc, c.cloudProvider) { + return reconcile.Result{}, nil + } + + stored := n.DeepCopy() + n.Labels = lo.Assign(n.Labels, map[string]string{ + v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name, + }) + if !equality.Semantic.DeepEqual(stored, n) { + if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, err + } + } + return reconcile.Result{}, nil +} + +func (c *Controller) Name() string { + return "node.hydration" +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named(c.Name()). + For(&corev1.Node{}). + Watches(&v1.NodeClaim{}, node.NodeClaimEventHandler(c.kubeClient)). + WithOptions(controller.Options{ + RateLimiter: reasonable.RateLimiter(), + MaxConcurrentReconciles: 1000, + }). + Complete(reconcile.AsReconciler(m.GetClient(), c)) +} diff --git a/pkg/controllers/nodeclaim/lifecycle/hydration_test.go b/pkg/controllers/node/hydration/suite_test.go similarity index 56% rename from pkg/controllers/nodeclaim/lifecycle/hydration_test.go rename to pkg/controllers/node/hydration/suite_test.go index 9ada851a31..e01aad3580 100644 --- a/pkg/controllers/nodeclaim/lifecycle/hydration_test.go +++ b/pkg/controllers/node/hydration/suite_test.go @@ -14,20 +14,55 @@ See the License for the specific language governing permissions and limitations under the License. */ -package lifecycle_test +package hydration_test import ( - "fmt" + "context" + "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/samber/lo" + "sigs.k8s.io/karpenter/pkg/apis" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/controllers/node/hydration" + "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" ) +var ctx context.Context +var hydrationController *hydration.Controller +var env *test.Environment +var cloudProvider *fake.CloudProvider + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "Lifecycle") +} + +var _ = BeforeSuite(func() { + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx), test.NodeClaimProviderIDFieldIndexer(ctx))) + ctx = options.ToContext(ctx, test.Options()) + + cloudProvider = fake.NewCloudProvider() + hydrationController = hydration.NewController(env.Client, cloudProvider) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) + cloudProvider.Reset() +}) + var _ = Describe("Hydration", func() { DescribeTable( "Hydration", @@ -46,31 +81,19 @@ var _ = Describe("Hydration", func() { NodeClassRef: nodeClassRef, }, }) - delete(nodeClaim.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind())) delete(node.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind())) - // Launch the NodeClaim to ensure the lifecycle controller doesn't override the provider-id and break the - // link between the Node and NodeClaim. - nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeLaunched) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, hydrationController, node) - // The missing NodeClass label should have been propagated to both the Node and NodeClaim + // The missing NodeClass label should have been propagated to the Node node = ExpectExists(ctx, env.Client, node) - fmt.Printf("provider id: %s\n", node.Spec.ProviderID) value, ok := node.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())] Expect(ok).To(Equal(isNodeClaimManaged)) if isNodeClaimManaged { Expect(value).To(Equal(nodeClassRef.Name)) } - - nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) - value, ok = nodeClaim.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())] - Expect(ok).To(Equal(isNodeClaimManaged)) - if isNodeClaimManaged { - Expect(value).To(Equal(nodeClassRef.Name)) - } }, - Entry("should hydrate missing metadata onto the NodeClaim and Node", true), - Entry("should ignore NodeClaims which aren't managed by this Karpenter instance", false), + Entry("should hydrate missing metadata onto the Node", true), + Entry("should ignore Nodes which aren't managed by this Karpenter instance", false), ) }) diff --git a/pkg/controllers/nodeclaim/hydration/controller.go b/pkg/controllers/nodeclaim/hydration/controller.go new file mode 100644 index 0000000000..e45535559a --- /dev/null +++ b/pkg/controllers/nodeclaim/hydration/controller.go @@ -0,0 +1,89 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hydration + +import ( + "context" + + "github.com/awslabs/operatorpkg/reasonable" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/controller" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/operator/injection" + nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" +) + +// Controller hydrates information to the NodeClaim which is expected in newer versions of Karpenter, but would not +// exist on pre-existing NodeClaims. +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, nc *v1.NodeClaim) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, c.Name()) + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nc.Namespace, nc.Name))) + if !nodeclaimutils.IsManaged(nc, c.cloudProvider) { + return reconcile.Result{}, nil + } + + stored := nc.DeepCopy() + nc.Labels = lo.Assign(nc.Labels, map[string]string{ + v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name, + }) + if !equality.Semantic.DeepEqual(stored, nc) { + if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, err + } + } + return reconcile.Result{}, nil +} + +func (c *Controller) Name() string { + return "nodeclaim.hydration" +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named(c.Name()). + For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))). + Watches(&corev1.Node{}, nodeclaimutils.NodeEventHandler(c.kubeClient, c.cloudProvider)). + WithOptions(controller.Options{ + RateLimiter: reasonable.RateLimiter(), + MaxConcurrentReconciles: 1000, + }). + Complete(reconcile.AsReconciler(m.GetClient(), c)) +} diff --git a/pkg/controllers/nodeclaim/hydration/suite_test.go b/pkg/controllers/nodeclaim/hydration/suite_test.go new file mode 100644 index 0000000000..559eee4061 --- /dev/null +++ b/pkg/controllers/nodeclaim/hydration/suite_test.go @@ -0,0 +1,98 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hydration_test + +import ( + "context" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" + + "sigs.k8s.io/karpenter/pkg/apis" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/hydration" + "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/test" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ctx context.Context +var hydrationController *hydration.Controller +var env *test.Environment +var cloudProvider *fake.CloudProvider + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "Lifecycle") +} + +var _ = BeforeSuite(func() { + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx))) + ctx = options.ToContext(ctx, test.Options()) + + cloudProvider = fake.NewCloudProvider() + hydrationController = hydration.NewController(env.Client, cloudProvider) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) + cloudProvider.Reset() +}) + +var _ = Describe("Hydration", func() { + DescribeTable( + "Hydration", + func(isNodeClaimManaged bool) { + nodeClassRef := lo.Ternary(isNodeClaimManaged, &v1.NodeClassReference{ + Group: "karpenter.test.sh", + Kind: "TestNodeClass", + Name: "default", + }, &v1.NodeClassReference{ + Group: "karpenter.test.sh", + Kind: "UnmanagedNodeClass", + Name: "default", + }) + nodeClaim, _ := test.NodeClaimAndNode(v1.NodeClaim{ + Spec: v1.NodeClaimSpec{ + NodeClassRef: nodeClassRef, + }, + }) + delete(nodeClaim.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind())) + ExpectApplied(ctx, env.Client, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, hydrationController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + value, ok := nodeClaim.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())] + Expect(ok).To(Equal(isNodeClaimManaged)) + if isNodeClaimManaged { + Expect(value).To(Equal(nodeClassRef.Name)) + } + }, + Entry("should hydrate missing metadata onto the NodeClaim", true), + Entry("should ignore NodeClaims which aren't managed by this Karpenter instance", false), + ) +}) diff --git a/pkg/controllers/nodeclaim/lifecycle/controller.go b/pkg/controllers/nodeclaim/lifecycle/controller.go index 89401dfdae..6293af6b94 100644 --- a/pkg/controllers/nodeclaim/lifecycle/controller.go +++ b/pkg/controllers/nodeclaim/lifecycle/controller.go @@ -69,7 +69,6 @@ type Controller struct { registration *Registration initialization *Initialization liveness *Liveness - hydration *Hydration } func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) *Controller { @@ -82,7 +81,6 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou registration: &Registration{kubeClient: kubeClient}, initialization: &Initialization{kubeClient: kubeClient}, liveness: &Liveness{clock: clk, kubeClient: kubeClient}, - hydration: &Hydration{kubeClient: kubeClient}, } } @@ -144,7 +142,6 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re c.registration, c.initialization, c.liveness, - c.hydration, } { res, err := reconciler.Reconcile(ctx, nodeClaim) errs = multierr.Append(errs, err) diff --git a/pkg/controllers/nodeclaim/lifecycle/hydration.go b/pkg/controllers/nodeclaim/lifecycle/hydration.go deleted file mode 100644 index 832f8480f5..0000000000 --- a/pkg/controllers/nodeclaim/lifecycle/hydration.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lifecycle - -import ( - "context" - "fmt" - - "github.com/samber/lo" - "k8s.io/apimachinery/pkg/api/equality" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - v1 "sigs.k8s.io/karpenter/pkg/apis/v1" - nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" -) - -// The Hydration sub-reconciler is used to hydrate Nodes / NodeClaims with metadata added in new versions of Karpenter. -// TODO: Remove after a sufficiently long time from the v1.1 release -type Hydration struct { - kubeClient client.Client -} - -func (h *Hydration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) { - nodeClaim.Labels = lo.Assign(nodeClaim.Labels, map[string]string{ - v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind()): nodeClaim.Spec.NodeClassRef.Name, - }) - if err := h.hydrateNode(ctx, nodeClaim); err != nil { - return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("hydrating node, %w", err)) - } - return reconcile.Result{}, nil -} - -func (h *Hydration) hydrateNode(ctx context.Context, nodeClaim *v1.NodeClaim) error { - node, err := nodeclaimutils.NodeForNodeClaim(ctx, h.kubeClient, nodeClaim) - if err != nil { - if nodeclaimutils.IsNodeNotFoundError(err) { - return nil - } - return err - } - stored := node.DeepCopy() - node.Labels = lo.Assign(node.Labels, map[string]string{ - v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind()): nodeClaim.Spec.NodeClassRef.Name, - }) - if !equality.Semantic.DeepEqual(stored, node) { - // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch - // can cause races due to the fact that it fully replaces the list on a change - // Here, we are updating the taint list - if err := h.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { - return err - } - } - return nil -} diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go index 0587c2fe1c..c308589dce 100644 --- a/pkg/utils/node/node.go +++ b/pkg/utils/node/node.go @@ -27,7 +27,9 @@ import ( corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" @@ -178,3 +180,19 @@ func IsManagedPredicateFuncs(cp cloudprovider.CloudProvider) predicate.Funcs { return IsManaged(o.(*corev1.Node), cp) }) } + +func NodeClaimEventHandler(c client.Client) handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { + providerID := o.(*v1.NodeClaim).Status.ProviderID + if providerID == "" { + return nil + } + nodes := &corev1.NodeList{} + if err := c.List(ctx, nodes, client.MatchingFields{"spec.providerID": providerID}); err != nil { + return nil + } + return lo.Map(nodes.Items, func(n corev1.Node, _ int) reconcile.Request { + return reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&n)} + }) + }) +}