diff --git a/apis/runtime/v1alpha1/vpc_types.go b/apis/runtime/v1alpha1/vpc_types.go new file mode 100644 index 00000000..cf7d28fa --- /dev/null +++ b/apis/runtime/v1alpha1/vpc_types.go @@ -0,0 +1,42 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type VpcInfo struct { + Name string + Id string + Tags map[string]string + Cidrs []string +} + +// +kubebuilder:object:root=true +type Vpc struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Info VpcInfo `json:"spec,omitempty"` +} + +// +kubebuilder:object:root=true +// VpcList is a list of Vpc objects. +type VpcList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Vpc `json:"items"` +} diff --git a/apis/runtime/v1alpha1/zz_generated.deepcopy.go b/apis/runtime/v1alpha1/zz_generated.deepcopy.go index 81c6d5d5..4970a76d 100644 --- a/apis/runtime/v1alpha1/zz_generated.deepcopy.go +++ b/apis/runtime/v1alpha1/zz_generated.deepcopy.go @@ -125,3 +125,88 @@ func (in *VirtualMachinePolicyStatus) DeepCopy() *VirtualMachinePolicyStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Vpc) DeepCopyInto(out *Vpc) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Info.DeepCopyInto(&out.Info) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Vpc. +func (in *Vpc) DeepCopy() *Vpc { + if in == nil { + return nil + } + out := new(Vpc) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Vpc) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VpcInfo) DeepCopyInto(out *VpcInfo) { + *out = *in + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Cidrs != nil { + in, out := &in.Cidrs, &out.Cidrs + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VpcInfo. +func (in *VpcInfo) DeepCopy() *VpcInfo { + if in == nil { + return nil + } + out := new(VpcInfo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VpcList) DeepCopyInto(out *VpcList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Vpc, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VpcList. +func (in *VpcList) DeepCopy() *VpcList { + if in == nil { + return nil + } + out := new(VpcList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *VpcList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/cloud-provider/cloudapi/aws/aws_cloudinterface_impl.go b/pkg/cloud-provider/cloudapi/aws/aws_cloudinterface_impl.go index 61d7dc7b..14943504 100644 --- a/pkg/cloud-provider/cloudapi/aws/aws_cloudinterface_impl.go +++ b/pkg/cloud-provider/cloudapi/aws/aws_cloudinterface_impl.go @@ -20,6 +20,7 @@ import ( "antrea.io/nephe/apis/crd/v1alpha1" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" "antrea.io/nephe/pkg/cloud-provider/cloudapi/internal" "antrea.io/nephe/pkg/logging" @@ -103,3 +104,18 @@ func (c *awsCloud) RemoveAccountResourcesSelector(accNamespacedName *types.Names func (c *awsCloud) GetAccountStatus(accNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) { return c.cloudCommon.GetStatus(accNamespacedName) } + +// AddInventoryPoller adds poller for polling cloud inventory. +func (c *awsCloud) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error { + return c.cloudCommon.AddInventoryPoller(accountNamespacedName) +} + +// DeleteInventoryPoller deletes an existing poller created for polling cloud inventory. +func (c *awsCloud) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error { + return c.cloudCommon.DeleteInventoryPoller(accountNamespacedName) +} + +// GetVpcInventory pulls cloud vpc inventory from internal snapshot. +func (c *awsCloud) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) { + return c.cloudCommon.GetVpcInventory(accountNamespacedName) +} diff --git a/pkg/cloud-provider/cloudapi/aws/aws_ec2.go b/pkg/cloud-provider/cloudapi/aws/aws_ec2.go index 32c87f71..a82b7ca9 100644 --- a/pkg/cloud-provider/cloudapi/aws/aws_ec2.go +++ b/pkg/cloud-provider/cloudapi/aws/aws_ec2.go @@ -25,6 +25,7 @@ import ( "github.com/mohae/deepcopy" "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" "antrea.io/nephe/pkg/cloud-provider/cloudapi/internal" ) @@ -40,11 +41,13 @@ type ec2ServiceConfig struct { // - key with nil value indicates no filters. Get all instances for account. // - key with "some-filter-string" value indicates some filter. Get instances matching those filters only. instanceFilters map[string][][]*ec2.Filter + pollVpc bool } // ec2ResourcesCacheSnapshot holds the results from querying for all instances. type ec2ResourcesCacheSnapshot struct { instances map[cloudcommon.InstanceID]*ec2.Instance + vpcs []*ec2.Vpc vpcIDs map[string]struct{} vpcNameToID map[string]string vpcPeers map[string][]string @@ -63,6 +66,7 @@ func newEC2ServiceConfig(name string, service awsServiceClientCreateInterface) ( resourcesCache: &internal.CloudServiceResourcesCache{}, inventoryStats: &internal.CloudServiceStats{}, instanceFilters: make(map[string][][]*ec2.Filter), + pollVpc: true, } return config, nil } @@ -165,6 +169,22 @@ func (ec2Cfg *ec2ServiceConfig) getCachedVpcNameToID() map[string]string { return vpcNameToIDCopy } +// GetCachedVpcs returns VPCs from cached snapshot for the account. +func (ec2Cfg *ec2ServiceConfig) GetCachedVpcs() []*ec2.Vpc { + snapshot := ec2Cfg.resourcesCache.GetSnapshot() + if snapshot == nil { + awsPluginLogger().V(4).Info("cache snapshot nil", "service", awsComputeServiceNameEC2, "account", ec2Cfg.accountName) + return []*ec2.Vpc{} + } + vpcs := snapshot.(*ec2ResourcesCacheSnapshot).vpcs + vpcsToReturn := make([]*ec2.Vpc, 0, len(vpcs)) + vpcsToReturn = append(vpcsToReturn, vpcs...) + + awsPluginLogger().V(1).Info("cached vpcs", "service", awsComputeServiceNameEC2, "account", ec2Cfg.accountName, + "vpcs", len(vpcsToReturn)) + return vpcsToReturn +} + // getVpcPeers returns all the peers of a vpc. func (ec2Cfg *ec2ServiceConfig) getVpcPeers(vpcID string) []string { snapshot := ec2Cfg.resourcesCache.GetSnapshot() @@ -210,8 +230,10 @@ func (ec2Cfg *ec2ServiceConfig) getInstances() ([]*ec2.Instance, error) { return instances, nil } -// doInstancesInventoryWorker gets inventory from cloud for given cloud account. +// DoResourceInventory gets inventory from cloud for given cloud account. func (ec2Cfg *ec2ServiceConfig) DoResourceInventory() error { + vpcs, _ := ec2Cfg.getVpcs() + instances, e := ec2Cfg.getInstances() if e != nil { awsPluginLogger().V(0).Info("error fetching ec2 instances", "account", ec2Cfg.accountName, "error", e) @@ -219,21 +241,21 @@ func (ec2Cfg *ec2ServiceConfig) DoResourceInventory() error { exists := struct{}{} vpcIDs := make(map[string]struct{}) instanceIDs := make(map[cloudcommon.InstanceID]*ec2.Instance) - vpcNameToID, _ := ec2Cfg.buildMapVpcNameToID() + vpcNameToID := ec2Cfg.buildMapVpcNameToID(vpcs) vpcPeers, _ := ec2Cfg.buildMapVpcPeers() for _, instance := range instances { id := cloudcommon.InstanceID(strings.ToLower(aws.StringValue(instance.InstanceId))) instanceIDs[id] = instance vpcIDs[strings.ToLower(*instance.VpcId)] = exists } - ec2Cfg.resourcesCache.UpdateSnapshot(&ec2ResourcesCacheSnapshot{instanceIDs, vpcIDs, vpcNameToID, vpcPeers}) + ec2Cfg.resourcesCache.UpdateSnapshot(&ec2ResourcesCacheSnapshot{instanceIDs, vpcs, vpcIDs, vpcNameToID, vpcPeers}) } ec2Cfg.inventoryStats.UpdateInventoryPollStats(e) return e } -// setInstanceFilters add/updates instances resource filter for the service. +// SetResourceFilters add/updates instances resource filter for the service. func (ec2Cfg *ec2ServiceConfig) SetResourceFilters(selector *v1alpha1.CloudEntitySelector) { if filters, found := convertSelectorToEC2InstanceFilters(selector); found { ec2Cfg.instanceFilters[selector.GetName()] = filters @@ -295,14 +317,9 @@ func (ec2Cfg *ec2ServiceConfig) UpdateServiceConfig(newConfig internal.CloudServ ec2Cfg.apiClient = newEc2ServiceConfig.apiClient } -func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID() (map[string]string, error) { +func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID(vpcs []*ec2.Vpc) map[string]string { vpcNameToID := make(map[string]string) - result, err := ec2Cfg.apiClient.describeVpcsWrapper(nil) - if err != nil { - awsPluginLogger().V(0).Info("error describing vpcs", "error", err) - return vpcNameToID, err - } - for _, vpc := range result.Vpcs { + for _, vpc := range vpcs { if len(vpc.Tags) == 0 { awsPluginLogger().V(4).Info("vpc name not found", "account", ec2Cfg.accountName, "vpc", vpc) continue @@ -316,7 +333,7 @@ func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID() (map[string]string, error) } vpcNameToID[vpcName] = *vpc.VpcId } - return vpcNameToID, nil + return vpcNameToID } func (ec2Cfg *ec2ServiceConfig) buildMapVpcPeers() (map[string][]string, error) { @@ -333,3 +350,56 @@ func (ec2Cfg *ec2ServiceConfig) buildMapVpcPeers() (map[string][]string, error) } return vpcPeers, nil } + +func (ec2Cfg *ec2ServiceConfig) NeedPollWithoutFilter() bool { + return ec2Cfg.pollVpc +} + +func (ec2Cfg *ec2ServiceConfig) getVpcs() ([]*ec2.Vpc, error) { + result, err := ec2Cfg.apiClient.describeVpcsWrapper(nil) + if err != nil { + awsPluginLogger().V(0).Info("error describing vpcs", "account", ec2Cfg.accountName, "error", err) + return nil, err + } + return result.Vpcs, nil +} + +func (ec2Cfg *ec2ServiceConfig) GetVpcInventory() map[string]*runtimev1alpha1.Vpc { + vpcs := ec2Cfg.GetCachedVpcs() + + // Extract namespace from account namespaced name. + tokens := strings.Split(ec2Cfg.accountName, "/") + if len(tokens) != 2 { + awsPluginLogger().V(0).Error(fmt.Errorf("cannot extract namespace from account namespaced name"), + "nil", "account", ec2Cfg.accountName) + return nil + } + namespace := tokens[0] + // Convert to kubernetes object and return a map indexed using VPC ID. + vpcMap := map[string]*runtimev1alpha1.Vpc{} + for _, vpc := range vpcs { + vpcObj := new(runtimev1alpha1.Vpc) + vpcObj.Name = *vpc.VpcId + vpcObj.Namespace = namespace + vpcObj.Info.Id = *vpc.VpcId + if len(vpc.Tags) != 0 { + vpcObj.Info.Tags = make(map[string]string) + for _, tag := range vpc.Tags { + vpcObj.Info.Tags[*(tag.Key)] = *(tag.Value) + } + if value, found := vpcObj.Info.Tags["Name"]; found { + vpcObj.Info.Name = value + } + } + if len(vpc.CidrBlockAssociationSet) != 0 { + vpcObj.Info.Cidrs = make([]string, 0) + for _, cidr := range vpc.CidrBlockAssociationSet { + vpcObj.Info.Cidrs = append(vpcObj.Info.Cidrs, *cidr.CidrBlock) + } + } + + vpcMap[*vpc.VpcId] = vpcObj + } + + return vpcMap +} diff --git a/pkg/cloud-provider/cloudapi/aws/aws_test.go b/pkg/cloud-provider/cloudapi/aws/aws_test.go index da687982..90b2bf79 100644 --- a/pkg/cloud-provider/cloudapi/aws/aws_test.go +++ b/pkg/cloud-provider/cloudapi/aws/aws_test.go @@ -17,6 +17,7 @@ package aws import ( "context" "errors" + "fmt" "reflect" "sort" "time" @@ -118,7 +119,117 @@ var _ = Describe("AWS cloud", func() { mockawsCloudHelper.EXPECT().newServiceSdkConfigProvider(gomock.Any()).Return(mockawsService, nil) mockawsService.EXPECT().compute().Return(mockawsEC2, nil).AnyTimes() }) + It("On account add expect cloud api call for retrieving vpc list", func() { + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + instanceIds := []string{} + vpcIDs := []string{"testVpcID01", "testVpcID02"} + mockawsEC2.EXPECT().pagedDescribeInstancesWrapper(gomock.Any()).Return(getEc2InstanceObject(instanceIds), nil).AnyTimes() + mockawsEC2.EXPECT().pagedDescribeNetworkInterfaces(gomock.Any()).Return([]*ec2.NetworkInterface{}, nil).Times(0) + mockawsEC2.EXPECT().describeVpcsWrapper(gomock.Any()).Return(createVpcObject(vpcIDs), nil).AnyTimes() + mockawsEC2.EXPECT().describeVpcPeeringConnectionsWrapper(gomock.Any()).Return(&ec2.DescribeVpcPeeringConnectionsOutput{}, + nil).AnyTimes() + + _ = fakeClient.Create(context.Background(), secret) + c := newAWSCloud(mockawsCloudHelper) + + err := c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName) + Expect(found).To(BeTrue()) + Expect(accCfg).To(Not(BeNil())) + + errPolAdd := c.AddInventoryPoller(&testAccountNamespacedName) + Expect(errPolAdd).Should(BeNil()) + + err = checkVpcPollResult(c, testAccountNamespacedName, vpcIDs) + Expect(err).Should(BeNil()) + }) + It("Fetch vpc list from snapshot", func() { + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + instanceIds := []string{} + vpcIDs := []string{"testVpcID01", "testVpcID02"} + mockawsEC2.EXPECT().pagedDescribeInstancesWrapper(gomock.Any()).Return(getEc2InstanceObject(instanceIds), nil).AnyTimes() + mockawsEC2.EXPECT().pagedDescribeNetworkInterfaces(gomock.Any()).Return([]*ec2.NetworkInterface{}, nil).Times(0) + mockawsEC2.EXPECT().describeVpcsWrapper(gomock.Any()).Return(createVpcObject(vpcIDs), nil).AnyTimes() + mockawsEC2.EXPECT().describeVpcPeeringConnectionsWrapper(gomock.Any()).Return(&ec2.DescribeVpcPeeringConnectionsOutput{}, + nil).AnyTimes() + + _ = fakeClient.Create(context.Background(), secret) + c := newAWSCloud(mockawsCloudHelper) + err := c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName) + Expect(found).To(BeTrue()) + Expect(accCfg).To(Not(BeNil())) + + errPolAdd := c.AddInventoryPoller(&testAccountNamespacedName) + Expect(errPolAdd).Should(BeNil()) + + vpcMap, err := c.GetVpcInventory(&testAccountNamespacedName) + Expect(err).Should(BeNil()) + Expect(len(vpcMap)).Should(Equal(len(vpcIDs))) + }) + It("Stop cloud inventory poll on poller delete", func() { + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + instanceIds := []string{} + vpcIDs := []string{"testVpcID01", "testVpcID02"} + mockawsEC2.EXPECT().pagedDescribeInstancesWrapper(gomock.Any()).Return(getEc2InstanceObject(instanceIds), nil).AnyTimes() + mockawsEC2.EXPECT().pagedDescribeNetworkInterfaces(gomock.Any()).Return([]*ec2.NetworkInterface{}, nil).Times(0) + mockawsEC2.EXPECT().describeVpcsWrapper(gomock.Any()).Return(createVpcObject(vpcIDs), nil).AnyTimes() + mockawsEC2.EXPECT().describeVpcPeeringConnectionsWrapper(gomock.Any()).Return(&ec2.DescribeVpcPeeringConnectionsOutput{}, + nil).AnyTimes() + + _ = fakeClient.Create(context.Background(), secret) + c := newAWSCloud(mockawsCloudHelper) + + err := c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName) + Expect(found).To(BeTrue()) + Expect(accCfg).To(Not(BeNil())) + + errPolAdd := c.AddInventoryPoller(&testAccountNamespacedName) + Expect(errPolAdd).Should(BeNil()) + + errPolDel := c.DeleteInventoryPoller(&testAccountNamespacedName) + Expect(errPolDel).Should(BeNil()) + + mockawsEC2.EXPECT().pagedDescribeInstancesWrapper(gomock.Any()).Return(getEc2InstanceObject(instanceIds), nil).Times(0) + mockawsEC2.EXPECT().pagedDescribeNetworkInterfaces(gomock.Any()).Return([]*ec2.NetworkInterface{}, nil).Times(0) + mockawsEC2.EXPECT().describeVpcsWrapper(gomock.Any()).Return(&ec2.DescribeVpcsOutput{}, nil).Times(0) + mockawsEC2.EXPECT().describeVpcPeeringConnectionsWrapper(gomock.Any()).Return(&ec2.DescribeVpcPeeringConnectionsOutput{}, nil).Times(0) + }) It("Should discover few instances with get ALL selector using credentials", func() { instanceIds := []string{"i-01", "i-02"} credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` @@ -208,20 +319,6 @@ var _ = Describe("AWS cloud", func() { err = checkAccountAddSuccessCondition(c, testAccountNamespacedName, instanceIds) Expect(err).Should(BeNil()) }) - It("Should not call cloud api's with NO selector", func() { - instanceIds := []string{} - mockawsEC2.EXPECT().pagedDescribeInstancesWrapper(gomock.Any()).Return(getEc2InstanceObject(instanceIds), nil).Times(0) - mockawsEC2.EXPECT().pagedDescribeNetworkInterfaces(gomock.Any()).Return([]*ec2.NetworkInterface{}, nil).Times(0) - mockawsEC2.EXPECT().describeVpcsWrapper(gomock.Any()).Return(&ec2.DescribeVpcsOutput{}, nil).Times(0) - mockawsEC2.EXPECT().describeVpcPeeringConnectionsWrapper(gomock.Any()).Return(&ec2.DescribeVpcPeeringConnectionsOutput{}, nil).Times(0) - _ = fakeClient.Create(context.Background(), secret) - c := newAWSCloud(mockawsCloudHelper) - err := c.AddProviderAccount(fakeClient, account) - Expect(err).Should(BeNil()) - accCfg, found := c.cloudCommon.GetCloudAccountByName(&testAccountNamespacedName) - Expect(found).To(BeTrue()) - Expect(accCfg).To(Not(BeNil())) - }) }) }) @@ -587,6 +684,32 @@ func getEc2InstanceObject(instanceIDs []string) []*ec2.Instance { return ec2Instances } +func createVpcObject(vpcIDs []string) *ec2.DescribeVpcsOutput { + vpcsOutput := new(ec2.DescribeVpcsOutput) + + for i := range vpcIDs { + key := "Name" + value := vpcIDs[i] + tags := make([]*ec2.Tag, 0) + tag := &ec2.Tag{Key: &key, Value: &value} + tags = append(tags, tag) + cidrBlock := new(ec2.VpcCidrBlockAssociation) + cidr := "192.1.0.0/24" + cidrBlock.CidrBlock = &cidr + cidrBlockAssociationSet := make([]*ec2.VpcCidrBlockAssociation, 0) + cidrBlockAssociationSet = append(cidrBlockAssociationSet, cidrBlock) + + vpc := &ec2.Vpc{ + VpcId: &vpcIDs[i], + CidrBlockAssociationSet: cidrBlockAssociationSet, + Tags: tags, + } + + vpcsOutput.Vpcs = append(vpcsOutput.Vpcs, vpc) + } + return vpcsOutput +} + func checkAccountAddSuccessCondition(c *awsCloud, namespacedName types.NamespacedName, ids []string) error { conditionFunc := func() (done bool, e error) { accCfg, found := c.cloudCommon.GetCloudAccountByName(&namespacedName) @@ -612,3 +735,30 @@ func checkAccountAddSuccessCondition(c *awsCloud, namespacedName types.Namespace return wait.PollImmediate(1*time.Second, 5*time.Second, conditionFunc) } + +func checkVpcPollResult(c *awsCloud, namespacedName types.NamespacedName, ids []string) error { + conditionFunc := func() (done bool, e error) { + accCfg, found := c.cloudCommon.GetCloudAccountByName(&namespacedName) + if !found { + return true, errors.New("failed to find account") + } + + serviceConfig, _ := accCfg.GetServiceConfigByName(awsComputeServiceNameEC2) + vpcs := serviceConfig.(*ec2ServiceConfig).GetCachedVpcs() + vpcIDs := make([]string, 0, len(vpcs)) + for _, vpc := range vpcs { + _, _ = GinkgoWriter.Write([]byte(fmt.Sprintf("vpc id %s", *vpc.VpcId))) + vpcIDs = append(vpcIDs, *vpc.VpcId) + } + + sort.Strings(vpcIDs) + sort.Strings(ids) + equal := reflect.DeepEqual(vpcIDs, ids) + if equal { + return true, nil + } + return false, nil + } + + return wait.PollImmediate(1*time.Second, 5*time.Second, conditionFunc) +} diff --git a/pkg/cloud-provider/cloudapi/azure/azure_cloudinterface_impl.go b/pkg/cloud-provider/cloudapi/azure/azure_cloudinterface_impl.go index 013e5f6f..f6f896bb 100644 --- a/pkg/cloud-provider/cloudapi/azure/azure_cloudinterface_impl.go +++ b/pkg/cloud-provider/cloudapi/azure/azure_cloudinterface_impl.go @@ -20,6 +20,7 @@ import ( "antrea.io/nephe/apis/crd/v1alpha1" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" "antrea.io/nephe/pkg/cloud-provider/cloudapi/internal" "antrea.io/nephe/pkg/logging" @@ -103,3 +104,18 @@ func (c *azureCloud) RemoveAccountResourcesSelector(accNamespacedName *types.Nam func (c *azureCloud) GetAccountStatus(accNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) { return c.cloudCommon.GetStatus(accNamespacedName) } + +// AddInventoryPoller adds poller for polling cloud inventory. +func (c *azureCloud) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error { + return c.cloudCommon.AddInventoryPoller(accountNamespacedName) +} + +// DeleteInventoryPoller deletes an existing poller created for polling cloud inventory. +func (c *azureCloud) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error { + return c.cloudCommon.DeleteInventoryPoller(accountNamespacedName) +} + +// GetVpcInventory pulls cloud vpc inventory from internal snapshot. +func (c *azureCloud) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) { + return c.cloudCommon.GetVpcInventory(accountNamespacedName) +} diff --git a/pkg/cloud-provider/cloudapi/azure/azure_compute.go b/pkg/cloud-provider/cloudapi/azure/azure_compute.go index b88512cd..8e259dec 100644 --- a/pkg/cloud-provider/cloudapi/azure/azure_compute.go +++ b/pkg/cloud-provider/cloudapi/azure/azure_compute.go @@ -20,12 +20,15 @@ import ( "strings" "time" + "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-03-01/network" "github.com/cenkalti/backoff/v4" "github.com/mohae/deepcopy" "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" "antrea.io/nephe/pkg/cloud-provider/cloudapi/internal" + "antrea.io/nephe/pkg/cloud-provider/utils" ) type computeServiceConfig struct { @@ -39,10 +42,12 @@ type computeServiceConfig struct { inventoryStats *internal.CloudServiceStats credentials *azureAccountConfig computeFilters map[string][]*string + pollVpc bool } type computeResourcesCacheSnapshot struct { virtualMachines map[cloudcommon.InstanceID]*virtualMachineTable + vnets []network.VirtualNetwork vnetIDs map[string]struct{} vnetPeers map[string][][]string } @@ -87,6 +92,7 @@ func newComputeServiceConfig(name string, service azureServiceClientCreateInterf inventoryStats: &internal.CloudServiceStats{}, credentials: credentials, computeFilters: make(map[string][]*string), + pollVpc: true, } return config, nil } @@ -202,18 +208,20 @@ func (computeCfg *computeServiceConfig) getComputeResourceFilters() ([]*string, } func (computeCfg *computeServiceConfig) DoResourceInventory() error { + vnets, _ := computeCfg.getVpcs() + virtualMachines, err := computeCfg.getVirtualMachines() if err == nil { exists := struct{}{} vnetIDs := make(map[string]struct{}) - vpcPeers, _ := computeCfg.buildMapVpcPeers() + vpcPeers := computeCfg.buildMapVpcPeers(vnets) vmIDToInfoMap := make(map[cloudcommon.InstanceID]*virtualMachineTable) for _, vm := range virtualMachines { id := cloudcommon.InstanceID(strings.ToLower(*vm.ID)) vmIDToInfoMap[id] = vm vnetIDs[*vm.VnetID] = exists } - computeCfg.resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{vmIDToInfoMap, vnetIDs, vpcPeers}) + computeCfg.resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{vmIDToInfoMap, vnets, vnetIDs, vpcPeers}) } return err @@ -288,13 +296,19 @@ func (computeCfg *computeServiceConfig) UpdateServiceConfig(newConfig internal.C computeCfg.credentials = newComputeServiceConfig.credentials } -func (computeCfg *computeServiceConfig) buildMapVpcPeers() (map[string][][]string, error) { - vpcPeers := make(map[string][][]string) +func (computeCfg *computeServiceConfig) getVpcs() ([]network.VirtualNetwork, error) { results, err := computeCfg.vnetAPIClient.listAllComplete(context.Background()) if err != nil { - azurePluginLogger().V(0).Info("error getting peering connections", "error", err) + azurePluginLogger().V(0).Info("error getting list of vnets", + "account", computeCfg.accountName, "error", err) return nil, err } + return results, nil +} + +func (computeCfg *computeServiceConfig) buildMapVpcPeers(results []network.VirtualNetwork) map[string][][]string { + vpcPeers := make(map[string][][]string) + for _, result := range results { if len(*result.VirtualNetworkPropertiesFormat.VirtualNetworkPeerings) > 0 { for _, peerConn := range *result.VirtualNetworkPropertiesFormat.VirtualNetworkPeerings { @@ -308,5 +322,51 @@ func (computeCfg *computeServiceConfig) buildMapVpcPeers() (map[string][][]strin } } } - return vpcPeers, nil + return vpcPeers +} + +func (computeCfg *computeServiceConfig) NeedPollWithoutFilter() bool { + return computeCfg.pollVpc +} + +func (computeCfg *computeServiceConfig) GetVpcInventory() map[string]*runtimev1alpha1.Vpc { + snapshot := computeCfg.resourcesCache.GetSnapshot() + if snapshot == nil { + azurePluginLogger().V(4).Info("compute service cache snapshot nil", "type", providerType, "account", computeCfg.accountName) + return nil + } + + // Extract namespace from account namespaced name. + tokens := strings.Split(computeCfg.accountName, "/") + if len(tokens) != 2 { + azurePluginLogger().V(0).Error(fmt.Errorf("cannot extract namespace from account namespaced name"), + "nil", "account", computeCfg.accountName) + return nil + } + namespace := tokens[0] + // Convert to kubernetes object and return a map indexed using VnetID. + vpcMap := map[string]*runtimev1alpha1.Vpc{} + for _, vpc := range snapshot.(*computeResourcesCacheSnapshot).vnets { + if *vpc.Location == computeCfg.credentials.region { + vpcObj := new(runtimev1alpha1.Vpc) + vpcObj.Name = utils.GenerateShortResourceIdentifier(*vpc.ID, *vpc.Name) + vpcObj.Namespace = namespace + vpcObj.Info.Id = *vpc.ID + vpcObj.Info.Name = *vpc.Name + if len(vpc.Tags) != 0 { + vpcObj.Info.Tags = make(map[string]string) + for k, v := range vpc.Tags { + vpcObj.Info.Tags[k] = *v + } + } + if vpc.AddressSpace != nil && vpc.AddressSpace.AddressPrefixes != nil { + vpcObj.Info.Cidrs = make([]string, 0) + vpcObj.Info.Cidrs = append(vpcObj.Info.Cidrs, *vpc.AddressSpace.AddressPrefixes...) + } + + vpcMap[*vpc.ID] = vpcObj + } + } + + return vpcMap } diff --git a/pkg/cloud-provider/cloudapi/azure/azure_security_test.go b/pkg/cloud-provider/cloudapi/azure/azure_security_test.go index eaebb2fe..a0e1925d 100644 --- a/pkg/cloud-provider/cloudapi/azure/azure_security_test.go +++ b/pkg/cloud-provider/cloudapi/azure/azure_security_test.go @@ -233,8 +233,7 @@ var _ = Describe("Azure Cloud Security", func() { vnetIDs[strings.ToLower(testVnetID01)] = struct{}{} vnetIDs[strings.ToLower(testVnetID02)] = struct{}{} vnetIDs[strings.ToLower(testVnetPeerID01)] = struct{}{} - vpcPeers, err := serviceConfig.(*computeServiceConfig).buildMapVpcPeers() - Expect(err).Should(BeNil()) + vpcPeers := serviceConfig.(*computeServiceConfig).buildMapVpcPeers(nil) vpcPeers[testVnetPeerID01] = [][]string{ {strings.ToLower(testVnetPeerID01), "destinationID", "sourceID"}, } @@ -251,7 +250,13 @@ var _ = Describe("Azure Cloud Security", func() { }, } - serviceConfig.(*computeServiceConfig).resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{vmIDToInfoMap, vnetIDs, vpcPeers}) + vnetList := []network.VirtualNetwork{} + vnet := new(network.VirtualNetwork) + vnet.Name = &testVnet01 + vnet.ID = &testVnetID01 + vnetList = append(vnetList, *vnet) + serviceConfig.(*computeServiceConfig).resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{ + vmIDToInfoMap, vnetList, vnetIDs, vpcPeers}) }) AfterEach(func() { @@ -506,9 +511,10 @@ var _ = Describe("Azure Cloud Security", func() { }, VnetID: &testVnetID03, } + accCfg, _ := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName) serviceConfig, _ := accCfg.GetServiceConfigByName(azureComputeServiceNameCompute) - serviceConfig.(*computeServiceConfig).resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{vmToUpdateMap, nil, nil}) + serviceConfig.(*computeServiceConfig).resourcesCache.UpdateSnapshot(&computeResourcesCacheSnapshot{vmToUpdateMap, nil, nil, nil}) serviceConfig.(*computeServiceConfig).GetResourceCRDs(testAccountNamespacedName.Namespace, testAccountNamespacedName.String()) }) diff --git a/pkg/cloud-provider/cloudapi/azure/azure_test.go b/pkg/cloud-provider/cloudapi/azure/azure_test.go index c1c63b04..3e42526e 100644 --- a/pkg/cloud-provider/cloudapi/azure/azure_test.go +++ b/pkg/cloud-provider/cloudapi/azure/azure_test.go @@ -17,7 +17,7 @@ package azure import ( "context" "fmt" - + "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-03-01/network" "github.com/Azure/azure-sdk-for-go/services/resourcegraph/mgmt/2021-03-01/resourcegraph" "github.com/Azure/go-autorest/autorest" "github.com/golang/mock/gomock" @@ -33,6 +33,10 @@ import ( "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" ) +var ( + region = "eastus" +) + var _ = Describe("Azure", func() { var ( testAccountNamespacedName = &types.NamespacedName{Namespace: "namespace01", Name: "account01"} @@ -41,7 +45,7 @@ var _ = Describe("Azure", func() { testClientID = "ClientID" testClientKey = "ClientKey" testTenantID = "TenantID" - testRegion = "eastus" + testRegion = region testRG = "testRG" azureProvideType = "Azure" @@ -145,7 +149,6 @@ var _ = Describe("Azure", func() { mockazureService.EXPECT().applicationSecurityGroups(gomock.Any()).Return(mockazureAsgWrapper, nil).AnyTimes() mockazureService.EXPECT().virtualNetworks(gomock.Any()).Return(mockazureVirtualNetworksWrapper, nil).AnyTimes() mockazureService.EXPECT().resourceGraph().Return(mockazureResourceGraph, nil).AnyTimes() - mockazureVirtualNetworksWrapper.EXPECT().listAllComplete(gomock.Any()).AnyTimes() mockazureResourceGraph.EXPECT().resources(gomock.Any(), gomock.Any()).Return(getResourceGraphResult(), nil).AnyTimes() fakeClient, c = setupClientAndCloud(mockAzureServiceHelper, account, secret) @@ -156,7 +159,72 @@ var _ = Describe("Azure", func() { mockCtrl.Finish() }) + Context("Account Add and Delete scenarios", func() { + It("On account add expect cloud api call for retrieving vpc list", func() { + vnetIDs := []string{"testVnetID01", "testVnetID02"} + mockazureVirtualNetworksWrapper.EXPECT().listAllComplete(gomock.Any()).Return(createVnetObject(vnetIDs), nil).AnyTimes() + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + + _ = fakeClient.Create(context.Background(), secret) + c := newAzureCloud(mockAzureServiceHelper) + + err := c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + accCfg, found := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName) + Expect(found).To(BeTrue()) + Expect(accCfg).To(Not(BeNil())) + + errPolAdd := c.AddInventoryPoller(testAccountNamespacedName) + Expect(errPolAdd).Should(BeNil()) + + vnetMap, err := c.GetVpcInventory(testAccountNamespacedName) + Expect(err).Should(BeNil()) + Expect(len(vnetMap)).Should(Equal(len(vnetIDs))) + }) + It("Stop cloud inventory poll on poller delete", func() { + vnetIDs := []string{"testVnetID01", "testVnetID02"} + mockazureVirtualNetworksWrapper.EXPECT().listAllComplete(gomock.Any()).Return(createVnetObject(vnetIDs), nil).MinTimes(1) + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + + _ = fakeClient.Create(context.Background(), secret) + c := newAzureCloud(mockAzureServiceHelper) + + err := c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + accCfg, found := c.cloudCommon.GetCloudAccountByName(testAccountNamespacedName) + Expect(found).To(BeTrue()) + Expect(accCfg).To(Not(BeNil())) + + errPolAdd := c.AddInventoryPoller(testAccountNamespacedName) + Expect(errPolAdd).Should(BeNil()) + errPolDel := c.DeleteInventoryPoller(testAccountNamespacedName) + Expect(errPolDel).Should(BeNil()) + mockazureVirtualNetworksWrapper.EXPECT().listAllComplete(gomock.Any()).Return(createVnetObject(vnetIDs), nil).MinTimes(0) + }) + }) Context("VM Selector scenarios", func() { + BeforeEach(func() { + mockazureVirtualNetworksWrapper.EXPECT().listAllComplete(gomock.Any()).AnyTimes() + }) + It("Should match expected filter - single vpcID only match", func() { vnetIDs = []string{testVnetID01} var expectedQueryStrs []*string @@ -190,191 +258,191 @@ var _ = Describe("Azure", func() { _ = c.GetEnforcedSecurity() }) - }) - It("Should match expected filter with credential - multiple vpcID only match", func() { - vnetIDs = []string{testVnetID01, testVnetID02} - var expectedQueryStrs []*string - expectedQueryStr, _ := getVMsByVnetIDsMatchQuery(vnetIDs, - subIDs, tenantIDs, locations) - expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, - VMMatch: []v1alpha1.EntityMatch{}, - }, - { - VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID02}, - VMMatch: []v1alpha1.EntityMatch{}, - }, - } + It("Should match expected filter with credential - multiple vpcID only match", func() { + vnetIDs = []string{testVnetID01, testVnetID02} + var expectedQueryStrs []*string + expectedQueryStr, _ := getVMsByVnetIDsMatchQuery(vnetIDs, + subIDs, tenantIDs, locations) + expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, + VMMatch: []v1alpha1.EntityMatch{}, + }, + { + VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID02}, + VMMatch: []v1alpha1.EntityMatch{}, + }, + } - selector.Spec.VMSelector = vmSelector - selector.Name = "multiple-vpcIDOnly" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + selector.Spec.VMSelector = vmSelector + selector.Name = "multiple-vpcIDOnly" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - filters := getFilters(c, selector.Name) - Expect(filters).To(Equal(expectedQueryStrs)) + filters := getFilters(c, selector.Name) + Expect(filters).To(Equal(expectedQueryStrs)) - c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) - expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] - filters = getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) + expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] + filters = getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) - It("Should match expected filter - multiple with one all", func() { - var expectedQueryStrs []*string - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, - }, - { - VpcMatch: nil, - VMMatch: []v1alpha1.EntityMatch{}, - }, - } - selector.Spec.VMSelector = vmSelector - selector.Name = "multiple-with-one-all" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + It("Should match expected filter - multiple with one all", func() { + var expectedQueryStrs []*string + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, + }, + { + VpcMatch: nil, + VMMatch: []v1alpha1.EntityMatch{}, + }, + } + selector.Spec.VMSelector = vmSelector + selector.Name = "multiple-with-one-all" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - filters := getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + filters := getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) - It("Should match expected filter - single VMID only match", func() { - vmIDs = []string{testVMID01} - var expectedQueryStrs []*string - expectedQueryStr, _ := getVMsByVMIDsMatchQuery(vmIDs, - subIDs, tenantIDs, locations) - expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VMMatch: []v1alpha1.EntityMatch{{MatchID: testVMID01}}, - }, - } + It("Should match expected filter - single VMID only match", func() { + vmIDs = []string{testVMID01} + var expectedQueryStrs []*string + expectedQueryStr, _ := getVMsByVMIDsMatchQuery(vmIDs, + subIDs, tenantIDs, locations) + expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VMMatch: []v1alpha1.EntityMatch{{MatchID: testVMID01}}, + }, + } - selector.Spec.VMSelector = vmSelector - selector.Name = "VMIDOnly" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + selector.Spec.VMSelector = vmSelector + selector.Name = "VMIDOnly" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - filters := getFilters(c, selector.Name) - Expect(filters).To(Equal(expectedQueryStrs)) + filters := getFilters(c, selector.Name) + Expect(filters).To(Equal(expectedQueryStrs)) - c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) - expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] - filters = getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) + expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] + filters = getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) - It("Should match expected filter - single VM Name only match", func() { - vmNames := []string{testVM01} - var expectedQueryStrs []*string - expectedQueryStr, _ := getVMsByVMNamesMatchQuery(vmNames, - subIDs, tenantIDs, locations) - expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) + It("Should match expected filter - single VM Name only match", func() { + vmNames := []string{testVM01} + var expectedQueryStrs []*string + expectedQueryStr, _ := getVMsByVMNamesMatchQuery(vmNames, + subIDs, tenantIDs, locations) + expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VMMatch: []v1alpha1.EntityMatch{{MatchName: testVM01}}, - }, - } + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VMMatch: []v1alpha1.EntityMatch{{MatchName: testVM01}}, + }, + } - selector.Spec.VMSelector = vmSelector - selector.Name = "VMNameOnly" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + selector.Spec.VMSelector = vmSelector + selector.Name = "VMNameOnly" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - filters := getFilters(c, selector.Name) - Expect(filters).To(Equal(expectedQueryStrs)) + filters := getFilters(c, selector.Name) + Expect(filters).To(Equal(expectedQueryStrs)) - c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) - expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] - filters = getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) + expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] + filters = getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) - It("Should match expected filter - vpcID with VMID match", func() { - vnetIDs = []string{testVnetID01} - vmIDs = []string{testVMID01} - vmNames := []string{} - var expectedQueryStrs []*string - expectedQueryStr, _ := getVMsByVnetAndOtherMatchesQuery(vnetIDs, vmNames, vmIDs, - subIDs, tenantIDs, locations) - expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VMMatch: []v1alpha1.EntityMatch{{MatchID: testVMID01}}, - VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, - }, - } + It("Should match expected filter - vpcID with VMID match", func() { + vnetIDs = []string{testVnetID01} + vmIDs = []string{testVMID01} + vmNames := []string{} + var expectedQueryStrs []*string + expectedQueryStr, _ := getVMsByVnetAndOtherMatchesQuery(vnetIDs, vmNames, vmIDs, + subIDs, tenantIDs, locations) + expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VMMatch: []v1alpha1.EntityMatch{{MatchID: testVMID01}}, + VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, + }, + } - selector.Spec.VMSelector = vmSelector - selector.Name = "vpcID-VMID" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + selector.Spec.VMSelector = vmSelector + selector.Name = "vpcID-VMID" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - filters := getFilters(c, selector.Name) - Expect(filters).To(Equal(expectedQueryStrs)) + filters := getFilters(c, selector.Name) + Expect(filters).To(Equal(expectedQueryStrs)) - c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) - expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] - filters = getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) + expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] + filters = getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) - It("Should match expected filter - vpcID with VM Name match", func() { - vnetIDs = []string{testVnetID01} - vmIDs = []string{} - vmNames := []string{testVM01} - var expectedQueryStrs []*string - - expectedQueryStr, _ := getVMsByVnetAndOtherMatchesQuery(vnetIDs, vmNames, vmIDs, - subIDs, tenantIDs, locations) - expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) - vmSelector := []v1alpha1.VirtualMachineSelector{ - { - VMMatch: []v1alpha1.EntityMatch{{MatchName: testVM01}}, - VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, - }, - } + It("Should match expected filter - vpcID with VM Name match", func() { + vnetIDs = []string{testVnetID01} + vmIDs = []string{} + vmNames := []string{testVM01} + var expectedQueryStrs []*string - selector.Spec.VMSelector = vmSelector - selector.Name = "vpcID-VMName" - err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) - Expect(err).Should(BeNil()) + expectedQueryStr, _ := getVMsByVnetAndOtherMatchesQuery(vnetIDs, vmNames, vmIDs, + subIDs, tenantIDs, locations) + expectedQueryStrs = append(expectedQueryStrs, expectedQueryStr) + vmSelector := []v1alpha1.VirtualMachineSelector{ + { + VMMatch: []v1alpha1.EntityMatch{{MatchName: testVM01}}, + VpcMatch: &v1alpha1.EntityMatch{MatchID: testVnetID01}, + }, + } - filters := getFilters(c, selector.Name) - Expect(filters).To(Equal(expectedQueryStrs)) + selector.Spec.VMSelector = vmSelector + selector.Name = "vpcID-VMName" + err := c.AddAccountResourceSelector(testAccountNamespacedName, selector) + Expect(err).Should(BeNil()) - c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) - expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] - filters = getFilters(c, selector.Name) - Expect(len(filters)).To(Equal(len(expectedQueryStrs))) - }) + filters := getFilters(c, selector.Name) + Expect(filters).To(Equal(expectedQueryStrs)) - It("Update Secret", func() { - credential2 := fmt.Sprintf(`{"subscriptionId": "%s", + c.RemoveAccountResourcesSelector(testAccountNamespacedName, selector.Name) + expectedQueryStrs = expectedQueryStrs[:len(expectedQueryStrs)-1] + filters = getFilters(c, selector.Name) + Expect(len(filters)).To(Equal(len(expectedQueryStrs))) + }) + + It("Update Secret", func() { + credential2 := fmt.Sprintf(`{"subscriptionId": "%s", "clientId": "%s", "tenantId": "%s", "clientKey": "%s" }`, "testSubID01", "testClientID", "testTenantID", "testClientKey") - secret = &corev1.Secret{ - ObjectMeta: v1.ObjectMeta{ - Name: testAccountNamespacedName.Name, - Namespace: testAccountNamespacedName.Namespace, - }, - Data: map[string][]byte{ - "credentials": []byte(credential2), - }, - } - err := fakeClient.Update(context.Background(), secret) - Expect(err).Should(BeNil()) - err = c.AddProviderAccount(fakeClient, account) - Expect(err).Should(BeNil()) + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential2), + }, + } + err := fakeClient.Update(context.Background(), secret) + Expect(err).Should(BeNil()) + err = c.AddProviderAccount(fakeClient, account) + Expect(err).Should(BeNil()) + }) }) Context("VM Provider scenarios", func() { @@ -420,3 +488,30 @@ func setupClientAndCloud(mockAzureServiceHelper *MockazureServicesHelper, accoun Expect(err).Should(BeNil()) return fakeClient, c } + +func createVnetObject(vnetIDs []string) []network.VirtualNetwork { + vnets := []network.VirtualNetwork{} + for i := range vnetIDs { + name := vnetIDs[i] + key := "Name" + value := vnetIDs[i] + tags := make(map[string]*string) + tags[key] = &value + addressPrefix := make([]string, 0) + prefix := "192.16.0.0/24" + addressPrefix = append(addressPrefix, prefix) + vnet := &network.VirtualNetwork{ + Location: ®ion, + ID: &vnetIDs[i], + Name: &name, + Tags: tags, + VirtualNetworkPropertiesFormat: &network.VirtualNetworkPropertiesFormat{ + AddressSpace: &network.AddressSpace{AddressPrefixes: &addressPrefix}, + VirtualNetworkPeerings: &[]network.VirtualNetworkPeering{}, + }, + } + vnets = append(vnets, *vnet) + } + + return vnets +} diff --git a/pkg/cloud-provider/cloudapi/common/cloud-mock_test.go b/pkg/cloud-provider/cloudapi/common/cloud-mock_test.go index af972c46..bb67e5b4 100644 --- a/pkg/cloud-provider/cloudapi/common/cloud-mock_test.go +++ b/pkg/cloud-provider/cloudapi/common/cloud-mock_test.go @@ -23,6 +23,7 @@ import ( reflect "reflect" v1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + v1alpha10 "antrea.io/nephe/apis/runtime/v1alpha1" securitygroup "antrea.io/nephe/pkg/cloud-provider/securitygroup" gomock "github.com/golang/mock/gomock" types "k8s.io/apimachinery/pkg/types" @@ -66,6 +67,20 @@ func (mr *MockCloudInterfaceMockRecorder) AddAccountResourceSelector(accNamespac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAccountResourceSelector", reflect.TypeOf((*MockCloudInterface)(nil).AddAccountResourceSelector), accNamespacedName, selector) } +// AddInventoryPoller mocks base method. +func (m *MockCloudInterface) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddInventoryPoller", accountNamespacedName) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddInventoryPoller indicates an expected call of AddInventoryPoller. +func (mr *MockCloudInterfaceMockRecorder) AddInventoryPoller(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddInventoryPoller", reflect.TypeOf((*MockCloudInterface)(nil).AddInventoryPoller), accountNamespacedName) +} + // AddProviderAccount mocks base method. func (m *MockCloudInterface) AddProviderAccount(client client.Client, account *v1alpha1.CloudProviderAccount) error { m.ctrl.T.Helper() @@ -95,6 +110,20 @@ func (mr *MockCloudInterfaceMockRecorder) CreateSecurityGroup(addressGroupIdenti return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSecurityGroup", reflect.TypeOf((*MockCloudInterface)(nil).CreateSecurityGroup), addressGroupIdentifier, membershipOnly) } +// DeleteInventoryPoller mocks base method. +func (m *MockCloudInterface) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteInventoryPoller", accountNamespacedName) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteInventoryPoller indicates an expected call of DeleteInventoryPoller. +func (mr *MockCloudInterfaceMockRecorder) DeleteInventoryPoller(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInventoryPoller", reflect.TypeOf((*MockCloudInterface)(nil).DeleteInventoryPoller), accountNamespacedName) +} + // DeleteSecurityGroup mocks base method. func (m *MockCloudInterface) DeleteSecurityGroup(addressGroupIdentifier *securitygroup.CloudResource, membershipOnly bool) error { m.ctrl.T.Helper() @@ -138,6 +167,21 @@ func (mr *MockCloudInterfaceMockRecorder) GetEnforcedSecurity() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEnforcedSecurity", reflect.TypeOf((*MockCloudInterface)(nil).GetEnforcedSecurity)) } +// GetVpcInventory mocks base method. +func (m *MockCloudInterface) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*v1alpha10.Vpc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVpcInventory", accountNamespacedName) + ret0, _ := ret[0].(map[string]*v1alpha10.Vpc) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetVpcInventory indicates an expected call of GetVpcInventory. +func (mr *MockCloudInterfaceMockRecorder) GetVpcInventory(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVpcInventory", reflect.TypeOf((*MockCloudInterface)(nil).GetVpcInventory), accountNamespacedName) +} + // Instances mocks base method. func (m *MockCloudInterface) Instances() ([]*v1alpha1.VirtualMachine, error) { m.ctrl.T.Helper() @@ -271,6 +315,20 @@ func (mr *MockAccountMgmtInterfaceMockRecorder) AddAccountResourceSelector(accNa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAccountResourceSelector", reflect.TypeOf((*MockAccountMgmtInterface)(nil).AddAccountResourceSelector), accNamespacedName, selector) } +// AddInventoryPoller mocks base method. +func (m *MockAccountMgmtInterface) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddInventoryPoller", accountNamespacedName) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddInventoryPoller indicates an expected call of AddInventoryPoller. +func (mr *MockAccountMgmtInterfaceMockRecorder) AddInventoryPoller(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddInventoryPoller", reflect.TypeOf((*MockAccountMgmtInterface)(nil).AddInventoryPoller), accountNamespacedName) +} + // AddProviderAccount mocks base method. func (m *MockAccountMgmtInterface) AddProviderAccount(client client.Client, account *v1alpha1.CloudProviderAccount) error { m.ctrl.T.Helper() @@ -285,6 +343,20 @@ func (mr *MockAccountMgmtInterfaceMockRecorder) AddProviderAccount(client, accou return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddProviderAccount", reflect.TypeOf((*MockAccountMgmtInterface)(nil).AddProviderAccount), client, account) } +// DeleteInventoryPoller mocks base method. +func (m *MockAccountMgmtInterface) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteInventoryPoller", accountNamespacedName) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteInventoryPoller indicates an expected call of DeleteInventoryPoller. +func (mr *MockAccountMgmtInterfaceMockRecorder) DeleteInventoryPoller(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInventoryPoller", reflect.TypeOf((*MockAccountMgmtInterface)(nil).DeleteInventoryPoller), accountNamespacedName) +} + // GetAccountStatus mocks base method. func (m *MockAccountMgmtInterface) GetAccountStatus(accNamespacedName *types.NamespacedName) (*v1alpha1.CloudProviderAccountStatus, error) { m.ctrl.T.Helper() @@ -300,6 +372,21 @@ func (mr *MockAccountMgmtInterfaceMockRecorder) GetAccountStatus(accNamespacedNa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountStatus", reflect.TypeOf((*MockAccountMgmtInterface)(nil).GetAccountStatus), accNamespacedName) } +// GetVpcInventory mocks base method. +func (m *MockAccountMgmtInterface) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*v1alpha10.Vpc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVpcInventory", accountNamespacedName) + ret0, _ := ret[0].(map[string]*v1alpha10.Vpc) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetVpcInventory indicates an expected call of GetVpcInventory. +func (mr *MockAccountMgmtInterfaceMockRecorder) GetVpcInventory(accountNamespacedName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVpcInventory", reflect.TypeOf((*MockAccountMgmtInterface)(nil).GetVpcInventory), accountNamespacedName) +} + // RemoveAccountResourcesSelector mocks base method. func (m *MockAccountMgmtInterface) RemoveAccountResourcesSelector(accNamespacedName *types.NamespacedName, selector string) { m.ctrl.T.Helper() diff --git a/pkg/cloud-provider/cloudapi/common/cloud.go b/pkg/cloud-provider/cloudapi/common/cloud.go index fdef59b9..8a8c2813 100644 --- a/pkg/cloud-provider/cloudapi/common/cloud.go +++ b/pkg/cloud-provider/cloudapi/common/cloud.go @@ -16,14 +16,14 @@ package common import ( "reflect" - "sigs.k8s.io/controller-runtime/pkg/client" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "antrea.io/nephe/apis/crd/v1alpha1" - "antrea.io/nephe/pkg/cloud-provider/securitygroup" - cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" + "antrea.io/nephe/pkg/cloud-provider/securitygroup" ) var ( @@ -63,6 +63,12 @@ type AccountMgmtInterface interface { RemoveAccountResourcesSelector(accNamespacedName *types.NamespacedName, selector string) // GetAccountStatus gets accounts status. GetAccountStatus(accNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) + // AddInventoryPoller kicks start a periodic cloud inventory polling. + AddInventoryPoller(accountNamespacedName *types.NamespacedName) error + // DeleteInventoryPoller stops cloud inventory polling. + DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error + // GetVpcInventory gets vpc inventory from internal stored snapshot. + GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) } // ComputeInterface is an abstract providing set of methods to get Instance details to be implemented by cloud providers. diff --git a/pkg/cloud-provider/cloudapi/internal/accounts_common.go b/pkg/cloud-provider/cloudapi/internal/accounts_common.go index a3735a9f..a0d78d7b 100644 --- a/pkg/cloud-provider/cloudapi/internal/accounts_common.go +++ b/pkg/cloud-provider/cloudapi/internal/accounts_common.go @@ -16,16 +16,16 @@ package internal import ( "fmt" - "sigs.k8s.io/controller-runtime/pkg/client" "sync" "time" "go.uber.org/multierr" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" - + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" "antrea.io/nephe/pkg/logging" ) @@ -37,6 +37,7 @@ type CloudAccountInterface interface { startPeriodicInventorySync() error stopPeriodicInventorySync() + GetVpcInventory() map[string]*runtimev1alpha1.Vpc } type cloudAccountConfig struct { @@ -181,7 +182,7 @@ func (accCfg *cloudAccountConfig) performInventorySync() error { defer wg.Done() hasFilters, isFilterNil := serviceCfg.hasFiltersConfigured() - if !hasFilters { + if !hasFilters && !serviceCfg.needPollWithoutFilter() { accCfg.logger().Info("fetching resources from cloud skipped", "service", serviceCfg.getName(), "account", accCfg.namespacedName, "resource-filters", "not-configured") return @@ -272,3 +273,14 @@ func (accCfg *cloudAccountConfig) stopPeriodicInventorySync() { serviceConfig.resetCachedState() } } + +func (accCfg *cloudAccountConfig) GetVpcInventory() map[string]*runtimev1alpha1.Vpc { + serviceConfigs := accCfg.serviceConfigs + for _, serviceConfig := range serviceConfigs { + if serviceConfig.getType() == CloudServiceTypeCompute { + return serviceConfig.getVpcInventory() + } + } + + return nil +} diff --git a/pkg/cloud-provider/cloudapi/internal/cloud_common.go b/pkg/cloud-provider/cloudapi/internal/cloud_common.go index ab10179b..90f0999d 100644 --- a/pkg/cloud-provider/cloudapi/internal/cloud_common.go +++ b/pkg/cloud-provider/cloudapi/internal/cloud_common.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" "antrea.io/nephe/pkg/logging" ) @@ -55,6 +56,12 @@ type CloudCommonInterface interface { RemoveSelector(accNamespacedName *types.NamespacedName, selectorName string) GetStatus(accNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) + + AddInventoryPoller(accountNamespacedName *types.NamespacedName) error + + DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error + + GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) } type cloudCommon struct { @@ -198,6 +205,7 @@ func (c *cloudCommon) AddSelector(accountNamespacedName *types.NamespacedName, s serviceCfg.setResourceFilters(selector) } + // Invoke this function to force inventory scan for vms soon after CES add. err := accCfg.startPeriodicInventorySync() if err != nil { return fmt.Errorf("inventory sync failed [ account : %v, err: %v ]", *accountNamespacedName, err) @@ -216,8 +224,6 @@ func (c *cloudCommon) RemoveSelector(accNamespacedName *types.NamespacedName, se for _, serviceCfg := range accCfg.GetServiceConfigs() { serviceCfg.removeResourceFilters(selectorName) } - - accCfg.stopPeriodicInventorySync() } func (c *cloudCommon) GetStatus(accountNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) { @@ -228,3 +234,36 @@ func (c *cloudCommon) GetStatus(accountNamespacedName *types.NamespacedName) (*c return accCfg.GetStatus(), nil } + +func (c *cloudCommon) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error { + accCfg, found := c.GetCloudAccountByName(accountNamespacedName) + if !found { + return fmt.Errorf("account not found %v", *accountNamespacedName) + } + + err := accCfg.startPeriodicInventorySync() + if err != nil { + return fmt.Errorf("inventory sync failed [ account : %v, err: %v ]", *accountNamespacedName, err) + } + + return nil +} + +func (c *cloudCommon) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error { + accCfg, found := c.GetCloudAccountByName(accountNamespacedName) + if !found { + return fmt.Errorf("account not found %v", *accountNamespacedName) + } + + accCfg.stopPeriodicInventorySync() + return nil +} + +func (c *cloudCommon) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) { + accCfg, found := c.GetCloudAccountByName(accountNamespacedName) + if !found { + return nil, fmt.Errorf("unable to find cloud account:%v", *accountNamespacedName) + } + + return accCfg.GetVpcInventory(), nil +} diff --git a/pkg/cloud-provider/cloudapi/internal/services_common.go b/pkg/cloud-provider/cloudapi/internal/services_common.go index f0f045c3..9cadaf08 100644 --- a/pkg/cloud-provider/cloudapi/internal/services_common.go +++ b/pkg/cloud-provider/cloudapi/internal/services_common.go @@ -19,6 +19,7 @@ import ( "time" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" ) type CloudServiceName string @@ -61,6 +62,10 @@ type CloudServiceInterface interface { GetType() CloudServiceType // ResetCachedState clears any internal state build by the service as part of cloud resource discovery. ResetCachedState() + // NeedPollWithoutFilter is set to true to enable cloud inventory polling even when match selectors are not present. + NeedPollWithoutFilter() bool + // GetVpcInventory copies VPCs stored in internal snapshot(in cloud specific format) to kubernetes runtimev1alpha1.Vpc format. + GetVpcInventory() map[string]*runtimev1alpha1.Vpc } func (cfg *CloudServiceCommon) updateServiceConfig(newConfig CloudServiceInterface) { @@ -127,6 +132,20 @@ func (cfg *CloudServiceCommon) resetCachedState() { cfg.serviceInterface.ResetCachedState() } +func (cfg *CloudServiceCommon) needPollWithoutFilter() bool { + cfg.mutex.Lock() + defer cfg.mutex.Unlock() + + return cfg.serviceInterface.NeedPollWithoutFilter() +} + +func (cfg *CloudServiceCommon) getVpcInventory() map[string]*runtimev1alpha1.Vpc { + cfg.mutex.Lock() + defer cfg.mutex.Unlock() + + return cfg.serviceInterface.GetVpcInventory() +} + type CloudServiceResourceCRDs struct { virtualMachines []*cloudv1alpha1.VirtualMachine } diff --git a/pkg/controllers/cloud/account_poller.go b/pkg/controllers/cloud/account_poller.go index 532a38d5..05e5d601 100644 --- a/pkg/controllers/cloud/account_poller.go +++ b/pkg/controllers/cloud/account_poller.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "reflect" + "strings" + "sync" "github.com/go-logr/logr" "go.uber.org/multierr" @@ -26,11 +28,11 @@ import ( "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strings" cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" cloudprovider "antrea.io/nephe/pkg/cloud-provider" "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" + "antrea.io/nephe/pkg/logging" ) const ( @@ -52,6 +54,15 @@ type accountPoller struct { ch chan struct{} } +var ( + accPollers map[types.NamespacedName]*accountPoller + accPollerMutex sync.Mutex +) + +func init() { + accPollers = make(map[types.NamespacedName]*accountPoller) +} + func (p *accountPoller) doAccountPoller() { cloudInterface, e := cloudprovider.GetCloudInterface(common.ProviderType(p.cloudType)) if e != nil { @@ -76,11 +87,25 @@ func (p *accountPoller) doAccountPoller() { if e != nil { p.log.Info("failed to update account status", "account", p.namespacedName, "err", e) } - virtualMachines := p.getComputeResources(cloudInterface) - e = p.doVirtualMachineOperations(virtualMachines) + vpcMap, e := cloudInterface.GetVpcInventory(p.namespacedName) if e != nil { - p.log.Info("failed to perform virtual-machine operations", "account", p.namespacedName, "error", e) + p.log.Info("failed to fetch cloud vpc list from internal snapshot", "account", + p.namespacedName.String(), "error", e) + } + + err := buildVpcCache(vpcMap, p.namespacedName, string(account.UID), p.log) + if err != nil { + p.log.Info("failed to build vpc inventory", "account", p.namespacedName.String(), "error", err) + } + + // Perform VM Operations only when CES is added. + if p.selector != nil { + virtualMachines := p.getComputeResources(cloudInterface) + e = p.doVirtualMachineOperations(virtualMachines) + if e != nil { + p.log.Info("failed to perform virtual-machine operations", "account", p.namespacedName, "error", e) + } } } @@ -346,6 +371,52 @@ func updateAccountStatus(current, discovered *cloudv1alpha1.CloudProviderAccount current.Error = discovered.Error } +func addAccountPoller(cloudType cloudv1alpha1.CloudProvider, namespacedName *types.NamespacedName, + account *cloudv1alpha1.CloudProviderAccount, r *CloudProviderAccountReconciler) (*accountPoller, bool) { + accPollerMutex.Lock() + defer accPollerMutex.Unlock() + + if pollerScope, exists := accPollers[*namespacedName]; exists { + r.Log.Info("poller exists", "account", namespacedName) + return pollerScope, exists + } + + poller := &accountPoller{ + Client: r.Client, + scheme: r.Scheme, + log: logging.GetLogger("poller").WithName("AccountPoller"), + pollIntvInSeconds: *account.Spec.PollIntervalInSeconds, + cloudType: cloudType, + namespacedName: namespacedName, + selector: nil, + ch: make(chan struct{}), + } + accPollers[*namespacedName] = poller + + r.Log.Info("poller will be created", "account", namespacedName) + return poller, false +} + +func removeAccountPoller(namespacedName *types.NamespacedName) { + accPollerMutex.Lock() + defer accPollerMutex.Unlock() + + poller, found := accPollers[*namespacedName] + if found { + close(poller.ch) + delete(accPollers, *namespacedName) + } +} + +func getAccountPoller(name *types.NamespacedName) (*accountPoller, bool) { + accPollerMutex.Lock() + defer accPollerMutex.Unlock() + if pollerScope, exists := accPollers[*name]; exists { + return pollerScope, true + } + return nil, false +} + // createVirtualMachineCR creates VirtualMachine CR and updates the status. func (p *accountPoller) createVirtualMachineCR(vm *cloudv1alpha1.VirtualMachine) (err error) { err = controllerutil.SetControllerReference(p.selector, vm, p.scheme) diff --git a/pkg/controllers/cloud/account_poller_test.go b/pkg/controllers/cloud/account_poller_test.go new file mode 100644 index 00000000..4d5705bb --- /dev/null +++ b/pkg/controllers/cloud/account_poller_test.go @@ -0,0 +1,127 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "antrea.io/nephe/apis/crd/v1alpha1" + "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" + "antrea.io/nephe/pkg/controllers/utils" +) + +var _ = Describe("Account poller", func() { + Context("Account poller workflow", func() { + var ( + credentials = "credentials" + testAccountNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "account01"} + testSecretNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "secret01"} + account *v1alpha1.CloudProviderAccount + reconciler *CloudProviderAccountReconciler + secret *corev1.Secret + fakeClient client.WithWatch + ) + + BeforeEach(func() { + newScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(v1alpha1.AddToScheme(newScheme)) + + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).Build() + reconciler = &CloudProviderAccountReconciler{ + Log: logf.Log, + Client: fakeClient, + Scheme: scheme, + mutex: sync.Mutex{}, + accountProviderType: make(map[types.NamespacedName]common.ProviderType), + } + var pollIntv uint = 1 + account = &v1alpha1.CloudProviderAccount{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Spec: v1alpha1.CloudProviderAccountSpec{ + PollIntervalInSeconds: &pollIntv, + AWSConfig: &v1alpha1.CloudProviderAccountAWSConfig{ + Region: "us-east-1", + SecretRef: &v1alpha1.SecretReference{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + Key: credentials, + }, + }, + }, + } + + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + }) + It("Account poller Add and Delete workflow", func() { + _ = fakeClient.Create(context.Background(), secret) + _ = fakeClient.Create(context.Background(), account) + + accountCloudType, err := utils.GetAccountProviderType(account) + Expect(err).ShouldNot(HaveOccurred()) + accPoller, exists := addAccountPoller(accountCloudType, &testAccountNamespacedName, account, reconciler) + Expect(accPoller).To(Not(BeNil())) + Expect(exists).To(BeFalse()) + + accPoller, exists = getAccountPoller(&testAccountNamespacedName) + Expect(accPoller).To(Not(BeNil())) + Expect(exists).To(BeTrue()) + + removeAccountPoller(&testAccountNamespacedName) + accPoller, exists = getAccountPoller(&testAccountNamespacedName) + Expect(accPoller).To(BeNil()) + Expect(exists).To(BeFalse()) + }) + It("Account poller re-add", func() { + _ = fakeClient.Create(context.Background(), secret) + _ = fakeClient.Create(context.Background(), account) + + accountCloudType, err := utils.GetAccountProviderType(account) + Expect(err).ShouldNot(HaveOccurred()) + accPoller, exists := addAccountPoller(accountCloudType, &testAccountNamespacedName, account, reconciler) + Expect(accPoller).To(Not(BeNil())) + Expect(exists).To(BeFalse()) + + accPoller, exists = addAccountPoller(accountCloudType, &testAccountNamespacedName, account, reconciler) + Expect(accPoller).To(Not(BeNil())) + Expect(exists).To(BeTrue()) + }) + }) +}) diff --git a/pkg/controllers/cloud/cloudentityselector_controller.go b/pkg/controllers/cloud/cloudentityselector_controller.go index 119d690b..bac85930 100644 --- a/pkg/controllers/cloud/cloudentityselector_controller.go +++ b/pkg/controllers/cloud/cloudentityselector_controller.go @@ -19,13 +19,11 @@ import ( "fmt" "strings" "sync" - "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -33,7 +31,6 @@ import ( cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1" cloudprovider "antrea.io/nephe/pkg/cloud-provider" "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" - "antrea.io/nephe/pkg/controllers/utils" ) const ( @@ -41,17 +38,18 @@ const ( virtualMachineSelectorMatchIndexerByID = "virtualmachine.selector.id" virtualMachineSelectorMatchIndexerByName = "virtualmachine.selector.name" virtualMachineSelectorMatchIndexerByVPC = "virtualmachine.selector.vpc.id" + errorMsgSelectorAddFail = "poller is nil, selector add failed" + errorMsgSelectorAccountMapNotFound = "failed to find account for selector" ) // CloudEntitySelectorReconciler reconciles a CloudEntitySelector object. // nolint:golint type CloudEntitySelectorReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - - mutex sync.Mutex - accPollers map[types.NamespacedName]*accountPoller + Log logr.Logger + Scheme *runtime.Scheme + mutex sync.Mutex + selectorToAccountMap map[types.NamespacedName]types.NamespacedName } // +kubebuilder:rbac:groups=crd.cloud.antrea.io,resources=cloudentityselectors,verbs=get;list;watch;create;update;patch;delete @@ -76,7 +74,8 @@ func (r *CloudEntitySelectorReconciler) Reconcile(ctx context.Context, req ctrl. } func (r *CloudEntitySelectorReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.accPollers = make(map[types.NamespacedName]*accountPoller) + r.selectorToAccountMap = make(map[types.NamespacedName]types.NamespacedName) + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &cloudv1alpha1.VirtualMachine{}, virtualMachineIndexerByCloudAccount, func(obj client.Object) []string { vm := obj.(*cloudv1alpha1.VirtualMachine) @@ -95,7 +94,61 @@ func (r *CloudEntitySelectorReconciler) SetupWithManager(mgr ctrl.Manager) error func (r *CloudEntitySelectorReconciler) processCreateOrUpdate(selector *cloudv1alpha1.CloudEntitySelector, selectorNamespacedName *types.NamespacedName) error { - accPoller, preExists := r.addAccountPoller(selector) + accountNamespacedName := &types.NamespacedName{ + Namespace: selector.Namespace, + Name: selector.Spec.AccountName, + } + r.addSelectorAccountMapping(selectorNamespacedName, accountNamespacedName) + + accPoller, preExists := getAccountPoller(accountNamespacedName) + if accPoller == nil { + return fmt.Errorf("%s, account %s", errorMsgSelectorAddFail, accountNamespacedName.String()) + } + + // Populate selector specific fields in the accPoller created by CPA. + accPoller.selector = selector.DeepCopy() + + accPoller.vmSelector = cache.NewIndexer( + func(obj interface{}) (string, error) { + m := obj.(*cloudv1alpha1.VirtualMachineSelector) + // Create a unique key for each VirtualMachineSelector. + return fmt.Sprintf("%v-%v-%v", m.Agented, m.VpcMatch, m.VMMatch), nil + }, + cache.Indexers{ + virtualMachineSelectorMatchIndexerByID: func(obj interface{}) ([]string, error) { + m := obj.(*cloudv1alpha1.VirtualMachineSelector) + if len(m.VMMatch) == 0 { + return nil, nil + } + var match []string + for _, vmMatch := range m.VMMatch { + if len(vmMatch.MatchID) > 0 { + match = append(match, strings.ToLower(vmMatch.MatchID)) + } + } + return match, nil + }, + virtualMachineSelectorMatchIndexerByName: func(obj interface{}) ([]string, error) { + m := obj.(*cloudv1alpha1.VirtualMachineSelector) + if len(m.VMMatch) == 0 { + return nil, nil + } + var match []string + for _, vmMatch := range m.VMMatch { + if len(vmMatch.MatchName) > 0 { + match = append(match, strings.ToLower(vmMatch.MatchName)) + } + } + return match, nil + }, + virtualMachineSelectorMatchIndexerByVPC: func(obj interface{}) ([]string, error) { + m := obj.(*cloudv1alpha1.VirtualMachineSelector) + if m.VpcMatch != nil && len(m.VpcMatch.MatchID) > 0 { + return []string{strings.ToLower(m.VpcMatch.MatchID)}, nil + } + return nil, nil + }, + }) if selector.Spec.VMSelector != nil { // Indexer does not work with in-place update. Do delete->add. @@ -149,116 +202,50 @@ func (r *CloudEntitySelectorReconciler) processCreateOrUpdate(selector *cloudv1a return err } - if !preExists { - go wait.Until(accPoller.doAccountPoller, time.Duration(accPoller.pollIntvInSeconds)*time.Second, accPoller.ch) - } return nil } func (r *CloudEntitySelectorReconciler) processDelete(selectorNamespacedName *types.NamespacedName) error { - poller := r.removeAccountPoller(selectorNamespacedName) - if poller == nil { + accountNamespacedName := r.getSelectorAccountMapping(selectorNamespacedName) + if accountNamespacedName == nil { + return fmt.Errorf("%s %s", errorMsgSelectorAccountMapNotFound, selectorNamespacedName.String()) + } + tempAccountNamespacedName := *accountNamespacedName + r.deleteSelectorAccountMapping(selectorNamespacedName) + + accPoller, _ := getAccountPoller(&tempAccountNamespacedName) + if accPoller == nil { + r.Log.Info("account poller is already deleted", "accountNamespacedName", tempAccountNamespacedName.String()) return nil } - cloudInterface, err := cloudprovider.GetCloudInterface(common.ProviderType(poller.cloudType)) + cloudInterface, err := cloudprovider.GetCloudInterface(common.ProviderType(accPoller.cloudType)) if err != nil { return err } - cloudInterface.RemoveAccountResourcesSelector(poller.namespacedName, selectorNamespacedName.Name) + + cloudInterface.RemoveAccountResourcesSelector(&tempAccountNamespacedName, selectorNamespacedName.Name) return nil } -func (r *CloudEntitySelectorReconciler) addAccountPoller(selector *cloudv1alpha1.CloudEntitySelector) (*accountPoller, bool) { +func (r *CloudEntitySelectorReconciler) addSelectorAccountMapping(selectorNamespacedName *types.NamespacedName, + accountNamespacedName *types.NamespacedName) { r.mutex.Lock() defer r.mutex.Unlock() - - selectorNamespacedName := &types.NamespacedName{ - Namespace: selector.Namespace, - Name: selector.Name, - } - - if pollerScope, exists := r.accPollers[*selectorNamespacedName]; exists { - r.Log.Info("poller exists", "selector", selectorNamespacedName) - return pollerScope, exists - } - - accountNamespacedName := &types.NamespacedName{ - Namespace: selector.Namespace, - Name: selector.Spec.AccountName, - } - account := &cloudv1alpha1.CloudProviderAccount{} - _ = r.Get(context.TODO(), *accountNamespacedName, account) - accountCloudType, err := utils.GetAccountProviderType(account) - if err != nil { - return nil, false - } - poller := &accountPoller{ - Client: r.Client, - scheme: r.Scheme, - log: r.Log, - pollIntvInSeconds: *account.Spec.PollIntervalInSeconds, - cloudType: accountCloudType, - namespacedName: accountNamespacedName, - selector: selector.DeepCopy(), - ch: make(chan struct{}), - } - poller.vmSelector = cache.NewIndexer( - func(obj interface{}) (string, error) { - m := obj.(*cloudv1alpha1.VirtualMachineSelector) - // Create a unique key for each VirtualMachineSelector. - return fmt.Sprintf("%v-%v-%v", m.Agented, m.VpcMatch, m.VMMatch), nil - }, - cache.Indexers{ - virtualMachineSelectorMatchIndexerByID: func(obj interface{}) ([]string, error) { - m := obj.(*cloudv1alpha1.VirtualMachineSelector) - if len(m.VMMatch) == 0 { - return nil, nil - } - var match []string - for _, vmMatch := range m.VMMatch { - if len(vmMatch.MatchID) > 0 { - match = append(match, strings.ToLower(vmMatch.MatchID)) - } - } - return match, nil - }, - virtualMachineSelectorMatchIndexerByName: func(obj interface{}) ([]string, error) { - m := obj.(*cloudv1alpha1.VirtualMachineSelector) - if len(m.VMMatch) == 0 { - return nil, nil - } - var match []string - for _, vmMatch := range m.VMMatch { - if len(vmMatch.MatchName) > 0 { - match = append(match, strings.ToLower(vmMatch.MatchName)) - } - } - return match, nil - }, - virtualMachineSelectorMatchIndexerByVPC: func(obj interface{}) ([]string, error) { - m := obj.(*cloudv1alpha1.VirtualMachineSelector) - if m.VpcMatch != nil && len(m.VpcMatch.MatchID) > 0 { - return []string{strings.ToLower(m.VpcMatch.MatchID)}, nil - } - return nil, nil - }, - }) - - r.accPollers[*selectorNamespacedName] = poller - - r.Log.Info("poller will be created", "selector", selectorNamespacedName) - return poller, false + r.selectorToAccountMap[*selectorNamespacedName] = *accountNamespacedName } -func (r *CloudEntitySelectorReconciler) removeAccountPoller(selectorNamespacedName *types.NamespacedName) *accountPoller { +func (r *CloudEntitySelectorReconciler) deleteSelectorAccountMapping(selectorNamespacedName *types.NamespacedName) { r.mutex.Lock() defer r.mutex.Unlock() + delete(r.selectorToAccountMap, *selectorNamespacedName) +} - poller, found := r.accPollers[*selectorNamespacedName] - if found { - close(poller.ch) - delete(r.accPollers, *selectorNamespacedName) +func (r *CloudEntitySelectorReconciler) getSelectorAccountMapping(selectorNamespacedName *types.NamespacedName) *types.NamespacedName { + r.mutex.Lock() + defer r.mutex.Unlock() + if account, found := r.selectorToAccountMap[*selectorNamespacedName]; found { + return &account } - return poller + return nil } diff --git a/pkg/controllers/cloud/cloudentityselector_controller_test.go b/pkg/controllers/cloud/cloudentityselector_controller_test.go new file mode 100644 index 00000000..20a9df8b --- /dev/null +++ b/pkg/controllers/cloud/cloudentityselector_controller_test.go @@ -0,0 +1,178 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "context" + "sync" + + mock "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "antrea.io/nephe/apis/crd/v1alpha1" + "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" + "antrea.io/nephe/pkg/testing/controllerruntimeclient" +) + +var _ = Describe("CloudEntitySelector Controller", func() { + Context("CES workflow", func() { + var ( + credentials = "credentials" + testAccountNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "account01"} + testSelectorNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "selector01"} + testSecretNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "secret01"} + account *v1alpha1.CloudProviderAccount + selector *v1alpha1.CloudEntitySelector + cesReconciler *CloudEntitySelectorReconciler + cpaReconciler *CloudProviderAccountReconciler + secret *corev1.Secret + fakeClient client.WithWatch + ) + BeforeEach(func() { + newScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(v1alpha1.AddToScheme(newScheme)) + + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).Build() + mockCtrl = mock.NewController(GinkgoT()) + mockClient = controllerruntimeclient.NewMockClient(mockCtrl) + cpaReconciler = &CloudProviderAccountReconciler{ + Log: logf.Log, + Client: fakeClient, + Scheme: scheme, + mutex: sync.Mutex{}, + accountProviderType: make(map[types.NamespacedName]common.ProviderType), + } + cesReconciler = &CloudEntitySelectorReconciler{ + Log: logf.Log, + Client: fakeClient, + Scheme: scheme, + selectorToAccountMap: make(map[types.NamespacedName]types.NamespacedName), + } + accPollers = make(map[types.NamespacedName]*accountPoller) + + var pollIntv uint = 1 + account = &v1alpha1.CloudProviderAccount{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Spec: v1alpha1.CloudProviderAccountSpec{ + PollIntervalInSeconds: &pollIntv, + AWSConfig: &v1alpha1.CloudProviderAccountAWSConfig{ + Region: "us-east-1", + SecretRef: &v1alpha1.SecretReference{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + Key: credentials, + }, + }, + }, + } + + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + + selector = &v1alpha1.CloudEntitySelector{ + ObjectMeta: v1.ObjectMeta{ + Name: testSelectorNamespacedName.Name, + Namespace: testSelectorNamespacedName.Namespace, + OwnerReferences: []v1.OwnerReference{ + { + APIVersion: "crd.cloud.antrea.io/v1alpha1", + Kind: "CloudProviderAccount", + Name: testAccountNamespacedName.Name, + }, + }, + }, + Spec: v1alpha1.CloudEntitySelectorSpec{ + AccountName: testAccountNamespacedName.Name, + VMSelector: []v1alpha1.VirtualMachineSelector{ + { + VpcMatch: &v1alpha1.EntityMatch{ + MatchID: "xyzq", + }, + }, + }, + }, + } + + }) + It("CES Add and Delete workflow", func() { + _ = fakeClient.Create(context.Background(), secret) + + _ = fakeClient.Create(context.Background(), account) + + err := cpaReconciler.processCreate(&testAccountNamespacedName, account) + Expect(err).ShouldNot(HaveOccurred()) + + err = cesReconciler.processCreateOrUpdate(selector, &testSelectorNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + + err = cesReconciler.processDelete(&testSelectorNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + + err = cpaReconciler.processDelete(&testAccountNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + }) + It("CES add before CPA add", func() { + _ = fakeClient.Create(context.Background(), secret) + err := cesReconciler.processCreateOrUpdate(selector, &testSelectorNamespacedName) + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).Should(ContainSubstring(errorMsgSelectorAddFail)) + }) + + It("CPA delete before CES delete", func() { + _ = fakeClient.Create(context.Background(), secret) + _ = fakeClient.Create(context.Background(), account) + err := cpaReconciler.processCreate(&testAccountNamespacedName, account) + Expect(err).ShouldNot(HaveOccurred()) + + err = cesReconciler.processCreateOrUpdate(selector, &testSelectorNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + _ = fakeClient.Create(context.Background(), selector) + + err = cpaReconciler.processDelete(&testAccountNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + accPoller, exists := getAccountPoller(&testAccountNamespacedName) + Expect(accPoller).To(BeNil()) + Expect(exists).To(BeFalse()) + + err = cesReconciler.processDelete(&testSelectorNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + + err = cesReconciler.processDelete(&testSelectorNamespacedName) + Expect(err.Error()).Should(ContainSubstring(errorMsgSelectorAccountMapNotFound)) + }) + }) +}) diff --git a/pkg/controllers/cloud/cloudprovideraccount_controller.go b/pkg/controllers/cloud/cloudprovideraccount_controller.go index 05c9b924..6811b3c4 100644 --- a/pkg/controllers/cloud/cloudprovideraccount_controller.go +++ b/pkg/controllers/cloud/cloudprovideraccount_controller.go @@ -17,11 +17,13 @@ package cloud import ( "context" "sync" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -85,15 +87,43 @@ func (r *CloudProviderAccountReconciler) processCreate(namespacedName *types.Nam if err != nil { return err } - return cloudInterface.AddProviderAccount(r.Client, account) + err = cloudInterface.AddProviderAccount(r.Client, account) + if err != nil { + return err + } + + err = cloudInterface.AddInventoryPoller(namespacedName) + if err != nil { + return err + } + accPoller, preExists := addAccountPoller(accountCloudType, namespacedName, account, r) + + if !preExists { + go wait.Until(accPoller.doAccountPoller, time.Duration(accPoller.pollIntvInSeconds)*time.Second, accPoller.ch) + } + + return nil } func (r *CloudProviderAccountReconciler) processDelete(namespacedName *types.NamespacedName) error { + r.Log.V(1).Info("Remove account poller", "account", namespacedName.String()) + removeAccountPoller(namespacedName) + + err := deleteVpcCache(namespacedName) + if err != nil { + return err + } + cloudType := r.getAccountProviderType(namespacedName) cloudInterface, err := cloudprovider.GetCloudInterface(cloudType) if err != nil { return err } + err = cloudInterface.DeleteInventoryPoller(namespacedName) + if err != nil { + return err + } + cloudInterface.RemoveProviderAccount(namespacedName) r.removeAccountProviderType(namespacedName) diff --git a/pkg/controllers/cloud/cloudprovideraccount_controller_test.go b/pkg/controllers/cloud/cloudprovideraccount_controller_test.go new file mode 100644 index 00000000..5d5e7d7b --- /dev/null +++ b/pkg/controllers/cloud/cloudprovideraccount_controller_test.go @@ -0,0 +1,151 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "context" + "sync" + + mock "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "antrea.io/nephe/apis/crd/v1alpha1" + cloudprovider "antrea.io/nephe/pkg/cloud-provider" + "antrea.io/nephe/pkg/cloud-provider/cloudapi/common" + "antrea.io/nephe/pkg/controllers/utils" + "antrea.io/nephe/pkg/testing/controllerruntimeclient" +) + +var _ = Describe("CloudProviderAccount Controller", func() { + Context("CPA workflow", func() { + var ( + credentials = "credentials" + testAccountNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "account01"} + testSecretNamespacedName = types.NamespacedName{Namespace: "namespace01", Name: "secret01"} + account *v1alpha1.CloudProviderAccount + reconciler *CloudProviderAccountReconciler + secret *corev1.Secret + fakeClient client.WithWatch + ) + + BeforeEach(func() { + newScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(v1alpha1.AddToScheme(newScheme)) + + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).Build() + mockCtrl = mock.NewController(GinkgoT()) + mockClient = controllerruntimeclient.NewMockClient(mockCtrl) + reconciler = &CloudProviderAccountReconciler{ + Log: logf.Log, + Client: fakeClient, + Scheme: scheme, + mutex: sync.Mutex{}, + accountProviderType: make(map[types.NamespacedName]common.ProviderType), + } + accPollers = make(map[types.NamespacedName]*accountPoller) + + var pollIntv uint = 1 + account = &v1alpha1.CloudProviderAccount{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Spec: v1alpha1.CloudProviderAccountSpec{ + PollIntervalInSeconds: &pollIntv, + AWSConfig: &v1alpha1.CloudProviderAccountAWSConfig{ + Region: "us-east-1", + SecretRef: &v1alpha1.SecretReference{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + Key: credentials, + }, + }, + }, + } + + credential := `{"accessKeyId": "keyId","accessKeySecret": "keySecret"}` + secret = &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: testSecretNamespacedName.Name, + Namespace: testSecretNamespacedName.Namespace, + }, + Data: map[string][]byte{ + "credentials": []byte(credential), + }, + } + }) + + It("Account Add and Delete workflow", func() { + _ = fakeClient.Create(context.Background(), secret) + + err := reconciler.processCreate(&testAccountNamespacedName, account) + Expect(err).ShouldNot(HaveOccurred()) + + accPoller, exists := getAccountPoller(&testAccountNamespacedName) + Expect(accPoller).To(Not(BeNil())) + Expect(exists).To(BeTrue()) + + _ = fakeClient.Create(context.Background(), account) + + err = reconciler.processDelete(&testAccountNamespacedName) + Expect(err).ShouldNot(HaveOccurred()) + accPoller, exists = getAccountPoller(&testAccountNamespacedName) + Expect(accPoller).To(BeNil()) + Expect(exists).To(BeFalse()) + }) + It("Account add with unknown cloud type", func() { + account = &v1alpha1.CloudProviderAccount{ + ObjectMeta: v1.ObjectMeta{ + Name: testAccountNamespacedName.Name, + Namespace: testAccountNamespacedName.Namespace, + }, + Spec: v1alpha1.CloudProviderAccountSpec{}, + } + _ = fakeClient.Create(context.Background(), secret) + + err := reconciler.processCreate(&testAccountNamespacedName, account) + Expect(err.Error()).Should(ContainSubstring(utils.ErrorMsgUnknownCloudProvider)) + }) + It("Account create error due to no Secret", func() { + err := reconciler.processCreate(&testAccountNamespacedName, account) + Expect(err).Should(HaveOccurred()) + }) + It("Account delete error due to no account config entry", func() { + _ = fakeClient.Create(context.Background(), secret) + + err := reconciler.processCreate(&testAccountNamespacedName, account) + Expect(err).ShouldNot(HaveOccurred()) + + cloudType := reconciler.getAccountProviderType(&testAccountNamespacedName) + cloudInterface, err := cloudprovider.GetCloudInterface(cloudType) + Expect(err).ShouldNot(HaveOccurred()) + cloudInterface.RemoveProviderAccount(&testAccountNamespacedName) + + err = reconciler.processDelete(&testAccountNamespacedName) + Expect(err).Should(HaveOccurred()) + }) + }) +}) diff --git a/pkg/controllers/cloud/vpc_cache.go b/pkg/controllers/cloud/vpc_cache.go new file mode 100644 index 00000000..5b9a16d0 --- /dev/null +++ b/pkg/controllers/cloud/vpc_cache.go @@ -0,0 +1,120 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "fmt" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" +) + +var VpcIndexer cache.Indexer + +func init() { + defineIndexers() +} + +const ( + VpcIndexerByAccountNameSpacedName = "namespace.accountname" + VpcIndexerByAccountUUID = "account.uuid" + VpcIndexerByNamespace = "vpc.namespace" + VpcIndexerByName = "vpc.Name" + VpcIndexerByVpcID = "vpc.id" +) + +// defineIndexers defines an indexer. +func defineIndexers() { + VpcIndexer = cache.NewIndexer( + func(obj interface{}) (string, error) { + vpc := obj.(*runtimev1alpha1.Vpc) + return fmt.Sprintf("%v-%v", vpc.Annotations[VpcIndexerByAccountUUID], vpc.Info.Id), nil + }, + cache.Indexers{ + VpcIndexerByAccountNameSpacedName: func(obj interface{}) ([]string, error) { + vpc := obj.(*runtimev1alpha1.Vpc) + return []string{vpc.Annotations[VpcIndexerByAccountNameSpacedName]}, nil + }, + VpcIndexerByVpcID: func(obj interface{}) ([]string, error) { + vpc := obj.(*runtimev1alpha1.Vpc) + return []string{vpc.Info.Id}, nil + }, + VpcIndexerByNamespace: func(obj interface{}) ([]string, error) { + vpc := obj.(*runtimev1alpha1.Vpc) + return []string{vpc.Namespace}, nil + }, + VpcIndexerByName: func(obj interface{}) ([]string, error) { + vpc := obj.(*runtimev1alpha1.Vpc) + return []string{vpc.Name}, nil + }, + }) +} + +// buildVpcCache using vpc list from cloud, update global vpc cache(with vpcs applicable for the current cloud account). +func buildVpcCache(vpcMap map[string]*runtimev1alpha1.Vpc, namespacedName *types.NamespacedName, + uuid string, log logr.Logger) error { + vpcsInCache, err := VpcIndexer.ByIndex(VpcIndexerByAccountNameSpacedName, namespacedName.String()) + if err != nil { + return fmt.Errorf("fetching vpc list from global vpc cache failed, "+ + "index %v, error %v", *namespacedName, err) + } + + // Remove vpcs in global vpc cache which are not found in vpc list fetched from cloud. + for _, i := range vpcsInCache { + vpc := i.(*runtimev1alpha1.Vpc) + if _, found := vpcMap[vpc.Info.Id]; !found { + log.V(1).Info("Deleting a vpc from global vpc cache", "vpc id", vpc.Info.Id, "account", + vpc.Annotations[VpcIndexerByAccountNameSpacedName]) + if err := VpcIndexer.Delete(vpc); err != nil { + log.Error(err, "failed to delete entry from VpcIndexer", "vpc id", vpc.Info.Id, "account", + namespacedName.String()) + } + } + } + + for _, v := range vpcMap { + annotationsMap := map[string]string{ + VpcIndexerByAccountNameSpacedName: namespacedName.String(), + VpcIndexerByAccountUUID: uuid, + } + v.Annotations = annotationsMap + + err := VpcIndexer.Add(v) + if err != nil { + return fmt.Errorf("failed to add entry into VpcIndexer, vpc id %s, account %v, error %v", + v.Info.Id, *namespacedName, err) + } + } + + return nil +} + +// deleteVpcCache deletes all entries from global vpc cache. +func deleteVpcCache(namespacedName *types.NamespacedName) error { + vpcsInCache, err := VpcIndexer.ByIndex(VpcIndexerByAccountNameSpacedName, namespacedName.String()) + if err != nil { + return err + } + for _, i := range vpcsInCache { + vpc := i.(*runtimev1alpha1.Vpc) + if err := VpcIndexer.Delete(vpc); err != nil { + return fmt.Errorf("failed to delete entry from VpcIndexer, indexer %v, error %v", *namespacedName, err) + } + } + return nil +} diff --git a/pkg/controllers/cloud/vpc_cache_test.go b/pkg/controllers/cloud/vpc_cache_test.go new file mode 100644 index 00000000..cd0ebf78 --- /dev/null +++ b/pkg/controllers/cloud/vpc_cache_test.go @@ -0,0 +1,99 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + + runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1" + "antrea.io/nephe/pkg/logging" +) + +var ( + testVpcID01 = "testVpcID01" + testVpcName01 = "testVpcName01" + testVpcID02 = "testVpcID02" + testVpcName02 = "testVpcName02" + namespace = "testNS" + accountName = "account01" + namespacedName = types.NamespacedName{Namespace: namespace, Name: accountName} + accountUUID = "xyz" + vpcCacheKey1 = fmt.Sprintf("%s-%s", accountUUID, testVpcID01) + vpcCacheKey2 = fmt.Sprintf("%s-%s", accountUUID, testVpcID02) +) + +var _ = Describe("Validate Vpc Cache", func() { + + It("Build vpc cache", func() { + vpcList1 := make(map[string]*runtimev1alpha1.Vpc) + vpcObj1 := new(runtimev1alpha1.Vpc) + vpcObj1.Name = "obj1" + vpcObj1.Namespace = namespace + vpcObj1.Info.Id = testVpcID01 + vpcObj1.Info.Name = testVpcName01 + vpcList1[testVpcID01] = vpcObj1 + vpcObj2 := new(runtimev1alpha1.Vpc) + vpcObj2.Name = "obj2" + vpcObj2.Namespace = namespace + vpcObj2.Info.Id = testVpcID02 + vpcObj2.Info.Name = testVpcName02 + vpcList1[testVpcID02] = vpcObj2 + + logr := logging.GetLogger("test") + err := buildVpcCache(vpcList1, &namespacedName, accountUUID, logr) + Expect(err).ShouldNot(HaveOccurred()) + + obj, _, err := VpcIndexer.GetByKey(vpcCacheKey1) + Expect(err).ShouldNot(HaveOccurred()) + vpc := obj.(*runtimev1alpha1.Vpc) + Expect(vpc.Info.Id).To(Equal(testVpcID01)) + + obj, _, err = VpcIndexer.GetByKey(vpcCacheKey2) + Expect(err).ShouldNot(HaveOccurred()) + vpc = obj.(*runtimev1alpha1.Vpc) + Expect(vpc.Info.Id).To(Equal(testVpcID02)) + + // When vpcList doesn't contain an object(vpc id testVpcID02) which is present in vpcCache, it is deleted from cache. + vpcList2 := make(map[string]*runtimev1alpha1.Vpc) + vpcObj := new(runtimev1alpha1.Vpc) + vpcObj.Name = "obj1" + vpcObj.Namespace = namespace + vpcObj.Info.Id = testVpcID01 + vpcObj.Info.Name = testVpcName01 + vpcList2[testVpcID01] = vpcObj + + err = buildVpcCache(vpcList2, &namespacedName, accountUUID, logr) + Expect(err).ShouldNot(HaveOccurred()) + + _, exist, err := VpcIndexer.GetByKey(vpcCacheKey1) + Expect(err).ShouldNot(HaveOccurred()) + Expect(exist).Should(BeTrue()) + _, exist, err = VpcIndexer.GetByKey(vpcCacheKey2) + Expect(err).ShouldNot(HaveOccurred()) + Expect(exist).Should(BeFalse()) + + // Delete vpc cache. + err = deleteVpcCache(&namespacedName) + Expect(err).ShouldNot(HaveOccurred()) + _, exist, err = VpcIndexer.GetByKey(vpcCacheKey1) + Expect(err).ShouldNot(HaveOccurred()) + Expect(exist).Should(BeFalse()) + }) + +})