Skip to content

Commit

Permalink
Use NEG CRs for NEG Garbage Collection
Browse files Browse the repository at this point in the history
 - Add finalizer to NEG CRs to ensure CR exists until NEG is deleted
 during GC
 - When neg crd is enabled, GC will use neg crs, otherwise will fallback
 to regular GC using the naming to determine which NEGs are to be
 deleted
  • Loading branch information
swetharepakula committed Jul 31, 2020
1 parent d14b9c9 commit 49c862c
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 7 deletions.
95 changes: 92 additions & 3 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -39,6 +40,8 @@ import (
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/klog"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -236,10 +239,16 @@ func (manager *syncerManager) GC() error {
manager.garbageCollectSyncer()

// Garbage collect NEGs
if err := manager.garbageCollectNEG(); err != nil {
return fmt.Errorf("failed to garbage collect negs: %v", err)
var err error
if manager.svcNegClient != nil {
err = manager.garbageCollectNEGWithCRD()
} else {
err = manager.garbageCollectNEG()
}
return nil
if err != nil {
err = fmt.Errorf("failed to garbage collect negs: %v", err)
}
return err
}

// ReadinessGateEnabledNegs returns a list of NEGs which has readiness gate enabled for the input pod's namespace and labels.
Expand Down Expand Up @@ -370,6 +379,62 @@ func (manager *syncerManager) garbageCollectNEG() error {
return nil
}

// garbageCollectNEGWithCRD uses the NEG CRs and the svcPortMap to determine which NEGs
// need to be garbage collected. Neg CRs that do not have a configuration in the svcPortMap will deleted
// along with all corresponding NEGs in the CR's list of NetworkEndpointGroups. If NEG deletion fails in
// the cloud, the corresponding Neg CR will not be deleted
func (manager *syncerManager) garbageCollectNEGWithCRD() error {
deletionCandidates := map[string]*negv1beta1.ServiceNetworkEndpointGroup{}
negCRs := manager.svcNegLister.List()
for _, obj := range negCRs {
neg := obj.(*negv1beta1.ServiceNetworkEndpointGroup)
deletionCandidates[neg.Name] = neg
}

func() {
manager.mu.Lock()
defer manager.mu.Unlock()
for _, portInfoMap := range manager.svcPortMap {
for _, portInfo := range portInfoMap {
// only keep in deletionCandidates if neg does not exist in svcPortMap
if _, ok := deletionCandidates[portInfo.NegName]; ok {
delete(deletionCandidates, portInfo.NegName)
}
}
}
}()

var errList []error
for _, cr := range deletionCandidates {
shouldDeleteNegCR := true
for _, negRef := range cr.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", cr.Namespace, cr.Name, err))
continue
}

if err := manager.ensureDeleteNetworkEndpointGroup(resourceID.Key.Name, resourceID.Key.Zone); err != nil {
err = fmt.Errorf("failed to delete NEG %s in %s: %s", resourceID.Key.Name, resourceID.Key.Zone, err)
manager.recorder.Eventf(cr, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
errList = append(errList, err)

// Error when deleting NEG, do not delete Neg CR
shouldDeleteNegCR = false
}
}

if !shouldDeleteNegCR {
continue
}

if err := deleteSvcNegCR(manager.svcNegClient, cr); err != nil {
errList = append(errList, err)
}
}
return utilerrors.NewAggregate(errList)
}

// ensureDeleteNetworkEndpointGroup ensures neg is delete from zone
func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string) error {
_, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA)
Expand Down Expand Up @@ -416,6 +481,7 @@ func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtype
Namespace: svcKey.namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: labels,
Finalizers: []string{common.NegFinalizerKey},
},
}

Expand Down Expand Up @@ -473,6 +539,29 @@ func ensureNegCROwnerRef(negCR *negv1beta1.ServiceNetworkEndpointGroup, expected
return false
}

// deleteSvcNegCR will remove finalizers on the given negCR and if deletion timestamp is not set, will delete it as well
func deleteSvcNegCR(svcNegClient svcnegclient.Interface, negCR *negv1beta1.ServiceNetworkEndpointGroup) error {
updatedCR := negCR.DeepCopy()
updatedCR.Finalizers = []string{}
patchNegStatus(svcNegClient, *negCR, *updatedCR)

// If CR does not have a deletion timestamp, delete
if negCR.GetDeletionTimestamp().IsZero() {
return svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(negCR.Namespace).Delete(context.Background(), negCR.Name, metav1.DeleteOptions{})
}
return nil
}

// patchNegStatus patches the specified NegCR status with the provided new status
func patchNegStatus(svcNegClient svcnegclient.Interface, oldNeg, newNeg negv1beta1.ServiceNetworkEndpointGroup) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
patchBytes, err := utils.MergePatchBytes(oldNeg, newNeg)
if err != nil {
return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
}

return svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(oldNeg.Namespace).Patch(context.Background(), oldNeg.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey, portInfo negtypes.PortInfo) negtypes.NegSyncerKey {
networkEndpointType := negtypes.VmIpPortEndpointType
Expand Down
Loading

0 comments on commit 49c862c

Please sign in to comment.