From de7470aaacf6c04503d366e92ffa592bec785a9c Mon Sep 17 00:00:00 2001 From: Yash Thakkar Date: Fri, 20 Sep 2024 15:47:19 -0700 Subject: [PATCH] Adding cninode leak metrics and cninode finalizer handler. (#476) * add finalizer handler in v1.4 * fix an err variable * adding logs for mismatched CNINode * add metrics for mismatches Co-authored-by: Hao Zhou --- controllers/core/node_controller.go | 60 ++++++++++++++++--- controllers/core/node_controller_test.go | 41 +++++++++++++ .../pkg/aws/ec2/mock_instance.go | 15 +++++ pkg/aws/ec2/instance.go | 8 +++ pkg/node/manager/manager.go | 2 +- pkg/provider/branch/trunk/trunk.go | 47 +++++++++++++++ pkg/provider/branch/trunk/trunk_test.go | 2 + 7 files changed, 166 insertions(+), 9 deletions(-) diff --git a/controllers/core/node_controller.go b/controllers/core/node_controller.go index 8a1f8b0e..6e750eec 100644 --- a/controllers/core/node_controller.go +++ b/controllers/core/node_controller.go @@ -15,6 +15,7 @@ package controllers import ( "context" + "fmt" "net/http" "time" @@ -24,6 +25,8 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -33,9 +36,21 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/healthz" ) +var ( + leakedCNINodeResourceCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "orphaned_cninode_objects", + Help: "The number of leaked cninode resources", + }, + ) + + prometheusRegistered = false +) + // MaxNodeConcurrentReconciles is the number of go routines that can invoke // Reconcile in parallel. Since Node Reconciler, performs local operation // on cache only a single go routine should be sufficient. Using more than @@ -43,6 +58,7 @@ import ( // when the controller has to be restarted for various reasons. const ( MaxNodeConcurrentReconciles = 10 + NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup" ) // NodeReconciler reconciles a Node object @@ -73,27 +89,45 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } node := &corev1.Node{} - var err error logger := r.Log.WithValues("node", req.NamespacedName) - if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil { - if errors.IsNotFound(err) { - r.Log.V(1).Info("the requested node couldn't be found by k8s client", "Node", req.NamespacedName) + if nodeErr := r.Client.Get(ctx, req.NamespacedName, node); nodeErr != nil { + if errors.IsNotFound(nodeErr) { + // clean up CNINode finalizer + cniNode := &v1alpha1.CNINode{} + if cninodeErr := r.Client.Get(ctx, req.NamespacedName, cniNode); cninodeErr == nil { + if yes := controllerutil.ContainsFinalizer(cniNode, NodeTerminationFinalizer); yes { + updated := cniNode.DeepCopy() + if yes = controllerutil.RemoveFinalizer(updated, NodeTerminationFinalizer); yes { + if err := r.Client.Patch(ctx, updated, client.MergeFrom(cniNode)); err != nil { + return ctrl.Result{}, err + } + r.Log.Info("removed leaked CNINode resource's finalizer", "cninode", cniNode.Name) + } + leakedCNINodeResourceCount.Inc() + } + } else if !errors.IsNotFound(cninodeErr) { + return ctrl.Result{}, fmt.Errorf("failed getting CNINode %s from cached client, %w", cniNode.Name, cninodeErr) + } + + // clean up local cached nodes _, found := r.Manager.GetNode(req.Name) if found { - err := r.Manager.DeleteNode(req.Name) - if err != nil { + cacheErr := r.Manager.DeleteNode(req.Name) + if cacheErr != nil { // The request is not retryable so not returning the error - logger.Error(err, "failed to delete node from manager") + logger.Error(cacheErr, "failed to delete node from manager") return ctrl.Result{}, nil } logger.V(1).Info("deleted the node from manager") } } - return ctrl.Result{}, client.IgnoreNotFound(err) + return ctrl.Result{}, client.IgnoreNotFound(nodeErr) } + var err error + _, found := r.Manager.GetNode(req.Name) if found { logger.V(1).Info("updating node") @@ -115,6 +149,8 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHe map[string]healthz.Checker{"health-node-controller": r.Check()}, ) + prometheusRegister() + return ctrl.NewControllerManagedBy(mgr). For(&corev1.Node{}). WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}). @@ -152,3 +188,11 @@ func (r *NodeReconciler) Check() healthz.Checker { return err } } + +func prometheusRegister() { + if !prometheusRegistered { + metrics.Registry.MustRegister(leakedCNINodeResourceCount) + + prometheusRegistered = true + } +} diff --git a/controllers/core/node_controller_test.go b/controllers/core/node_controller_test.go index c592dccc..311a35b6 100644 --- a/controllers/core/node_controller_test.go +++ b/controllers/core/node_controller_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1" mock_condition "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition" mock_node "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node" mock_manager "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager" @@ -25,11 +26,13 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -61,6 +64,7 @@ func NewNodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) NodeMock scheme := runtime.NewScheme() _ = corev1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build() return NodeMock{ @@ -139,6 +143,43 @@ func TestNodeReconciler_Reconcile_DeleteNonExistentNode(t *testing.T) { assert.Equal(t, res, reconcile.Result{}) } +func TestNodeReconciler_Reconcile_DeleteNonExistentNodesCNINode(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mock := NewNodeMock(ctrl) + cniNode := &v1alpha1.CNINode{ + ObjectMeta: v1.ObjectMeta{ + Name: mockNodeName, + Finalizers: []string{NodeTerminationFinalizer}, + }, + } + mock.Reconciler.Client = fakeClient.NewClientBuilder().WithScheme(mock.Reconciler.Scheme).WithObjects(cniNode).Build() + + mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true) + mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false) + + original := &v1alpha1.CNINode{} + err := mock.Reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: cniNode.Name}, original) + assert.NoError(t, err) + assert.True(t, controllerutil.ContainsFinalizer(original, NodeTerminationFinalizer), "the CNINode has finalizer") + + res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest) + assert.NoError(t, err) + assert.Equal(t, res, reconcile.Result{}) + + node := &corev1.Node{} + updated := &v1alpha1.CNINode{} + err = mock.Reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: cniNode.Name}, node) + assert.Error(t, err, "the node shouldn't existing") + assert.True(t, errors.IsNotFound(err)) + + err = mock.Reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: cniNode.Name}, updated) + assert.NoError(t, err) + assert.True(t, updated.Name == mockNodeName, "the CNINode should existing and waiting for finalizer removal") + assert.False(t, controllerutil.ContainsFinalizer(updated, NodeTerminationFinalizer), "CNINode finalizer should be removed when the node is gone") +} + func TestNodeReconciler_Reconcile_DeleteNonExistentUnmanagedNode(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/mock_instance.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/mock_instance.go index d287cff4..92015b49 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/mock_instance.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/mock_instance.go @@ -73,6 +73,21 @@ func (mr *MockEC2InstanceMockRecorder) FreeDeviceIndex(arg0 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FreeDeviceIndex", reflect.TypeOf((*MockEC2Instance)(nil).FreeDeviceIndex), arg0) } +// GetCustomNetworkingSpec mocks base method. +func (m *MockEC2Instance) GetCustomNetworkingSpec() (string, []string) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCustomNetworkingSpec") + ret0, _ := ret[0].(string) + ret1, _ := ret[1].([]string) + return ret0, ret1 +} + +// GetCustomNetworkingSpec indicates an expected call of GetCustomNetworkingSpec. +func (mr *MockEC2InstanceMockRecorder) GetCustomNetworkingSpec() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCustomNetworkingSpec", reflect.TypeOf((*MockEC2Instance)(nil).GetCustomNetworkingSpec)) +} + // GetHighestUnusedDeviceIndex mocks base method. func (m *MockEC2Instance) GetHighestUnusedDeviceIndex() (int64, error) { m.ctrl.T.Helper() diff --git a/pkg/aws/ec2/instance.go b/pkg/aws/ec2/instance.go index adde9058..a513e871 100644 --- a/pkg/aws/ec2/instance.go +++ b/pkg/aws/ec2/instance.go @@ -79,6 +79,7 @@ type EC2Instance interface { PrimaryNetworkInterfaceID() string CurrentInstanceSecurityGroups() []string SetNewCustomNetworkingSpec(subnetID string, securityGroup []string) + GetCustomNetworkingSpec() (subnetID string, securityGroup []string) UpdateCurrentSubnetAndCidrBlock(helper api.EC2APIHelper) error } @@ -311,3 +312,10 @@ func (i *ec2Instance) updateCurrentSubnetAndCidrBlock(ec2APIHelper api.EC2APIHel return nil } + +func (i *ec2Instance) GetCustomNetworkingSpec() (subnetID string, securityGroup []string) { + i.lock.RLock() + defer i.lock.RUnlock() + + return i.newCustomNetworkingSubnetID, i.newCustomNetworkingSecurityGroups +} diff --git a/pkg/node/manager/manager.go b/pkg/node/manager/manager.go index c9b6f9c2..2759e775 100644 --- a/pkg/node/manager/manager.go +++ b/pkg/node/manager/manager.go @@ -228,7 +228,7 @@ func (m *manager) CreateCNINodeIfNotExisting(node *v1.Node) error { } return err } else { - m.Log.V(1).Info("The CNINode is already existing", "CNINode", cniNode) + m.Log.Info("The CNINode is already existing", "cninode", cniNode.Name, "features", cniNode.Spec.Features) return nil } } diff --git a/pkg/provider/branch/trunk/trunk.go b/pkg/provider/branch/trunk/trunk.go index 1a5e1dd3..b4b9a699 100644 --- a/pkg/provider/branch/trunk/trunk.go +++ b/pkg/provider/branch/trunk/trunk.go @@ -16,6 +16,7 @@ package trunk import ( "encoding/json" "fmt" + "slices" "strconv" "strings" "sync" @@ -27,6 +28,7 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/branch/cooldown" + "github.com/samber/lo" "github.com/aws/aws-sdk-go/aws" awsEC2 "github.com/aws/aws-sdk-go/service/ec2" @@ -62,6 +64,13 @@ var ( }, []string{"operation"}, ) + unreconciledTrunkENICount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "unreconciled_trunk_network_interfaces", + Help: "The number of unreconciled trunk network interfaces", + }, + []string{"attribute"}, + ) branchENIOperationsSuccessCount = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "branch_eni_opeartions_success_count", @@ -173,6 +182,7 @@ func NewTrunkENI(logger logr.Logger, instance ec2.EC2Instance, helper api.EC2API func PrometheusRegister() { if !prometheusRegistered { metrics.Registry.MustRegister(trunkENIOperationsErrCount) + metrics.Registry.MustRegister(unreconciledTrunkENICount) metrics.Registry.MustRegister(branchENIOperationsSuccessCount) metrics.Registry.MustRegister(branchENIOperationsFailureCount) @@ -192,6 +202,7 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error { return err } + var trunk awsEC2.InstanceNetworkInterface // Get trunk network interface for _, nwInterface := range nwInterfaces { // It's possible to get an empty network interface response if the instance is being deleted. @@ -206,6 +217,7 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error { } else { return fmt.Errorf("failed to verify network interface status attached for %v", *nwInterface.NetworkInterfaceId) } + trunk = *nwInterface } } @@ -231,6 +243,41 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error { return nil } + // the node already have trunk, let's check if its SGs and Subnets match with expected + expectedSubnetID, expectedSecurityGroups := t.instance.GetCustomNetworkingSpec() + if len(expectedSecurityGroups) > 0 || expectedSubnetID != "" { + slices.Sort(expectedSecurityGroups) + trunkSGs := lo.Map(trunk.Groups, func(g *awsEC2.GroupIdentifier, _ int) string { + return lo.FromPtr(g.GroupId) + }) + slices.Sort(trunkSGs) + + mismatchedSubnets := expectedSubnetID != lo.FromPtr(trunk.SubnetId) + mismatchedSGs := !slices.Equal(expectedSecurityGroups, trunkSGs) + + extraSGsInTrunk, missingSGsInTrunk := lo.Difference(trunkSGs, expectedSecurityGroups) + t.log.Info("Observed trunk ENI config", + "instanceID", t.instance.InstanceID(), + "trunkENIID", lo.FromPtr(trunk.NetworkInterfaceId), + "configuredTrunkSGs", trunkSGs, + "configuredTrunkSubnet", lo.FromPtr(trunk.SubnetId), + "desiredTrunkSGs", expectedSecurityGroups, + "desiredTrunkSubnet", expectedSubnetID, + "mismatchedSGs", mismatchedSGs, + "mismatchedSubnets", mismatchedSubnets, + "missingSGs", missingSGsInTrunk, + "extraSGs", extraSGsInTrunk, + ) + + if mismatchedSGs { + unreconciledTrunkENICount.WithLabelValues("security_groups").Inc() + } + + if mismatchedSubnets { + unreconciledTrunkENICount.WithLabelValues("subnet").Inc() + } + } + // Get the list of branch ENIs branchInterfaces, err := t.ec2ApiHelper.GetBranchNetworkInterface(&t.trunkENIId, aws.String(t.instance.SubnetID())) if err != nil { diff --git a/pkg/provider/branch/trunk/trunk_test.go b/pkg/provider/branch/trunk/trunk_test.go index 2f0eed90..49dcaf0d 100644 --- a/pkg/provider/branch/trunk/trunk_test.go +++ b/pkg/provider/branch/trunk/trunk_test.go @@ -645,6 +645,7 @@ func TestTrunkENI_InitTrunk(t *testing.T) { name: "TrunkExists_WithBranches, verifies no error when trunk exists with branches", prepare: func(f *fields) { f.mockInstance.EXPECT().InstanceID().Return(InstanceId) + f.mockInstance.EXPECT().GetCustomNetworkingSpec().Return("", []string{}) f.mockEC2APIHelper.EXPECT().GetInstanceNetworkInterface(&InstanceId).Return(instanceNwInterfaces, nil) f.mockEC2APIHelper.EXPECT().WaitForNetworkInterfaceStatusChange(&trunkId, awsEc2.AttachmentStatusAttached).Return(nil) f.mockInstance.EXPECT().SubnetID().Return(SubnetId) @@ -674,6 +675,7 @@ func TestTrunkENI_InitTrunk(t *testing.T) { name: "TrunkExists_DanglingENIs, verifies ENIs are pushed to delete queue if no pod exists", prepare: func(f *fields) { f.mockInstance.EXPECT().InstanceID().Return(InstanceId) + f.mockInstance.EXPECT().GetCustomNetworkingSpec().Return("", []string{}) f.mockEC2APIHelper.EXPECT().GetInstanceNetworkInterface(&InstanceId).Return(instanceNwInterfaces, nil) f.mockEC2APIHelper.EXPECT().WaitForNetworkInterfaceStatusChange(&trunkId, awsEc2.AttachmentStatusAttached).Return(nil) f.mockInstance.EXPECT().SubnetID().Return(SubnetId)