Skip to content

Commit

Permalink
Merge pull request #4678 from jwtty/cp_plsrg_1.27
Browse files Browse the repository at this point in the history
[release-1.27] feat: add annotation to control pls creation rg
  • Loading branch information
k8s-ci-robot authored Sep 21, 2023
2 parents c4dac73 + 5c32251 commit a962e3d
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 21 deletions.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ const (
// ServiceAnnotationPLSCreation determines whether a PLS needs to be created.
ServiceAnnotationPLSCreation = "service.beta.kubernetes.io/azure-pls-create"

// ServiceAnnotationPLSResourceGroup determines the resource group to create the PLS in.
ServiceAnnotationPLSResourceGroup = "service.beta.kubernetes.io/azure-pls-resource-group"

// ServiceAnnotationPLSName determines name of the PLS resource to create.
ServiceAnnotationPLSName = "service.beta.kubernetes.io/azure-pls-name"

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ type Cloud struct {
// key: [resourceGroupName]
// Value: sync.Map of [pipName]*PublicIPAddress
pipCache *azcache.TimedCache
// use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd
// use [resourceGroupName*LBFrontEndIpConfigurationID] as the key and search for PLS attached to the frontEnd
plsCache *azcache.TimedCache

// Add service lister to always get latest service
Expand Down
20 changes: 10 additions & 10 deletions pkg/provider/azure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,14 @@ func (az *Cloud) CreateOrUpdateVMSS(resourceGroupName string, VMScaleSetName str
return nil
}

func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkService) error {
func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, resourceGroup string, pls network.PrivateLinkService) error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, az.PrivateLinkServiceResourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, resourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
if rerr == nil {
// Invalidate the cache right after updating
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
return nil
}

Expand All @@ -589,26 +589,26 @@ func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkS
// Invalidate the cache because etag mismatch.
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
klog.V(3).Infof("Private link service cache for %s is cleanup because of http.StatusPreconditionFailed", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
// Invalidate the cache because another new operation has canceled the current request.
if strings.Contains(strings.ToLower(rerr.Error().Error()), consts.OperationCanceledErrorMessage) {
klog.V(3).Infof("Private link service for %s is cleanup because CreateOrUpdatePrivateLinkService is canceled by another operation", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
klog.Errorf("PrivateLinkServiceClient.CreateOrUpdate(%s) failed: %v", pointer.StringDeref(pls.Name, ""), rerr.Error())
return rerr.Error()
}

// DeletePLS invokes az.PrivateLinkServiceClient.Delete with exponential backoff retry
func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID string) *retry.Error {
func (az *Cloud) DeletePLS(service *v1.Service, resourceGroup, plsName, plsLBFrontendID string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.Delete(ctx, az.PrivateLinkServiceResourceGroup, plsName)
rerr := az.PrivateLinkServiceClient.Delete(ctx, resourceGroup, plsName)
if rerr == nil {
// Invalidate the cache right after deleting
_ = az.plsCache.Delete(plsLBFrontendID)
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, plsLBFrontendID))
return nil
}

Expand All @@ -618,11 +618,11 @@ func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID
}

// DeletePEConn invokes az.PrivateLinkServiceClient.DeletePEConnection with exponential backoff retry
func (az *Cloud) DeletePEConn(service *v1.Service, plsName string, peConnName string) *retry.Error {
func (az *Cloud) DeletePEConn(service *v1.Service, resourceGroup, plsName, peConnName string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, az.PrivateLinkServiceResourceGroup, plsName, peConnName)
rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, resourceGroup, plsName, peConnName)
if rerr == nil {
return nil
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/provider/azure_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/interfaceclient/mockinterfaceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/loadbalancerclient/mockloadbalancerclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routeclient/mockrouteclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routetableclient/mockroutetableclient"
Expand Down Expand Up @@ -592,6 +593,76 @@ func TestCreateOrUpdateVMSS(t *testing.T) {
}
}

func TestCreateOrUpdatePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
clientErr *retry.Error
expectedErr error
}{
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)),
},
{
clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", fmt.Errorf("canceledandsupersededduetoanotheroperation")),
},
}

for _, test := range tests {
az := GetTestCloud(ctrl)
az.pipCache.Set("rg*frontendID", "test")

mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().CreateOrUpdate(gomock.Any(), "rg", gomock.Any(), gomock.Any(), gomock.Any()).Return(test.clientErr)
mockPLSClient.EXPECT().List(gomock.Any(), az.ResourceGroup).Return([]network.PrivateLinkService{}, nil)

err := az.CreateOrUpdatePLS(&v1.Service{}, "rg", network.PrivateLinkService{
Name: pointer.String("pls"),
Etag: pointer.String("etag"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID"),
},
},
},
})
assert.EqualError(t, test.expectedErr, err.Error())

// loadbalancer should be removed from cache if the etag is mismatch or the operation is canceled
pls, err := az.plsCache.GetWithDeepCopy("rg*frontendID", cache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, consts.PrivateLinkServiceNotExistID, *pls.(*network.PrivateLinkService).ID)
}
}

func TestDeletePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().Delete(gomock.Any(), "rg", "pls").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePLS(&v1.Service{}, "rg", "pls", "frontendID")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestDeletePEConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().DeletePEConnection(gomock.Any(), "rg", "pls", "peConn").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePEConn(&v1.Service{}, "rg", "pls", "peConn")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestRequestBackoff(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
21 changes: 16 additions & 5 deletions pkg/provider/azure_privatelinkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (az *Cloud) reconcilePrivateLinkService(
}

// Secondly, check if there is a private link service already created
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand Down Expand Up @@ -153,14 +153,14 @@ func (az *Cloud) reconcilePrivateLinkService(
return err
}
existingPLS.Etag = pointer.String("")
err = az.CreateOrUpdatePLS(service, existingPLS)
err = az.CreateOrUpdatePLS(service, az.getPLSResourceGroup(service), existingPLS)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s) abort backoff: pls(%s) - updating: %s", serviceName, plsName, err.Error())
return err
}
}
} else if !wantPLS {
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand All @@ -181,6 +181,17 @@ func (az *Cloud) reconcilePrivateLinkService(
return nil
}

func (az *Cloud) getPLSResourceGroup(service *v1.Service) string {
if resourceGroup, found := service.Annotations[consts.ServiceAnnotationPLSResourceGroup]; found {
resourceGroupName := strings.TrimSpace(resourceGroup)
if len(resourceGroupName) > 0 {
return resourceGroupName
}
}

return az.PrivateLinkServiceResourceGroup
}

func (az *Cloud) disablePLSNetworkPolicy(service *v1.Service) error {
serviceName := getServiceName(service)
subnetName := getPLSSubnetName(service)
Expand Down Expand Up @@ -218,14 +229,14 @@ func (az *Cloud) safeDeletePLS(pls *network.PrivateLinkService, service *v1.Serv
if peConns != nil {
for _, peConn := range *peConns {
klog.V(2).Infof("deletePLS: deleting PEConnection %s", pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
if rerr != nil {
return rerr
}
}
}

rerr := az.DeletePLS(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
rerr := az.DeletePLS(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
if rerr != nil {
return rerr
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/provider/azure_privatelinkservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,35 @@ func TestReconcilePrivateLinkService(t *testing.T) {
}
}

func TestGetPLSResourceGroup(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testCases := []struct {
desc string
annotations map[string]string
expectedRG string
}{
{
desc: "getPLSResourceGroup should return resource group from annotation",
annotations: map[string]string{
consts.ServiceAnnotationPLSResourceGroup: "testRG",
},
expectedRG: "testRG",
},
{
desc: "getPLSResourceGroup should return resource group from azure config when annotation is not set",
expectedRG: "rg",
},
}
for i, test := range testCases {
az := GetTestCloud(ctrl)
service := getTestServiceWithAnnotation("test", test.annotations, false, 80)
rg := az.getPLSResourceGroup(&service)
assert.Equal(t, test.expectedRG, rg, "TestCase[%d]: %s", i, test.desc)
}
}

func TestDisablePLSNetworkPolicy(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
20 changes: 15 additions & 5 deletions pkg/provider/azure_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.Secur
return *(securityGroup.(*network.SecurityGroup)), nil
}

func (az *Cloud) getPrivateLinkService(frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(*frontendIPConfigID, crt)
func (az *Cloud) getPrivateLinkService(resourceGroup string, frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(getPLSCacheKey(resourceGroup, *frontendIPConfigID), crt)
if err != nil {
return pls, err
}
Expand Down Expand Up @@ -341,12 +341,22 @@ func (az *Cloud) newPIPCache() (*azcache.TimedCache, error) {
return azcache.NewTimedcache(time.Duration(az.PublicIPCacheTTLInSeconds)*time.Second, getter)
}

func getPLSCacheKey(resourceGroup, plsLBFrontendID string) string {
return fmt.Sprintf("%s*%s", resourceGroup, plsLBFrontendID)
}

func parsePLSCacheKey(key string) (string, string) {
splits := strings.Split(key, "*")
return splits[0], splits[1]
}

func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) {
// for PLS cache, key is LBFrontendIPConfiguration ID
getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
plsList, err := az.PrivateLinkServiceClient.List(ctx, az.PrivateLinkServiceResourceGroup)
resourceGroup, frontendID := parsePLSCacheKey(key)
plsList, err := az.PrivateLinkServiceClient.List(ctx, resourceGroup)
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
Expand All @@ -363,15 +373,15 @@ func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) {
continue
}
for _, fipConfig := range *fipConfigs {
if strings.EqualFold(*fipConfig.ID, key) {
if strings.EqualFold(*fipConfig.ID, frontendID) {
return &pls, nil
}
}

}
}

klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q", key)
klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q in rg %q", frontendID, resourceGroup)
plsNotExistID := consts.PrivateLinkServiceNotExistID
return &network.PrivateLinkService{ID: &plsNotExistID}, nil
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/provider/azure_wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/consts"
Expand Down Expand Up @@ -406,3 +407,46 @@ func TestListPIP(t *testing.T) {
})
}
}

func TestGetPrivateLinkService(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
az.plsCache.Set("rg*frontendID", &network.PrivateLinkService{Name: pointer.String("pls")})

// cache hit
pls, err := az.getPrivateLinkService("rg", pointer.String("frontendID"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls", *pls.Name)

// cache miss
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().List(gomock.Any(), "rg1").Return([]network.PrivateLinkService{
{
Name: pointer.String("pls1"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID1"),
},
},
},
},
}, nil)
pls, err = az.getPrivateLinkService("rg1", pointer.String("frontendID1"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls1", *pls.Name)
}

func TestGetPLSCacheKey(t *testing.T) {
rg, frontendID := "rg", "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
assert.Equal(t, "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", getPLSCacheKey(rg, frontendID))
}

func TestParsePLSCacheKey(t *testing.T) {
key := "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
rg, frontendID := parsePLSCacheKey(key)
assert.Equal(t, "rg", rg)
assert.Equal(t, "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", frontendID)
}
Loading

0 comments on commit a962e3d

Please sign in to comment.