Skip to content

Commit

Permalink
Implement the reconciler for SubnetConnectionBindingMap
Browse files Browse the repository at this point in the history
1. Implement the reconciler for SubnetConnectionBindingMap, it may update the
SubnetConnectionBindingMap status with condition ready is false if its dependent
Subnet or SubnetSet is not ready (or realized) or it hits errors when realizing
NSX SubnetConnectionBindingMaps. It updates the status with ready condition as
true if it is successfully realized on NSX. The reconciler also watches the
Subnet/SubnetSet CR events to sync the connection binding maps.
2. The change also modifies the Subnet/SubnetSet reconciler to watch
SubnetConnectionBindingMap CR events. If a Subnet/SubnetSet is used by a
SubnetConnectionBindingMap, a finalizer is added on the corresponding
Subnet/SubnetSet CR, and the finalizer is removed automatically if the CR is not
used by any SubnetConnectionBindingMaps.
  • Loading branch information
wenyingd committed Nov 28, 2024
1 parent a3a6318 commit 3773b1f
Show file tree
Hide file tree
Showing 34 changed files with 4,880 additions and 104 deletions.
14 changes: 12 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/service"
staticroutecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/staticroute"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnet"
subnetbindingcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetbinding"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetset"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/ipblocksinfo"
nodeservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/node"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
subnetservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
subnetbindingservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetbinding"
subnetportservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"

commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
Expand Down Expand Up @@ -232,6 +234,13 @@ func startServiceController(mgr manager.Manager, nsxClient *nsx.Client) {
os.Exit(1)
}
ipblocksInfoService := ipblocksinfo.InitializeIPBlocksInfoService(commonService)

subnetBindingService, err := subnetbindingservice.InitializeService(commonService)
if err != nil {
log.Error(err, "Failed to initialize SubnetConnectionBindingMap commonService")
os.Exit(1)
}

// Start controllers which only supports VPC
StartNetworkInfoController(mgr, vpcService, ipblocksInfoService)
StartNamespaceController(mgr, cf, vpcService)
Expand All @@ -250,10 +259,10 @@ func startServiceController(mgr manager.Manager, nsxClient *nsx.Client) {
}
}
// Start Subnet/SubnetSet controller.
if err := subnet.StartSubnetController(mgr, subnetService, subnetPortService, vpcService, hookServer); err != nil {
if err := subnet.StartSubnetController(mgr, subnetService, subnetPortService, vpcService, subnetBindingService, hookServer); err != nil {
os.Exit(1)
}
if err := subnetset.StartSubnetSetController(mgr, subnetService, subnetPortService, vpcService, hookServer); err != nil {
if err := subnetset.StartSubnetSetController(mgr, subnetService, subnetPortService, vpcService, subnetBindingService, hookServer); err != nil {
os.Exit(1)
}

Expand All @@ -264,6 +273,7 @@ func startServiceController(mgr manager.Manager, nsxClient *nsx.Client) {
StartIPAddressAllocationController(mgr, ipAddressAllocationService, vpcService)
networkpolicycontroller.StartNetworkPolicyController(mgr, commonService, vpcService)
service.StartServiceLbController(mgr, commonService)
subnetbindingcontroller.StartSubnetBindingController(mgr, subnetService, subnetBindingService)
}
// Start controllers which can run in non-VPC mode
securitypolicycontroller.StartSecurityPolicyController(mgr, commonService, vpcService)
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/vpc/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
GatewayConnectionReady ConditionType = "GatewayConnectionReady"
AutoSnatEnabled ConditionType = "AutoSnatEnabled"
ExternalIPBlocksConfigured ConditionType = "ExternalIPBlocksConfigured"
DeleteFailure ConditionType = "DeletionFailed"
)

// Condition defines condition of custom resource.
Expand Down
7 changes: 7 additions & 0 deletions pkg/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
sr "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetbinding"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
Expand Down Expand Up @@ -168,9 +169,15 @@ func InitializeCleanupService(cf *config.NSXOperatorConfig, nsxClient *nsx.Clien
return ipaddressallocation.InitializeIPAddressAllocation(service, vpcService, true)
}
}
wrapInitializeSubnetBinding := func(service common.Service) cleanupFunc {
return func() (cleanup, error) {
return subnetbinding.InitializeService(service)
}
}
// TODO: initialize other CR services
cleanupService = cleanupService.
AddCleanupService(wrapInitializeSubnetPort(commonService)).
AddCleanupService(wrapInitializeSubnetBinding(commonService)).
AddCleanupService(wrapInitializeSubnetService(commonService)).
AddCleanupService(wrapInitializeSecurityPolicy(commonService)).
AddCleanupService(wrapInitializeStaticRoute(commonService)).
Expand Down
11 changes: 9 additions & 2 deletions pkg/clean/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy"
sr "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetbinding"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
)
Expand Down Expand Up @@ -180,11 +181,14 @@ func TestInitializeCleanupService_Success(t *testing.T) {
patches.ApplyFunc(ipaddressallocation.InitializeIPAddressAllocation, func(service common.Service, vpcService common.VPCServiceProvider, flag bool) (*ipaddressallocation.IPAddressAllocationService, error) {
return &ipaddressallocation.IPAddressAllocationService{}, nil
})
patches.ApplyFunc(subnetbinding.InitializeService, func(service common.Service) (*subnetbinding.BindingService, error) {
return &subnetbinding.BindingService{}, nil
})

cleanupService, err := InitializeCleanupService(cf, nsxClient)
assert.NoError(t, err)
assert.NotNil(t, cleanupService)
assert.Len(t, cleanupService.cleans, 6)
assert.Len(t, cleanupService.cleans, 7)
}

func TestInitializeCleanupService_VPCError(t *testing.T) {
Expand Down Expand Up @@ -214,10 +218,13 @@ func TestInitializeCleanupService_VPCError(t *testing.T) {
patches.ApplyFunc(ipaddressallocation.InitializeIPAddressAllocation, func(service common.Service, vpcService common.VPCServiceProvider, flag bool) (*ipaddressallocation.IPAddressAllocationService, error) {
return &ipaddressallocation.IPAddressAllocationService{}, nil
})
patches.ApplyFunc(subnetbinding.InitializeService, func(service common.Service) (*subnetbinding.BindingService, error) {
return &subnetbinding.BindingService{}, nil
})

cleanupService, err := InitializeCleanupService(cf, nsxClient)
assert.NoError(t, err)
assert.NotNil(t, cleanupService)
assert.Len(t, cleanupService.cleans, 4)
assert.Len(t, cleanupService.cleans, 5)
assert.Equal(t, expectedError, cleanupService.err)
}
91 changes: 91 additions & 0 deletions pkg/controllers/common/dependency_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package common

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/vmware-tanzu/nsx-operator/pkg/apis/vpc/v1alpha1"
)

type RequeueObjectByEvent func(ctx context.Context, c client.Client, obj client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request])
type RequeueObjectByUpdate func(ctx context.Context, c client.Client, objOld client.Object, objNew client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request])

type EnqueueRequestForDependency struct {
Client client.Client
ResourceType string
RequeueByCreate RequeueObjectByEvent
RequeueByDelete RequeueObjectByEvent
RequeueByUpdate RequeueObjectByUpdate
}

func (e *EnqueueRequestForDependency) Create(ctx context.Context, ev event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
obj := ev.Object
log.V(1).Info(fmt.Sprintf("%s create event", e.ResourceType), "Namespace", obj.GetNamespace(), "Name", obj.GetName())
if e.RequeueByCreate != nil {
e.RequeueByCreate(ctx, e.Client, obj, q)
}
}

func (e *EnqueueRequestForDependency) Delete(ctx context.Context, ev event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
obj := ev.Object
log.V(1).Info(fmt.Sprintf("%s delete event", e.ResourceType), "Namespace", obj.GetNamespace(), "Name", obj.GetName())
if e.RequeueByDelete != nil {
e.RequeueByDelete(ctx, e.Client, obj, q)
}
}

func (e *EnqueueRequestForDependency) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
log.V(1).Info(fmt.Sprintf("%s generic event, do nothing", e.ResourceType))
}

func (e *EnqueueRequestForDependency) Update(ctx context.Context, ev event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
objNew := ev.ObjectNew
log.V(1).Info(fmt.Sprintf("%s update event", e.ResourceType), "Namespace", objNew.GetNamespace(), "Name", objNew.GetName())
if e.RequeueByUpdate != nil {
objOld := ev.ObjectOld
e.RequeueByUpdate(ctx, e.Client, objOld, objNew, q)
}
}

func IsObjectUpdateToReady(oldConditions []v1alpha1.Condition, newConditions []v1alpha1.Condition) bool {
return !IsObjectReady(oldConditions) && IsObjectReady(newConditions)
}

func IsObjectUpdateToUnready(oldConditions []v1alpha1.Condition, newConditions []v1alpha1.Condition) bool {
return IsObjectReady(oldConditions) && !IsObjectReady(newConditions)
}

func IsObjectReady(conditions []v1alpha1.Condition) bool {
for _, con := range conditions {
if con.Type == v1alpha1.Ready && con.Status == corev1.ConditionTrue {
return true
}
}
return false
}

var PredicateFuncsWithSubnetBindings = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldBindingMap := e.ObjectOld.(*v1alpha1.SubnetConnectionBindingMap)
newBindingMap := e.ObjectNew.(*v1alpha1.SubnetConnectionBindingMap)
if oldBindingMap.Spec.TargetSubnetSetName != newBindingMap.Spec.TargetSubnetSetName ||
oldBindingMap.Spec.TargetSubnetName != newBindingMap.Spec.TargetSubnetName {
return true
}
return false
},
DeleteFunc: func(e event.DeleteEvent) bool { return true },
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}
Loading

0 comments on commit 3773b1f

Please sign in to comment.