Skip to content

Commit

Permalink
fix: hydration race for terminating nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Nov 29, 2024
1 parent 848b989 commit 8a97f3f
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 90 deletions.
98 changes: 98 additions & 0 deletions pkg/controllers/node/hydration/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hydration

import (
"context"
"fmt"

"github.com/awslabs/operatorpkg/reasonable"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator/injection"
"sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Controller hydrates information to the Node which is expected in newer versions of Karpenter, but would not exist on
// pre-existing nodes.
type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

nc, err := node.NodeClaimForNode(ctx, c.kubeClient, n)
if err != nil {
if node.IsDuplicateNodeClaimError(err) || node.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, fmt.Errorf("hydrating node, %w", err)
}
if !nodeclaimutils.IsManaged(nc, c.cloudProvider) {
return reconcile.Result{}, nil
}

stored := n.DeepCopy()
n.Labels = lo.Assign(n.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
if !equality.Semantic.DeepEqual(stored, n) {
if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Name() string {
return "node.hydration"
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&corev1.Node{}).
Watches(&v1.NodeClaim{}, node.NodeClaimEventHandler(c.kubeClient)).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,55 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package lifecycle_test
package hydration_test

import (
"fmt"
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"

"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/hydration"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var hydrationController *hydration.Controller
var env *test.Environment
var cloudProvider *fake.CloudProvider

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "Lifecycle")
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx), test.NodeClaimProviderIDFieldIndexer(ctx)))
ctx = options.ToContext(ctx, test.Options())

cloudProvider = fake.NewCloudProvider()
hydrationController = hydration.NewController(env.Client, cloudProvider)
})

var _ = AfterSuite(func() {
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
cloudProvider.Reset()
})

var _ = Describe("Hydration", func() {
DescribeTable(
"Hydration",
Expand All @@ -46,31 +81,19 @@ var _ = Describe("Hydration", func() {
NodeClassRef: nodeClassRef,
},
})
delete(nodeClaim.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
delete(node.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
// Launch the NodeClaim to ensure the lifecycle controller doesn't override the provider-id and break the
// link between the Node and NodeClaim.
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeLaunched)
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)

// The missing NodeClass label should have been propagated to both the Node and NodeClaim
// The missing NodeClass label should have been propagated to the Node
node = ExpectExists(ctx, env.Client, node)
fmt.Printf("provider id: %s\n", node.Spec.ProviderID)
value, ok := node.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
Expect(ok).To(Equal(isNodeClaimManaged))
if isNodeClaimManaged {
Expect(value).To(Equal(nodeClassRef.Name))
}

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
value, ok = nodeClaim.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
Expect(ok).To(Equal(isNodeClaimManaged))
if isNodeClaimManaged {
Expect(value).To(Equal(nodeClassRef.Name))
}
},
Entry("should hydrate missing metadata onto the NodeClaim and Node", true),
Entry("should ignore NodeClaims which aren't managed by this Karpenter instance", false),
Entry("should hydrate missing metadata onto the Node", true),
Entry("should ignore Nodes which aren't managed by this Karpenter instance", false),
)
})
89 changes: 89 additions & 0 deletions pkg/controllers/nodeclaim/hydration/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hydration

import (
"context"

"github.com/awslabs/operatorpkg/reasonable"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Controller hydrates information to the NodeClaim which is expected in newer versions of Karpenter, but would not
// exist on pre-existing NodeClaims.
type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context, nc *v1.NodeClaim) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nc.Namespace, nc.Name)))
if !nodeclaimutils.IsManaged(nc, c.cloudProvider) {
return reconcile.Result{}, nil
}

stored := nc.DeepCopy()
nc.Labels = lo.Assign(nc.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
if !equality.Semantic.DeepEqual(stored, nc) {
if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Name() string {
return "nodeclaim.hydration"
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
Watches(&corev1.Node{}, nodeclaimutils.NodeEventHandler(c.kubeClient, c.cloudProvider)).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
98 changes: 98 additions & 0 deletions pkg/controllers/nodeclaim/hydration/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hydration_test

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"

"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/hydration"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var hydrationController *hydration.Controller
var env *test.Environment
var cloudProvider *fake.CloudProvider

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "Lifecycle")
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeProviderIDFieldIndexer(ctx)))
ctx = options.ToContext(ctx, test.Options())

cloudProvider = fake.NewCloudProvider()
hydrationController = hydration.NewController(env.Client, cloudProvider)
})

var _ = AfterSuite(func() {
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
cloudProvider.Reset()
})

var _ = Describe("Hydration", func() {
DescribeTable(
"Hydration",
func(isNodeClaimManaged bool) {
nodeClassRef := lo.Ternary(isNodeClaimManaged, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "TestNodeClass",
Name: "default",
}, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
})
nodeClaim, _ := test.NodeClaimAndNode(v1.NodeClaim{
Spec: v1.NodeClaimSpec{
NodeClassRef: nodeClassRef,
},
})
delete(nodeClaim.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, hydrationController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
value, ok := nodeClaim.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
Expect(ok).To(Equal(isNodeClaimManaged))
if isNodeClaimManaged {
Expect(value).To(Equal(nodeClassRef.Name))
}
},
Entry("should hydrate missing metadata onto the NodeClaim", true),
Entry("should ignore NodeClaims which aren't managed by this Karpenter instance", false),
)
})
Loading

0 comments on commit 8a97f3f

Please sign in to comment.