Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Qiu <[email protected]>
  • Loading branch information
wenqiq committed Oct 8, 2024
1 parent 5e83337 commit f25edd1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 63 deletions.
46 changes: 27 additions & 19 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -56,8 +57,7 @@ func (r *SubnetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

subnetCR := &v1alpha1.Subnet{}
if err := r.Client.Get(ctx, req.NamespacedName, subnetCR); err != nil {
// IgnoreNotFound returns nil on NotFound errors.
if client.IgnoreNotFound(err) == nil {
if apierrors.IsNotFound(err) {
if err := r.deleteSubnetByName(req.Name, req.Namespace); err != nil {
log.Error(err, "failed to delete NSX Subnet", "Subnet", req.NamespacedName)
return ResultRequeue, err
Expand Down Expand Up @@ -147,23 +147,11 @@ func (r *SubnetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

func (r *SubnetReconciler) deleteSubnetByID(subnetID string) error {
nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetCRUID, subnetID)
return r.deleteSubnets(nsxSubnets)
return r.deleteStaleSubnets(nsxSubnets)
}

func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
crdSubnetIDs, err := r.listSubnetCRs(context.Background())
if err != nil {
log.Error(err, "failed to list Subnet CRs")
return err
}
crdSubnetIDsSet := sets.NewString(crdSubnetIDs...)
for _, nsxSubnet := range nsxSubnets {
uid := nsxutil.FindTag(nsxSubnet.Tags, servicecommon.TagScopeSubnetCRUID)
if crdSubnetIDsSet.Has(uid) {
log.Info("Skipping deletion, Subnet CRD still exists in K8s", "ID", *nsxSubnet.Id)
continue
}

portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
if portNums > 0 {
err := fmt.Errorf("cannot delete Subnet %s, still attached by %d port(s)", *nsxSubnet.Id, portNums)
Expand All @@ -176,13 +164,33 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
}
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned stale Subnets")
log.Info("Successfully cleaned Subnets")
return nil
}

func (r *SubnetReconciler) deleteStaleSubnets(nsxSubnets []*model.VpcSubnet) error {
crdSubnetIDs, err := r.listSubnetIDsFromCRs(context.Background())
if err != nil {
log.Error(err, "Failed to list Subnet CRs")
return err
}
crdSubnetIDsSet := sets.NewString(crdSubnetIDs...)
nsxSubnetsToDelete := make([]*model.VpcSubnet, 0, len(nsxSubnets))
for _, nsxSubnet := range nsxSubnets {
uid := nsxutil.FindTag(nsxSubnet.Tags, servicecommon.TagScopeSubnetCRUID)
if crdSubnetIDsSet.Has(uid) {
log.Info("Skipping deletion, Subnet CR still exists in K8s", "ID", *nsxSubnet.Id)
continue
}
nsxSubnetsToDelete = append(nsxSubnetsToDelete, nsxSubnet)
}
log.Info("Cleaning stale Subnets", "Count", len(nsxSubnetsToDelete))
return r.deleteSubnets(nsxSubnetsToDelete)
}

func (r *SubnetReconciler) deleteSubnetByName(name, ns string) error {
nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetCRNamespacedName, util.CombineNamespaceName(name, ns))
return r.deleteSubnets(nsxSubnets)
return r.deleteStaleSubnets(nsxSubnets)
}

func (r *SubnetReconciler) updateSubnetStatus(obj *v1alpha1.Subnet) error {
Expand Down Expand Up @@ -346,7 +354,7 @@ func (r *SubnetReconciler) setupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *SubnetReconciler) listSubnetCRs(ctx context.Context) ([]string, error) {
func (r *SubnetReconciler) listSubnetIDsFromCRs(ctx context.Context) ([]string, error) {
crdSubnetList := &v1alpha1.SubnetList{}
err := r.Client.List(ctx, crdSubnetList)
if err != nil {
Expand All @@ -367,7 +375,7 @@ func (r *SubnetReconciler) collectGarbage(ctx context.Context) {
log.Info("Subnet garbage collection completed", "time", time.Since(startTime))
}()

crdSubnetIDs, err := r.listSubnetCRs(ctx)
crdSubnetIDs, err := r.listSubnetIDsFromCRs(ctx)
if err != nil {
log.Error(err, "failed to list Subnet CRs")
return
Expand Down
96 changes: 52 additions & 44 deletions pkg/controllers/subnetset/subnetset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
"github.com/vmware-tanzu/nsx-operator/pkg/util"
)

Expand Down Expand Up @@ -58,30 +60,26 @@ func (r *SubnetSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
metrics.CounterInc(r.SubnetService.NSXConfig, metrics.ControllerSyncTotal, MetricResTypeSubnetSet)

if err := r.Client.Get(ctx, req.NamespacedName, subnetsetCR); err != nil {
// IgnoreNotFound returns nil on NotFound errors.
if client.IgnoreNotFound(err) == nil {
if apierrors.IsNotFound(err) {
if err := r.deleteSubnetBySubnetSetName(req.Name, req.Namespace); err != nil {
log.Error(err, "failed to delete NSX Subnet", "SubnetSet", req.NamespacedName)
log.Error(err, "Failed to delete NSX Subnet", "SubnetSet", req.NamespacedName)
return ResultRequeue, err
}
return ResultNormal, nil
}
log.Error(err, "unable to fetch Subnet CR", "req", req.NamespacedName)
return ResultRequeue, err
}
if err := r.cleanStaleSubnetsForSubnetSet(subnetsetCR.Name, subnetsetCR.Namespace, string(subnetsetCR.UID)); err != nil {
log.Error(err, "Unable to fetch Subnet CR", "req", req.NamespacedName)
return ResultRequeue, err
}
if !subnetsetCR.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.SubnetService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypeSubnetSet)
err := r.deleteSubnetForSubnetSet(*subnetsetCR, false)
if err != nil {
log.Error(err, "failed to delete NSX Subnet, retrying", "SubnetSet", req.NamespacedName)
log.Error(err, "Failed to delete NSX Subnet, retrying", "SubnetSet", req.NamespacedName)
deleteFail(r, ctx, subnetsetCR, err.Error())
return ResultRequeue, err
}
if err := r.Client.Delete(ctx, subnetsetCR); err != nil {
log.Error(err, "failed to delete Subnet CR, retrying", "SubnetSet", req.NamespacedName)
log.Error(err, "Failed to delete Subnet CR, retrying", "SubnetSet", req.NamespacedName)
deleteFail(r, ctx, subnetsetCR, err.Error())
return ResultRequeue, err
}
Expand All @@ -99,7 +97,7 @@ func (r *SubnetSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
vpcNetworkConfig := r.VPCService.GetVPCNetworkConfigByNamespace(subnetsetCR.Namespace)
if vpcNetworkConfig == nil {
err := fmt.Errorf("failed to find VPCNetworkConfig for namespace %s", subnetsetCR.Namespace)
log.Error(err, "operate failed, would retry exponentially", "SubnetSet", req.NamespacedName)
log.Error(err, "Operate failed, would retry exponentially", "SubnetSet", req.NamespacedName)
updateFail(r, ctx, subnetsetCR, err.Error())
return ResultRequeue, err
}
Expand All @@ -116,7 +114,7 @@ func (r *SubnetSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if specChanged {
err := r.Client.Update(ctx, subnetsetCR)
if err != nil {
log.Error(err, "update SubnetSet failed", "SubnetSet", req.NamespacedName)
log.Error(err, "Update SubnetSet failed", "SubnetSet", req.NamespacedName)
updateFail(r, ctx, subnetsetCR, err.Error())
return ResultRequeue, err
}
Expand All @@ -127,18 +125,18 @@ func (r *SubnetSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if len(nsxSubnets) > 0 {
tags := r.SubnetService.GenerateSubnetNSTags(subnetsetCR)
if tags == nil {
log.Error(nil, "failed to generate SubnetSet tags", "SubnetSet", req.NamespacedName)
log.Error(nil, "Failed to generate SubnetSet tags", "SubnetSet", req.NamespacedName)
return ResultRequeue, errors.New("failed to generate SubnetSet tags")
}
// tags cannot exceed maximum size 26
if len(tags) > servicecommon.TagsCountMax {
errorMsg := fmt.Sprintf("tags cannot exceed maximum size 26, tags length: %d", len(tags))
log.Error(nil, "exceed tags limit, would not retry", "SubnetSet", req.NamespacedName)
log.Error(nil, "Exceed tags limit, would not retry", "SubnetSet", req.NamespacedName)
updateFail(r, ctx, subnetsetCR, errorMsg)
return ResultNormal, nil
}
if err := r.SubnetService.UpdateSubnetSetTags(subnetsetCR.Namespace, nsxSubnets, tags); err != nil {
log.Error(err, "failed to update SubnetSet tags", "SubnetSet", req.NamespacedName)
log.Error(err, "Failed to update SubnetSet tags", "SubnetSet", req.NamespacedName)
}
}
updateSuccess(r, ctx, subnetsetCR)
Expand Down Expand Up @@ -207,9 +205,9 @@ func (r *SubnetSetReconciler) updateSubnetSetStatusConditions(ctx context.Contex
}
if conditionsUpdated {
if err := r.Client.Status().Update(ctx, subnetSet); err != nil {
log.Error(err, "failed to update status", "Name", subnetSet.Name, "Namespace", subnetSet.Namespace)
log.Error(err, "Failed to update status", "Name", subnetSet.Name, "Namespace", subnetSet.Namespace)
} else {
log.Info("updated SubnetSet", "Name", subnetSet.Name, "Namespace", subnetSet.Namespace, "New Conditions", newConditions)
log.Info("Updated SubnetSet", "Name", subnetSet.Name, "Namespace", subnetSet.Namespace, "New Conditions", newConditions)
}
}
}
Expand All @@ -218,7 +216,7 @@ func (r *SubnetSetReconciler) mergeSubnetSetStatusCondition(ctx context.Context,
matchedCondition := getExistingConditionOfType(newCondition.Type, subnetSet.Status.Conditions)

if reflect.DeepEqual(matchedCondition, newCondition) {
log.V(2).Info("conditions already match", "New Condition", newCondition, "Existing Condition", matchedCondition)
log.V(2).Info("Conditions already match", "New Condition", newCondition, "Existing Condition", matchedCondition)
return false
}

Expand Down Expand Up @@ -265,7 +263,7 @@ func (r *SubnetSetReconciler) CollectGarbage(ctx context.Context) {
subnetSetList := &v1alpha1.SubnetSetList{}
err := r.Client.List(ctx, subnetSetList)
if err != nil {
log.Error(err, "failed to list SubnetSet CR")
log.Error(err, "Failed to list SubnetSet CR")
return
}
var nsxSubnetList []*model.VpcSubnet
Expand Down Expand Up @@ -297,38 +295,14 @@ func (r *SubnetSetReconciler) CollectGarbage(ctx context.Context) {
}
}

func (r *SubnetSetReconciler) cleanStaleSubnetsForSubnetSet(subnetSetName, ns, subnetsetID string) error {
nsxStaleSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRNamespacedName, util.CombineNamespaceName(subnetSetName, ns))
nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRUID, subnetsetID)
subnetIDs := sets.New[string]()
for _, subnet := range nsxSubnets {
subnetIDs.Insert(*subnet.Id)
}
subnetToDelete := []*model.VpcSubnet{}
for _, staleSubnet := range nsxStaleSubnets {
if !subnetIDs.Has(*staleSubnet.Id) {
subnetToDelete = append(subnetToDelete, staleSubnet)
}
}
if err := r.deleteSubnets(subnetToDelete); err != nil {
log.Error(err, "Failed to delete stale subnets", "SubnetSet", subnetSetName, "Namespace", ns)
return err
}
log.Info("Successfully cleaned stale subnets for SubnetSet", "SubnetSet", subnetSetName, "Namespace", ns, "num", len(subnetToDelete))
return nil
}

func (r *SubnetSetReconciler) deleteSubnetBySubnetSetName(subnetSetName, ns string) error {
nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRNamespacedName, util.CombineNamespaceName(subnetSetName, ns))
if err := r.deleteSubnets(nsxSubnets); err != nil {
return err
}
return nil
return r.deleteStaleSubnets(nsxSubnets)
}

func (r *SubnetSetReconciler) deleteSubnetForSubnetSet(obj v1alpha1.SubnetSet, updateStatus bool) error {
nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRUID, string(obj.GetUID()))
if err := r.deleteSubnets(nsxSubnets); err != nil {
if err := r.deleteStaleSubnets(nsxSubnets); err != nil {
return err
}
if updateStatus {
Expand All @@ -340,6 +314,20 @@ func (r *SubnetSetReconciler) deleteSubnetForSubnetSet(obj v1alpha1.SubnetSet, u
return nil
}

func (r *SubnetSetReconciler) listSubnetIDsFromCRs(ctx context.Context) ([]string, error) {
crdSubnetList := &v1alpha1.SubnetList{}
err := r.Client.List(ctx, crdSubnetList)
if err != nil {
return nil, err
}

crdSubnetIDs := make([]string, 0, len(crdSubnetList.Items))
for _, sr := range crdSubnetList.Items {
crdSubnetIDs = append(crdSubnetIDs, string(sr.UID))
}
return crdSubnetIDs, nil
}

func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
for _, nsxSubnet := range nsxSubnets {
portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
Expand All @@ -358,6 +346,26 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error
return nil
}

func (r *SubnetSetReconciler) deleteStaleSubnets(nsxSubnets []*model.VpcSubnet) error {
crdSubnetIDs, err := r.listSubnetIDsFromCRs(context.Background())
if err != nil {
log.Error(err, "Failed to list Subnet CRs")
return err
}
crdSubnetIDsSet := sets.NewString(crdSubnetIDs...)
nsxSubnetsToDelete := make([]*model.VpcSubnet, 0, len(nsxSubnets))
for _, nsxSubnet := range nsxSubnets {
uid := nsxutil.FindTag(nsxSubnet.Tags, servicecommon.TagScopeSubnetCRUID)
if crdSubnetIDsSet.Has(uid) {
log.Info("Skipping deletion, Subnet CR still exists in K8s", "ID", *nsxSubnet.Id)
continue
}
nsxSubnetsToDelete = append(nsxSubnetsToDelete, nsxSubnet)
}
log.Info("Cleaning stale Subnets for SubnetSet", "Count", len(nsxSubnetsToDelete))
return r.deleteSubnets(nsxSubnetsToDelete)
}

func StartSubnetSetController(mgr ctrl.Manager, subnetService *subnet.SubnetService,
subnetPortService servicecommon.SubnetPortServiceProvider, vpcService servicecommon.VPCServiceProvider,
enableWebhook bool,
Expand Down

0 comments on commit f25edd1

Please sign in to comment.