feat: Centralize leaked ENI cleanup (#374)
* feat: centralized eni cleanup
sushrk authored Mar 13, 2024
1 parent a177073 commit e3c6c05
Showing 34 changed files with 1,254 additions and 454 deletions.
1 change: 1 addition & 0 deletions PROJECT
@@ -19,6 +19,7 @@ resources:
version: v1beta1
- api:
crdVersion: v1
controller: true
domain: k8s.aws
group: vpcresources
kind: CNINode
2 changes: 2 additions & 0 deletions apis/vpcresources/v1alpha1/cninode_types.go
@@ -35,6 +35,8 @@ type Feature struct {
// CNINodeSpec defines the desired state of CNINode
type CNINodeSpec struct {
Features []Feature `json:"features,omitempty"`
// Additional tag key/value added to all network interfaces provisioned by the vpc-resource-controller and VPC-CNI
Tags map[string]string `json:"tags,omitempty"`
}

// CNINodeStatus defines the managed VPC resources.
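For illustration only: a minimal sketch, assuming the v1alpha1 types in this hunk and a standard controller-runtime client, of how a CNINode carrying the new Tags field might be created. The tag key/value and the helper name are hypothetical.

package example

import (
	"context"

	"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// createCNINodeWithTags creates a CNINode whose Tags are meant to be applied to
// every network interface provisioned for the node by the controller and VPC-CNI.
func createCNINodeWithTags(ctx context.Context, c client.Client, nodeName string) error {
	cniNode := &v1alpha1.CNINode{
		ObjectMeta: metav1.ObjectMeta{Name: nodeName},
		Spec: v1alpha1.CNINodeSpec{
			// Hypothetical tag; the reconciler below also adds its own cluster-name tag.
			Tags: map[string]string{"team": "networking"},
		},
	}
	return c.Create(ctx, cniNode)
}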
7 changes: 7 additions & 0 deletions apis/vpcresources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.
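Since the generated deepcopy change is not rendered above, the following is only a sketch (a method fragment, not a complete file) of what controller-gen typically emits for the new map field; the actual generated code may differ.

// Sketch of the generated deep copy for CNINodeSpec after adding Tags.
func (in *CNINodeSpec) DeepCopyInto(out *CNINodeSpec) {
	*out = *in
	if in.Features != nil {
		in, out := &in.Features, &out.Features
		*out = make([]Feature, len(*in))
		copy(*out, *in)
	}
	if in.Tags != nil {
		in, out := &in.Tags, &out.Tags
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			// Copy each tag key/value into the new map.
			(*out)[key] = val
		}
	}
}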

6 changes: 6 additions & 0 deletions config/crd/bases/vpcresources.k8s.aws_cninodes.yaml
@@ -56,6 +56,12 @@ spec:
type: string
type: object
type: array
tags:
additionalProperties:
type: string
description: Additional tag key/value added to all network interfaces
provisioned by the vpc-resource-controller and VPC-CNI
type: object
type: object
status:
description: CNINodeStatus defines the managed VPC resources.
2 changes: 2 additions & 0 deletions config/rbac/role.yaml
@@ -61,6 +61,8 @@ rules:
- create
- get
- list
- patch
- update
- watch
- apiGroups:
- vpcresources.k8s.aws
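The two added verbs line up with the kubebuilder RBAC marker on the new CNINode reconciler shown further down; as a sketch, regenerating config/rbac/role.yaml from a marker of this shape would produce such a rule:

// Sketch: the RBAC marker that controller-gen translates into the role.yaml verbs above.
//+kubebuilder:rbac:groups=vpcresources.k8s.aws,resources=cninodes,verbs=get;list;watch;create;update;patch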
12 changes: 2 additions & 10 deletions controllers/core/node_controller.go
@@ -20,6 +20,7 @@ import (

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager"
@@ -36,15 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
)

// MaxNodeConcurrentReconciles is the number of go routines that can invoke
// Reconcile in parallel. Since Node Reconciler, performs local operation
// on cache only a single go routine should be sufficient. Using more than
// one routines to help high rate churn and larger nodes groups restarting
// when the controller has to be restarted for various reasons.
const (
MaxNodeConcurrentReconciles = 10
)

// NodeReconciler reconciles a Node object
type NodeReconciler struct {
client.Client
@@ -117,7 +109,7 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHe

return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}).
WithOptions(controller.Options{MaxConcurrentReconciles: config.MaxNodeConcurrentReconciles}).
Owns(&v1alpha1.CNINode{}).
Complete(r)
}
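The concurrency constant now lives in pkg/config, but that file is not shown in this excerpt; a sketch of the assumed definition, keeping the value 10 from the block removed above:

package config

// MaxNodeConcurrentReconciles is the number of goroutines that may invoke
// Reconcile in parallel; it is shared by the Node and CNINode reconcilers.
const MaxNodeConcurrentReconciles = 10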
223 changes: 223 additions & 0 deletions controllers/crds/cninode_controller.go
@@ -0,0 +1,223 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package crds

import (
"context"
"fmt"
"time"

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
prometheusRegistered = false
recreateCNINodeCallCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "recreate_cniNode_call_count",
Help: "The number of requests made by controller to recreate CNINode when node exists",
},
)
recreateCNINodeErrCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "recreate_cniNode_err_count",
Help: "The number of requests that failed when controller tried to recreate the CNINode",
},
)
)

func prometheusRegister() {
prometheusRegistered = true

metrics.Registry.MustRegister(
recreateCNINodeCallCount,
recreateCNINodeErrCount)

prometheusRegistered = true
}

// CNINodeReconciler reconciles a CNINode object
type CNINodeReconciler struct {
client.Client
Scheme *runtime.Scheme
Context context.Context
Log logr.Logger
EC2Wrapper ec2API.EC2Wrapper
K8sAPI k8s.K8sWrapper
ClusterName string
VPCID string
FinalizerManager k8s.FinalizerManager
}

//+kubebuilder:rbac:groups=vpcresources.k8s.aws,resources=cninodes,verbs=get;list;watch;create;update;patch;

// Reconcile handles CNINode create/update/delete events
// The reconciler adds the cluster name tag and finalizer if they are missing, and on CNINode deletion runs the finalizer routine to clean up leaked resources on the node
func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
cniNode := &v1alpha1.CNINode{}
if err := r.Client.Get(ctx, req.NamespacedName, cniNode); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("CNINode is deleted", "CNINode", req.NamespacedName)
}
// Ignore not found error
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if cniNode.GetDeletionTimestamp().IsZero() {
// Add cluster name tag if it does not exist
clusterNameTagKey := fmt.Sprintf(config.ClusterNameTagKeyFormat, r.ClusterName)
val, ok := cniNode.Spec.Tags[clusterNameTagKey]
if !ok || val != config.ClusterNameTagValue {
cniNodeCopy := cniNode.DeepCopy()
if len(cniNodeCopy.Spec.Tags) != 0 {
cniNodeCopy.Spec.Tags[clusterNameTagKey] = config.ClusterNameTagValue
} else {
cniNodeCopy.Spec.Tags = map[string]string{
clusterNameTagKey: config.ClusterNameTagValue,
}
}
return ctrl.Result{}, r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{}))
}
if err := r.FinalizerManager.AddFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.Log.Error(err, "failed to add finalizer on CNINode, will retry", "cniNode", cniNode.Name, "finalizer", config.NodeTerminationFinalizer)
return ctrl.Result{}, err
}
r.Log.Info("added finalizer on cninode", "finalizer", config.NodeTerminationFinalizer, "cniNode", cniNode.Name)
return ctrl.Result{}, nil

} else { // CNINode is marked for deletion
// check if node object exists
node := &v1.Node{}
if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
if errors.IsNotFound(err) {
// node is also deleted, proceed with running the cleanup routine and remove the finalizer
r.Log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
cleaner := &cleanup.NodeTerminationCleaner{
NodeName: cniNode.Name,
}
cleaner.ENICleaner = &cleanup.ENICleaner{
EC2Wrapper: r.EC2Wrapper,
Manager: cleaner,
VPCID: r.VPCID,
Log: ctrl.Log.WithName("eniCleaner").WithName("node"),
}
// Return err if failed to delete leaked ENIs on node so it can be retried
if err := cleaner.DeleteLeakedResources(); err != nil {
r.Log.Error(err, "failed to cleanup resources during node termination, request will be requeued")
return ctrl.Result{}, err
}
if err = r.FinalizerManager.RemoveFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.Log.Error(err, "failed to remove finalizer on CNINode, will retry", "cniNode", cniNode.Name, "finalizer", config.NodeTerminationFinalizer)
return ctrl.Result{}, err
}
r.Log.Info("removed finalizer on cniNode", "finalizer", config.NodeTerminationFinalizer, "cniNode", cniNode.Name)
return ctrl.Result{}, nil
} else {
r.Log.Error(err, "failed to get the node object in CNINode reconciliation, will retry")
// Requeue request so it can be retried
return ctrl.Result{}, err
}
} else {
// node exists, so skip the cleanup routine (the periodic cleanup routine will delete leaked ENIs anyway); remove the finalizer
// so the object can be deleted, then recreate an equivalent object

// Create a copy without deletion timestamp for creation
newCNINode := &v1alpha1.CNINode{
ObjectMeta: metav1.ObjectMeta{
Name: cniNode.Name,
Namespace: "",
OwnerReferences: cniNode.OwnerReferences,
// TODO: should we include finalizers at object creation or let controller patch it on Create/Update event?
Finalizers: cniNode.Finalizers,
},
Spec: cniNode.Spec,
}

if err := r.FinalizerManager.RemoveFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.Log.Error(err, "failed to remove finalizer on CNINode on node deletion, will retry")
return ctrl.Result{}, err
}
// wait until the CNINode is deleted before recreating it, since the new object is created with the same name and would otherwise hit an "object already exists" error
if err := r.waitTillCNINodeDeleted(k8s.NamespacedName(newCNINode)); err != nil {
// raise event if CNINode could not be deleted after removing the finalizer
r.K8sAPI.BroadcastEvent(cniNode, utils.CNINodeDeleteFailed, "CNINode deletion failed and object could not be recreated by the vpc-resource-controller, will retry",
v1.EventTypeWarning)
// requeue here to check if CNINode deletion is successful and retry CNINode deletion if node exists
return ctrl.Result{}, err
}

r.Log.Info("creating CNINode after it has been deleted as node still exists", "cniNode", newCNINode.Name)
recreateCNINodeCallCount.Inc()
if err := r.createCNINodeFromObj(ctx, newCNINode); err != nil {
recreateCNINodeErrCount.Inc()
// raise an event on the node warning that the CNINode was deleted and could not be recreated by the controller
utils.SendNodeEventWithNodeName(r.K8sAPI, node.Name, utils.CNINodeCreateFailed,
fmt.Sprint("CNINode was deleted and failed to be recreated by the vpc-resource-controller"), v1.EventTypeWarning, r.Log)
// return nil since the object is deleted and we cannot recreate it now
return ctrl.Result{}, nil
}
r.Log.Info("successfully recreated CNINode", "cniNode", newCNINode.Name)
}
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *CNINodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
if !prometheusRegistered {
prometheusRegister()
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.CNINode{}).
WithOptions(controller.Options{MaxConcurrentReconciles: config.MaxNodeConcurrentReconciles}).
Complete(r)
}

// waitTillCNINodeDeleted waits, with a timeout, for the CNINode to be deleted and returns an error if it still exists
func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.NamespacedName) error {
oldCNINode := &v1alpha1.CNINode{}

return wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, time.Second*3, true, func(ctx context.Context) (bool, error) {
if err := r.Client.Get(ctx, nameSpacedCNINode, oldCNINode); err != nil && errors.IsNotFound(err) {
return true, nil
}
return false, nil
})
}

// createCNINodeFromObj will create CNINode with backoff and returns error if CNINode is not recreated
func (r *CNINodeReconciler) createCNINodeFromObj(ctx context.Context, newCNINode client.Object) error {
return retry.OnError(retry.DefaultBackoff, func(error) bool { return true },
func() error {
return r.Client.Create(ctx, newCNINode)
})
}
79 changes: 79 additions & 0 deletions controllers/crds/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package crds

import (
"path/filepath"
"testing"

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
//+kubebuilder:scaffold:imports
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecs(t, "Controller Suite")
}

var _ = BeforeSuite(func() {
done := make(chan interface{})
go func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: false,
}

var err error
// cfg is defined in this file globally.
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

err = v1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
//+kubebuilder:scaffold:scheme

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
}()
Eventually(done, 60).Should(BeClosed())

})

var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
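As a sketch of the kind of spec this suite could host, reusing the envtest-backed k8sClient bootstrapped above and assuming CNINode is cluster-scoped (as the reconciler above treats it):

package crds

import (
	"context"

	"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// Sketch spec: create a CNINode and read it back through the envtest client.
var _ = Describe("CNINode CRD", func() {
	It("can be created and fetched", func() {
		cniNode := &v1alpha1.CNINode{ObjectMeta: metav1.ObjectMeta{Name: "sketch-node"}}
		Expect(k8sClient.Create(context.TODO(), cniNode)).To(Succeed())

		fetched := &v1alpha1.CNINode{}
		Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: "sketch-node"}, fetched)).To(Succeed())
	})
})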