Skip to content

Commit

Permalink
Add support to clean Np indexers based on account
Browse files Browse the repository at this point in the history
- When CPA is deleted, it will notify a local event
to np controller.
- As part of processing local event, np controller
will clear entries corresponding to the account from
appliedToSGIndexer, addrSGIndexer, cloudRuleIndexer and
networkPolicyIndexer.
- Remove AT/AG group from retryQueue based on account
- Remomve groups from pendingDeleteGroups based on account

Signed-off-by: Anand Kumar <[email protected]>
  • Loading branch information
Anandkumar26 committed Jul 7, 2023
1 parent 291bf6b commit 5cb2c92
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 31 deletions.
37 changes: 19 additions & 18 deletions cmd/nephe-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ func main() {
}
vmController.ConfigureConverterAndStart()

npController := &networkpolicy.NetworkPolicyReconciler{
Client: mgr.GetClient(),
Log: logging.GetLogger("controllers").WithName("NetworkPolicy"),
Scheme: mgr.GetScheme(),
CloudSyncInterval: opts.config.CloudSyncInterval,
Inventory: cloudInventory,
}

if err = npController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NetworkPolicy")
os.Exit(1)
}

if err = (&cloudentityselector.CloudEntitySelectorReconciler{
Client: mgr.GetClient(),
Log: logging.GetLogger("controllers").WithName("CloudEntitySelector"),
Expand All @@ -138,29 +151,17 @@ func main() {
}

if err = (&cloudprovideraccount.CloudProviderAccountReconciler{
Client: mgr.GetClient(),
Log: logging.GetLogger("controllers").WithName("CloudProviderAccount"),
Scheme: mgr.GetScheme(),
AccManager: accountManager,
Mgr: &mgr,
Client: mgr.GetClient(),
Log: logging.GetLogger("controllers").WithName("CloudProviderAccount"),
Scheme: mgr.GetScheme(),
AccManager: accountManager,
Mgr: &mgr,
NpController: npController,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CloudProviderAccount")
os.Exit(1)
}

npController := &networkpolicy.NetworkPolicyReconciler{
Client: mgr.GetClient(),
Log: logging.GetLogger("controllers").WithName("NetworkPolicy"),
Scheme: mgr.GetScheme(),
CloudSyncInterval: opts.config.CloudSyncInterval,
Inventory: cloudInventory,
}

if err = npController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NetworkPolicy")
os.Exit(1)
}

if err = (&apiserver.NepheControllerAPIServer{}).SetupWithManager(mgr,
npController.GetVirtualMachinePolicyIndexer(), cloudInventory, defaultCertDir, logging.GetLogger("apiServer")); err != nil {
setupLog.Error(err, "unable to create APIServer")
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/cloudprovideraccount/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

crdv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1"
"antrea.io/nephe/pkg/accountmanager"
"antrea.io/nephe/pkg/controllers/networkpolicy"
controllersync "antrea.io/nephe/pkg/controllers/sync"
"antrea.io/nephe/pkg/util"
"antrea.io/nephe/pkg/util/env"
Expand All @@ -55,6 +56,7 @@ type CloudProviderAccountReconciler struct {
initialized bool
watcher watch.Interface
clientset kubernetes.Interface
NpController networkpolicy.NetworkPolicyController
}

// nolint:lll
Expand Down Expand Up @@ -151,6 +153,12 @@ func (r *CloudProviderAccountReconciler) processCreateOrUpdate(namespacedName *t
}

func (r *CloudProviderAccountReconciler) processDelete(namespacedName *types.NamespacedName) error {
deletedCpa := &crdv1alpha1.CloudProviderAccount{
ObjectMeta: metav1.ObjectMeta{Name: namespacedName.Name, Namespace: namespacedName.Namespace},
}
r.Log.V(1).Info("Sending local event", "account", deletedCpa)
r.NpController.LocalEvent(watch.Event{Type: watch.Deleted, Object: deletedCpa})

if err := r.AccManager.RemoveAccount(namespacedName); err != nil {
return err
}
Expand Down
29 changes: 19 additions & 10 deletions pkg/controllers/cloudprovideraccount/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
fakewatch "k8s.io/client-go/kubernetes/fake"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,25 +41,27 @@ import (
antreanetworking "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
antreatypes "antrea.io/antrea/pkg/apis/crd/v1alpha2"
"antrea.io/nephe/apis/crd/v1alpha1"
cloud "antrea.io/nephe/apis/crd/v1alpha1"
crdv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1"
runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1"
ctrlsync "antrea.io/nephe/pkg/controllers/sync"
mockaccmanager "antrea.io/nephe/pkg/testing/accountmanager"
mocknpcontroller "antrea.io/nephe/pkg/testing/networkpolicy"
"antrea.io/nephe/pkg/util"
"antrea.io/nephe/pkg/util/env"
)

var (
mockCtrl *mock.Controller
mockAccManager *mockaccmanager.MockInterface
scheme = runtime.NewScheme()
mockCtrl *mock.Controller
mockAccManager *mockaccmanager.MockInterface
mockNpController *mocknpcontroller.MockNetworkPolicyController
scheme = runtime.NewScheme()
)

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
_ = clientgoscheme.AddToScheme(scheme)
_ = antreatypes.AddToScheme(scheme)
_ = cloud.AddToScheme(scheme)
_ = crdv1alpha1.AddToScheme(scheme)
_ = antreanetworking.AddToScheme(scheme)
})

Expand Down Expand Up @@ -88,12 +91,14 @@ var _ = Describe("CloudProviderAccount Controller", func() {
fakeClient = fake.NewClientBuilder().WithScheme(newScheme).Build()
mockCtrl = mock.NewController(GinkgoT())
mockAccManager = mockaccmanager.NewMockInterface(mockCtrl)
mockNpController = mocknpcontroller.NewMockNetworkPolicyController(mockCtrl)
reconciler = &CloudProviderAccountReconciler{
Log: logf.Log,
Client: fakeClient,
Scheme: scheme,
mutex: sync.Mutex{},
AccManager: mockAccManager,
Log: logf.Log,
Client: fakeClient,
Scheme: scheme,
mutex: sync.Mutex{},
AccManager: mockAccManager,
NpController: mockNpController,
}

pollIntv = 1
Expand Down Expand Up @@ -142,6 +147,10 @@ var _ = Describe("CloudProviderAccount Controller", func() {
Expect(err).ShouldNot(HaveOccurred())
mockAccManager.EXPECT().AddAccount(&testAccountNamespacedName, accountCloudType, account).Return(false, nil).Times(1)
mockAccManager.EXPECT().RemoveAccount(&testAccountNamespacedName).Return(nil).Times(1)
deletedCpa := &crdv1alpha1.CloudProviderAccount{
ObjectMeta: v1.ObjectMeta{Name: testAccountNamespacedName.Name, Namespace: testAccountNamespacedName.Namespace},
}
mockNpController.EXPECT().LocalEvent(watch.Event{Type: watch.Deleted, Object: deletedCpa}).Times(1)
err = reconciler.processCreateOrUpdate(&testAccountNamespacedName, account)
Expect(err).ShouldNot(HaveOccurred())

Expand Down
138 changes: 136 additions & 2 deletions pkg/controllers/networkpolicy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
antreav1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
antreav1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
antreanetworkingclient "antrea.io/antrea/pkg/client/clientset/versioned/typed/controlplane/v1beta2"
crdv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1"
"antrea.io/nephe/pkg/cloudprovider/cloudresource"
"antrea.io/nephe/pkg/cloudprovider/securitygroup"
"antrea.io/nephe/pkg/config"
Expand All @@ -45,6 +46,7 @@ import (
const (
NetworkPolicyStatusIndexerByNamespace = "namespace"
addrAppliedToIndexerByGroupID = "GroupID"
addrAppliedToIndexerByAccountId = "AccountId"
appliedToIndexerByAddrGroupRef = "AddressGrp"
networkPolicyIndexerByAddrGrp = "AddressGrp"
networkPolicyIndexerByAppliedToGrp = "AppliedToGrp"
Expand All @@ -56,7 +58,7 @@ const (
cloudResponseChBuffer = 50

// NetworkPolicy controller is ready to sync after it receives bookmarks from
// networkpolicy, addrssGroup and appliedToGroup.
// networkpolicy, addressGroup and appliedToGroup.
npSyncReadyBookMarkCnt = 3
)

Expand Down Expand Up @@ -466,7 +468,9 @@ func (r *NetworkPolicyReconciler) processLocalEvent(event watch.Event) error {
return r.processAppliedToGroup(event)
case *antreanetworking.AppliedToGroupPatch:
return r.processAppliedToGroup(event)

case *crdv1alpha1.CloudProviderAccount:
cpa := event.Object.(*crdv1alpha1.CloudProviderAccount)
return r.removeIndexerObjectsByAccount(types.NamespacedName{Name: cpa.Name, Namespace: cpa.Namespace}.String())
default:
r.Log.Error(nil, "Unknown local event", "Event", event)
}
Expand Down Expand Up @@ -622,6 +626,13 @@ func (r *NetworkPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
addrGrp := obj.(*addrSecurityGroup)
return []string{addrGrp.id.Name}, nil
},
addrAppliedToIndexerByAccountId: func(obj interface{}) ([]string, error) {
addrGrp := obj.(*addrSecurityGroup)
for _, member := range addrGrp.members {
return []string{member.AccountID}, nil
}
return []string{}, nil
},
})
r.appliedToSGIndexer = cache.NewIndexer(
// Each appliedToSecurityGroup is uniquely identified by its ID.
Expand All @@ -643,6 +654,13 @@ func (r *NetworkPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return addrGrps, nil
},
addrAppliedToIndexerByAccountId: func(obj interface{}) ([]string, error) {
appliedToGrp := obj.(*appliedToSecurityGroup)
for _, member := range appliedToGrp.members {
return []string{member.AccountID}, nil
}
return []string{}, nil
},
},
)
r.networkPolicyIndexer = cache.NewIndexer(
Expand Down Expand Up @@ -764,3 +782,119 @@ func (r *NetworkPolicyReconciler) resetWatchers() error {
func (r *NetworkPolicyReconciler) GetVirtualMachinePolicyIndexer() cache.Indexer {
return r.virtualMachinePolicyIndexer
}

// removeIndexerObjectsByAccount removes entries based on account, from all the np
// controller indexers
func (r *NetworkPolicyReconciler) removeIndexerObjectsByAccount(namespacedName string) error {
r.Log.Info("Clearing indexers for the account", "account", namespacedName)
atSgs, err := r.appliedToSGIndexer.ByIndex(addrAppliedToIndexerByAccountId, namespacedName)
if err != nil {
r.Log.Error(err, "failed to get appliedToGroups from indexer", "account", namespacedName)
}

if len(atSgs) > 0 {
for _, obj := range atSgs {
atSg, ok := obj.(*appliedToSecurityGroup)
if !ok {
continue
}

// Delete NPs based on AT.
nps, err := r.networkPolicyIndexer.ByIndex(networkPolicyIndexerByAppliedToGrp, atSg.id.Name)
if err != nil {
r.Log.Error(err, "failed to get np from indexer", "appliedToGroup", atSg.id.Name)
}
for _, obj := range nps {
np, ok := obj.(*networkPolicy)
if !ok {
continue
}

r.Log.V(1).Info("Deleting np from indexer", "np", np.Name)
if err := r.networkPolicyIndexer.Delete(np); err != nil {
r.Log.Error(err, "failed to delete np from indexer", "np", np.Name)
}
}

// Delete CloudRules based on AT.
rules, err := r.cloudRuleIndexer.ByIndex(cloudRuleIndexerByAppliedToGrp, atSg.id.CloudResourceID.String())
if err != nil {
r.Log.Error(err, "failed to get cloud rules from indexer", "appliedToGroup", atSg.id.CloudResourceID.String())
}
for _, obj := range rules {
rule, ok := obj.(*cloudresource.CloudRule)
if !ok {
continue
}

r.Log.V(1).Info("Deleting cloud rule from indexer", "rule", rule.NpNamespacedName)
if err := r.cloudRuleIndexer.Delete(rule); err != nil {
r.Log.Error(err, "failed to delete cloud rule from indexer",
"appliedToGroup", atSg.id.CloudResourceID.String(), "rule", rule.NpNamespacedName)
}
}

// Delete AT.
r.Log.V(1).Info("Deleting appliedToGroup from indexer", "appliedToGroup", atSg.id.Name)
if err := r.appliedToSGIndexer.Delete(atSg); err != nil {
r.Log.Error(err, "failed to delete appliedToGroup from indexer", "atSg", atSg.id.Name)
}
}
}

// Delete AG.
agSgs, err := r.addrSGIndexer.ByIndex(addrAppliedToIndexerByAccountId, namespacedName)
if err != nil {
r.Log.Error(err, "failed to get addressGroups from indexer", "account", namespacedName)
}
if len(agSgs) > 0 {
for _, obj := range agSgs {
agSg, ok := obj.(*addrSecurityGroup)
if !ok {
continue
}

r.Log.V(1).Info("Deleting addressGroup from indexer", "addressGroup", agSg.id.Name)
if err := r.addrSGIndexer.Delete(agSg); err != nil {
r.Log.Error(err, "failed to delete addressGroup from indexer", "addressGroup", agSg.id.Name)
}
}
}

// Remove AT and AG group from retryQueue.
for _, item := range r.retryQueue.items {
agSg, ok := item.PendingItem.(*addrSecurityGroup)
if ok {
uName := getGroupUniqueName(agSg.id.CloudResourceID.String(), true)
for _, member := range agSg.members {
if member.AccountID == namespacedName {
r.Log.V(1).Info("Removing addressGroup from retryQueue", "uName", uName)
r.retryQueue.Remove(uName)
break
}
}
}

atSg, ok := item.PendingItem.(*appliedToSecurityGroup)
if ok {
uName := getGroupUniqueName(atSg.id.CloudResourceID.String(), false)
for _, member := range atSg.members {
if member.AccountID == namespacedName {
r.Log.V(1).Info("Removing appliedToGroup from retryQueue", "uName", uName)
r.retryQueue.Remove(uName)
break
}
}
}
}

for _, item := range r.pendingDeleteGroups.items {
group, ok := item.PendingItem.(*pendingGroup)
if ok && group.account == namespacedName {
r.Log.V(1).Info("Removing group from pendingDeleteGroups", "groupName", group.id)
r.pendingDeleteGroups.Remove(group.id)
}
}

return nil
}
6 changes: 5 additions & 1 deletion pkg/controllers/networkpolicy/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,11 @@ func (s *securityGroupImpl) deleteImpl(c cloudSecurityGroup, membershipOnly bool
uName := getGroupUniqueName(s.id.CloudResourceID.String(), membershipOnly)
guName := getGroupUniqueName(s.id.Name, membershipOnly)
if !r.pendingDeleteGroups.Has(guName) {
r.pendingDeleteGroups.Add(guName, &pendingGroup{refCnt: new(int)})
var accountId string
if len(s.members) > 0 {
accountId = s.members[0].AccountID
}
r.pendingDeleteGroups.Add(guName, &pendingGroup{id: guName, refCnt: new(int), account: accountId})
}
var nps []interface{}
if s.state != securityGroupStateGarbageCollectState {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/networkpolicy/pendingitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ type pendingGroup struct {
event watch.EventType
addedMembers map[string]antreanetworking.GroupMember
removedMembers map[string]antreanetworking.GroupMember
// Used to clean up indexers when only when account is deleted.
account string
id string
}

func (p *pendingGroup) RunPendingItem(id string, context interface{}) bool {
Expand Down

0 comments on commit 5cb2c92

Please sign in to comment.