From 5c3225102839a6072f0968148bd159f06cad9ac2 Mon Sep 17 00:00:00 2001 From: Wantong Jiang Date: Tue, 19 Sep 2023 01:59:05 +0000 Subject: [PATCH] add annotation to control pls creation rg --- pkg/consts/consts.go | 3 + pkg/provider/azure.go | 2 +- pkg/provider/azure_backoff.go | 20 +++--- pkg/provider/azure_backoff_test.go | 71 +++++++++++++++++++ pkg/provider/azure_privatelinkservice.go | 21 ++++-- pkg/provider/azure_privatelinkservice_test.go | 29 ++++++++ pkg/provider/azure_wrap.go | 20 ++++-- pkg/provider/azure_wrap_test.go | 44 ++++++++++++ tests/e2e/network/private_link_service.go | 29 ++++++++ 9 files changed, 218 insertions(+), 21 deletions(-) diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 0f10b5f611..33ccbf35ae 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -489,6 +489,9 @@ const ( // ServiceAnnotationPLSCreation determines whether a PLS needs to be created. ServiceAnnotationPLSCreation = "service.beta.kubernetes.io/azure-pls-create" + // ServiceAnnotationPLSResourceGroup determines the resource group to create the PLS in. + ServiceAnnotationPLSResourceGroup = "service.beta.kubernetes.io/azure-pls-resource-group" + // ServiceAnnotationPLSName determines name of the PLS resource to create. 
ServiceAnnotationPLSName = "service.beta.kubernetes.io/azure-pls-name" diff --git a/pkg/provider/azure.go b/pkg/provider/azure.go index eee605f013..a918440db4 100644 --- a/pkg/provider/azure.go +++ b/pkg/provider/azure.go @@ -378,7 +378,7 @@ type Cloud struct { // key: [resourceGroupName] // Value: sync.Map of [pipName]*PublicIPAddress pipCache *azcache.TimedCache - // use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd + // use [resourceGroupName*LBFrontEndIpConfigurationID] as the key and search for PLS attached to the frontEnd plsCache *azcache.TimedCache // Add service lister to always get latest service diff --git a/pkg/provider/azure_backoff.go b/pkg/provider/azure_backoff.go index 68b2b58cb4..e5bf0db879 100644 --- a/pkg/provider/azure_backoff.go +++ b/pkg/provider/azure_backoff.go @@ -572,14 +572,14 @@ func (az *Cloud) CreateOrUpdateVMSS(resourceGroupName string, VMScaleSetName str return nil } -func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkService) error { +func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, resourceGroup string, pls network.PrivateLinkService) error { ctx, cancel := getContextWithCancel() defer cancel() - rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, az.PrivateLinkServiceResourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, "")) + rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, resourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, "")) if rerr == nil { // Invalidate the cache right after updating - _ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")) + _ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))) return nil } @@ -589,26 +589,26 @@ func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkS // Invalidate the cache because etag mismatch. 
if rerr.HTTPStatusCode == http.StatusPreconditionFailed { klog.V(3).Infof("Private link service cache for %s is cleanup because of http.StatusPreconditionFailed", pointer.StringDeref(pls.Name, "")) - _ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")) + _ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))) } // Invalidate the cache because another new operation has canceled the current request. if strings.Contains(strings.ToLower(rerr.Error().Error()), consts.OperationCanceledErrorMessage) { klog.V(3).Infof("Private link service for %s is cleanup because CreateOrUpdatePrivateLinkService is canceled by another operation", pointer.StringDeref(pls.Name, "")) - _ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")) + _ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))) } klog.Errorf("PrivateLinkServiceClient.CreateOrUpdate(%s) failed: %v", pointer.StringDeref(pls.Name, ""), rerr.Error()) return rerr.Error() } // DeletePLS invokes az.PrivateLinkServiceClient.Delete with exponential backoff retry -func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID string) *retry.Error { +func (az *Cloud) DeletePLS(service *v1.Service, resourceGroup, plsName, plsLBFrontendID string) *retry.Error { ctx, cancel := getContextWithCancel() defer cancel() - rerr := az.PrivateLinkServiceClient.Delete(ctx, az.PrivateLinkServiceResourceGroup, plsName) + rerr := az.PrivateLinkServiceClient.Delete(ctx, resourceGroup, plsName) if rerr == nil { // Invalidate the cache right after deleting - _ = az.plsCache.Delete(plsLBFrontendID) + _ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, plsLBFrontendID)) return nil } @@ -618,11 +618,11 @@ func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID } // DeletePEConn 
invokes az.PrivateLinkServiceClient.DeletePEConnection with exponential backoff retry -func (az *Cloud) DeletePEConn(service *v1.Service, plsName string, peConnName string) *retry.Error { +func (az *Cloud) DeletePEConn(service *v1.Service, resourceGroup, plsName, peConnName string) *retry.Error { ctx, cancel := getContextWithCancel() defer cancel() - rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, az.PrivateLinkServiceResourceGroup, plsName, peConnName) + rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, resourceGroup, plsName, peConnName) if rerr == nil { return nil } diff --git a/pkg/provider/azure_backoff_test.go b/pkg/provider/azure_backoff_test.go index 620d4b6166..f02b49b388 100644 --- a/pkg/provider/azure_backoff_test.go +++ b/pkg/provider/azure_backoff_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/interfaceclient/mockinterfaceclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/loadbalancerclient/mockloadbalancerclient" + "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routeclient/mockrouteclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routetableclient/mockroutetableclient" @@ -592,6 +593,76 @@ func TestCreateOrUpdateVMSS(t *testing.T) { } } +func TestCreateOrUpdatePLS(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + tests := []struct { + clientErr *retry.Error + expectedErr error + }{ + { + clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed}, + expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)), + }, + { + clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)}, + expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", 
fmt.Errorf("canceledandsupersededduetoanotheroperation")), + }, + } + + for _, test := range tests { + az := GetTestCloud(ctrl) + az.plsCache.Set("rg*frontendID", &network.PrivateLinkService{ID: pointer.String("stale")}) + + mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface) + mockPLSClient.EXPECT().CreateOrUpdate(gomock.Any(), "rg", gomock.Any(), gomock.Any(), gomock.Any()).Return(test.clientErr) + mockPLSClient.EXPECT().List(gomock.Any(), "rg").Return([]network.PrivateLinkService{}, nil) + + err := az.CreateOrUpdatePLS(&v1.Service{}, "rg", network.PrivateLinkService{ + Name: pointer.String("pls"), + Etag: pointer.String("etag"), + PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{ + LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{ + { + ID: pointer.String("frontendID"), + }, + }, + }, + }) + assert.EqualError(t, test.expectedErr, err.Error()) + + // the stale private link service entry seeded above should be removed from plsCache if the etag is mismatched or the operation is canceled, so this read must re-list and find nothing + pls, err := az.plsCache.GetWithDeepCopy("rg*frontendID", cache.CacheReadTypeDefault) + assert.NoError(t, err) + assert.Equal(t, consts.PrivateLinkServiceNotExistID, *pls.(*network.PrivateLinkService).ID) + } +} + +func TestDeletePLS(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + az := GetTestCloud(ctrl) + mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface) + mockPLSClient.EXPECT().Delete(gomock.Any(), "rg", "pls").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError}) + + err := az.DeletePLS(&v1.Service{}, "rg", "pls", "frontendID") + assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error())) +} + +func TestDeletePEConn(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + az := GetTestCloud(ctrl) + mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface) 
+ mockPLSClient.EXPECT().DeletePEConnection(gomock.Any(), "rg", "pls", "peConn").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError}) + + err := az.DeletePEConn(&v1.Service{}, "rg", "pls", "peConn") + assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error())) +} + func TestRequestBackoff(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/provider/azure_privatelinkservice.go b/pkg/provider/azure_privatelinkservice.go index 1739b9b1c4..80f28045b9 100644 --- a/pkg/provider/azure_privatelinkservice.go +++ b/pkg/provider/azure_privatelinkservice.go @@ -86,7 +86,7 @@ func (az *Cloud) reconcilePrivateLinkService( } // Secondly, check if there is a private link service already created - existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault) + existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault) if err != nil { klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err) return err @@ -153,14 +153,14 @@ func (az *Cloud) reconcilePrivateLinkService( return err } existingPLS.Etag = pointer.String("") - err = az.CreateOrUpdatePLS(service, existingPLS) + err = az.CreateOrUpdatePLS(service, az.getPLSResourceGroup(service), existingPLS) if err != nil { klog.Errorf("reconcilePrivateLinkService for service(%s) abort backoff: pls(%s) - updating: %s", serviceName, plsName, err.Error()) return err } } } else if !wantPLS { - existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault) + existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault) if err != nil { klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, 
pointer.StringDeref(fipConfigID, ""), err) return err @@ -181,6 +181,17 @@ func (az *Cloud) reconcilePrivateLinkService( return nil } +func (az *Cloud) getPLSResourceGroup(service *v1.Service) string { + if resourceGroup, found := service.Annotations[consts.ServiceAnnotationPLSResourceGroup]; found { + resourceGroupName := strings.TrimSpace(resourceGroup) + if len(resourceGroupName) > 0 { + return resourceGroupName + } + } + + return az.PrivateLinkServiceResourceGroup +} + func (az *Cloud) disablePLSNetworkPolicy(service *v1.Service) error { serviceName := getServiceName(service) subnetName := getPLSSubnetName(service) @@ -218,14 +229,14 @@ func (az *Cloud) safeDeletePLS(pls *network.PrivateLinkService, service *v1.Serv if peConns != nil { for _, peConn := range *peConns { klog.V(2).Infof("deletePLS: deleting PEConnection %s", pointer.StringDeref(peConn.Name, "")) - rerr := az.DeletePEConn(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, "")) + rerr := az.DeletePEConn(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, "")) if rerr != nil { return rerr } } } - rerr := az.DeletePLS(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")) + rerr := az.DeletePLS(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")) if rerr != nil { return rerr } diff --git a/pkg/provider/azure_privatelinkservice_test.go b/pkg/provider/azure_privatelinkservice_test.go index 3cfc1fc1f9..638a7323c2 100644 --- a/pkg/provider/azure_privatelinkservice_test.go +++ b/pkg/provider/azure_privatelinkservice_test.go @@ -357,6 +357,35 @@ func TestReconcilePrivateLinkService(t *testing.T) { } } +func TestGetPLSResourceGroup(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + testCases := []struct { + desc string + annotations 
map[string]string + expectedRG string + }{ + { + desc: "getPLSResourceGroup should return resource group from annotation", + annotations: map[string]string{ + consts.ServiceAnnotationPLSResourceGroup: "testRG", + }, + expectedRG: "testRG", + }, + { + desc: "getPLSResourceGroup should return resource group from azure config when annotation is not set", + expectedRG: "rg", + }, + } + for i, test := range testCases { + az := GetTestCloud(ctrl) + service := getTestServiceWithAnnotation("test", test.annotations, false, 80) + rg := az.getPLSResourceGroup(&service) + assert.Equal(t, test.expectedRG, rg, "TestCase[%d]: %s", i, test.desc) + } +} + func TestDisablePLSNetworkPolicy(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/provider/azure_wrap.go b/pkg/provider/azure_wrap.go index ea75719166..c8abee8722 100644 --- a/pkg/provider/azure_wrap.go +++ b/pkg/provider/azure_wrap.go @@ -193,8 +193,8 @@ func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.Secur return *(securityGroup.(*network.SecurityGroup)), nil } -func (az *Cloud) getPrivateLinkService(frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) { - cachedPLS, err := az.plsCache.GetWithDeepCopy(*frontendIPConfigID, crt) +func (az *Cloud) getPrivateLinkService(resourceGroup string, frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) { + cachedPLS, err := az.plsCache.GetWithDeepCopy(getPLSCacheKey(resourceGroup, *frontendIPConfigID), crt) if err != nil { return pls, err } @@ -341,12 +341,25 @@ func (az *Cloud) newPIPCache() (*azcache.TimedCache, error) { return azcache.NewTimedcache(time.Duration(az.PublicIPCacheTTLInSeconds)*time.Second, getter) } +// getPLSCacheKey builds the plsCache key by joining the resource group and the LB frontend IP configuration ID with "*". +func getPLSCacheKey(resourceGroup, plsLBFrontendID string) string { + return fmt.Sprintf("%s*%s", resourceGroup, plsLBFrontendID) +} + +// parsePLSCacheKey splits a key built by getPLSCacheKey at the first "*" into the resource group and the frontend ID. +// A key without "*" yields the whole key as the resource group and an empty frontend ID instead of panicking. +func parsePLSCacheKey(key string) (string, string) { + resourceGroup, frontendID, _ := 
strings.Cut(key, "*") + return resourceGroup, frontendID +} + func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) { - // for PLS cache, key is LBFrontendIPConfiguration ID + // for PLS cache, key is [resourceGroupName*LBFrontendIPConfigurationID], see getPLSCacheKey getter := func(key string) (interface{}, error) { ctx, cancel := getContextWithCancel() defer cancel() - plsList, err := az.PrivateLinkServiceClient.List(ctx, az.PrivateLinkServiceResourceGroup) + resourceGroup, frontendID := parsePLSCacheKey(key) + plsList, err := az.PrivateLinkServiceClient.List(ctx, resourceGroup) exists, rerr := checkResourceExistsFromError(err) if rerr != nil { return nil, rerr.Error() } @@ -363,7 +376,7 @@ func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) { continue } for _, fipConfig := range *fipConfigs { - if strings.EqualFold(*fipConfig.ID, key) { + if strings.EqualFold(*fipConfig.ID, frontendID) { return &pls, nil } } @@ -371,7 +384,7 @@ func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) { } } - klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q", key) + klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q in rg %q", frontendID, resourceGroup) plsNotExistID := consts.PrivateLinkServiceNotExistID return &network.PrivateLinkService{ID: &plsNotExistID}, nil } diff --git a/pkg/provider/azure_wrap_test.go b/pkg/provider/azure_wrap_test.go index 4caee51957..654a7bd23d 100644 --- a/pkg/provider/azure_wrap_test.go +++ b/pkg/provider/azure_wrap_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/pointer" + "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient" azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache" "sigs.k8s.io/cloud-provider-azure/pkg/consts" @@ -406,3 +407,46 @@ func TestListPIP(t *testing.T) { }) } } + +func TestGetPrivateLinkService(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + az := GetTestCloud(ctrl) + 
az.plsCache.Set("rg*frontendID", &network.PrivateLinkService{Name: pointer.String("pls")}) + + // cache hit + pls, err := az.getPrivateLinkService("rg", pointer.String("frontendID"), azcache.CacheReadTypeDefault) + assert.NoError(t, err) + assert.Equal(t, "pls", *pls.Name) + + // cache miss + mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface) + mockPLSClient.EXPECT().List(gomock.Any(), "rg1").Return([]network.PrivateLinkService{ + { + Name: pointer.String("pls1"), + PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{ + LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{ + { + ID: pointer.String("frontendID1"), + }, + }, + }, + }, + }, nil) + pls, err = az.getPrivateLinkService("rg1", pointer.String("frontendID1"), azcache.CacheReadTypeDefault) + assert.NoError(t, err) + assert.Equal(t, "pls1", *pls.Name) +} + +func TestGetPLSCacheKey(t *testing.T) { + rg, frontendID := "rg", "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig" + assert.Equal(t, "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", getPLSCacheKey(rg, frontendID)) +} + +func TestParsePLSCacheKey(t *testing.T) { + key := "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig" + rg, frontendID := parsePLSCacheKey(key) + assert.Equal(t, "rg", rg) + assert.Equal(t, "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", frontendID) +} diff --git a/tests/e2e/network/private_link_service.go b/tests/e2e/network/private_link_service.go index 11db51390b..73917fbc77 100644 --- a/tests/e2e/network/private_link_service.go +++ b/tests/e2e/network/private_link_service.go @@ -149,6 +149,35 @@ var _ = Describe("Private link service", Label(utils.TestSuiteLabelPrivateLinkSe 
Expect(*pls.Name).To(Equal(plsName)) }) + It("should support service annotation 'service.beta.kubernetes.io/azure-pls-resource-group'", func() { + By("creating a test resource group") + rg, cleanup := utils.CreateTestResourceGroup(tc) + defer cleanup(pointer.StringDeref(rg.Name, "")) + + By("creating a test pls specifying the test resource group") + plsName := "testpls" + annotation := map[string]string{ + consts.ServiceAnnotationLoadBalancerInternal: "true", + consts.ServiceAnnotationPLSCreation: "true", + consts.ServiceAnnotationPLSName: plsName, + consts.ServiceAnnotationPLSResourceGroup: pointer.StringDeref(rg.Name, ""), + } + + ips := createAndExposeDefaultServiceWithAnnotation(cs, tc.IPFamily, serviceName, ns.Name, labels, annotation, ports) + defer func() { + utils.Logf("cleaning up test service %s", serviceName) + err := utils.DeleteService(cs, ns.Name, serviceName) + Expect(err).NotTo(HaveOccurred()) + }() + Expect(len(ips)).NotTo(BeZero()) + ip := ips[0] + utils.Logf("Get Internal IP: %s", ip) + + // get pls from azure client + pls := getPrivateLinkServiceFromIP(tc, ip, pointer.StringDeref(rg.Name, ""), "", plsName) + Expect(*pls.Name).To(Equal(plsName)) + }) + It("should support service annotation 'service.beta.kubernetes.io/azure-pls-ip-configuration-subnet'", func() { subnetName := "pls-subnet" subnet, isNew := createNewSubnet(tc, subnetName)