Skip to content

Commit

Permalink
Implement vpc poller on account add.
Browse files Browse the repository at this point in the history
- Perform cloud inventory poll for vpcs on CPA add, vm instanes are skipped
  as configured filters are nil.
- Move account poller(fetches cloud data from internal snapshot) to CPA.
- Account poller is applicable for both vpc and vm instance purpose.
- Maintain vpc list globally using cache indexers.
- In CES, use existing account poller(created during CPA add).
- On CES delete, reset vmSelector filters and skip inventory poll for vms,
  do not distub inventory poll for vpcs.
- On CPA delete, stop cloud inventory poll and remove account poller.
- Add unit test the changes introduced.

Signed-off-by: Archana Holla <[email protected]>
  • Loading branch information
archanapholla committed Dec 20, 2022
1 parent 49aec49 commit f24f620
Show file tree
Hide file tree
Showing 22 changed files with 1,775 additions and 309 deletions.
42 changes: 42 additions & 0 deletions apis/runtime/v1alpha1/vpc_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type VpcInfo struct {
Name string
Id string
Tags map[string]string
Cidrs []string
}

// +kubebuilder:object:root=true
type Vpc struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Info VpcInfo `json:"spec,omitempty"`
}

// +kubebuilder:object:root=true
// VpcList is a list of Vpc objects.
type VpcList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Vpc `json:"items"`
}
85 changes: 85 additions & 0 deletions apis/runtime/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/cloud-provider/cloudapi/aws/aws_cloudinterface_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"antrea.io/nephe/apis/crd/v1alpha1"
cloudv1alpha1 "antrea.io/nephe/apis/crd/v1alpha1"
runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1"
cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common"
"antrea.io/nephe/pkg/cloud-provider/cloudapi/internal"
"antrea.io/nephe/pkg/logging"
Expand Down Expand Up @@ -103,3 +104,18 @@ func (c *awsCloud) RemoveAccountResourcesSelector(accNamespacedName *types.Names
func (c *awsCloud) GetAccountStatus(accNamespacedName *types.NamespacedName) (*cloudv1alpha1.CloudProviderAccountStatus, error) {
return c.cloudCommon.GetStatus(accNamespacedName)
}

// AddInventoryPoller adds poller for polling cloud inventory.
func (c *awsCloud) AddInventoryPoller(accountNamespacedName *types.NamespacedName) error {
return c.cloudCommon.AddInventoryPoller(accountNamespacedName)
}

// DeleteInventoryPoller deletes an existing poller created for polling cloud inventory.
func (c *awsCloud) DeleteInventoryPoller(accountNamespacedName *types.NamespacedName) error {
return c.cloudCommon.DeleteInventoryPoller(accountNamespacedName)
}

// GetVpcInventory pulls cloud vpc inventory from internal snapshot.
func (c *awsCloud) GetVpcInventory(accountNamespacedName *types.NamespacedName) (map[string]*runtimev1alpha1.Vpc, error) {
return c.cloudCommon.GetVpcInventory(accountNamespacedName)
}
94 changes: 82 additions & 12 deletions pkg/cloud-provider/cloudapi/aws/aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/mohae/deepcopy"

"antrea.io/nephe/apis/crd/v1alpha1"
runtimev1alpha1 "antrea.io/nephe/apis/runtime/v1alpha1"
cloudcommon "antrea.io/nephe/pkg/cloud-provider/cloudapi/common"
"antrea.io/nephe/pkg/cloud-provider/cloudapi/internal"
)
Expand All @@ -40,11 +41,13 @@ type ec2ServiceConfig struct {
// - key with nil value indicates no filters. Get all instances for account.
// - key with "some-filter-string" value indicates some filter. Get instances matching those filters only.
instanceFilters map[string][][]*ec2.Filter
pollVpc bool
}

// ec2ResourcesCacheSnapshot holds the results from querying for all instances.
type ec2ResourcesCacheSnapshot struct {
instances map[cloudcommon.InstanceID]*ec2.Instance
vpcs []*ec2.Vpc
vpcIDs map[string]struct{}
vpcNameToID map[string]string
vpcPeers map[string][]string
Expand All @@ -63,6 +66,7 @@ func newEC2ServiceConfig(name string, service awsServiceClientCreateInterface) (
resourcesCache: &internal.CloudServiceResourcesCache{},
inventoryStats: &internal.CloudServiceStats{},
instanceFilters: make(map[string][][]*ec2.Filter),
pollVpc: true,
}
return config, nil
}
Expand Down Expand Up @@ -165,6 +169,22 @@ func (ec2Cfg *ec2ServiceConfig) getCachedVpcNameToID() map[string]string {
return vpcNameToIDCopy
}

// GetCachedVpcs returns VPCs from cached snapshot for the account.
func (ec2Cfg *ec2ServiceConfig) GetCachedVpcs() []*ec2.Vpc {
snapshot := ec2Cfg.resourcesCache.GetSnapshot()
if snapshot == nil {
awsPluginLogger().V(4).Info("cache snapshot nil", "service", awsComputeServiceNameEC2, "account", ec2Cfg.accountName)
return []*ec2.Vpc{}
}
vpcs := snapshot.(*ec2ResourcesCacheSnapshot).vpcs
vpcsToReturn := make([]*ec2.Vpc, 0, len(vpcs))
vpcsToReturn = append(vpcsToReturn, vpcs...)

awsPluginLogger().V(1).Info("cached vpcs", "service", awsComputeServiceNameEC2, "account", ec2Cfg.accountName,
"vpcs", len(vpcsToReturn))
return vpcsToReturn
}

// getVpcPeers returns all the peers of a vpc.
func (ec2Cfg *ec2ServiceConfig) getVpcPeers(vpcID string) []string {
snapshot := ec2Cfg.resourcesCache.GetSnapshot()
Expand Down Expand Up @@ -210,30 +230,32 @@ func (ec2Cfg *ec2ServiceConfig) getInstances() ([]*ec2.Instance, error) {
return instances, nil
}

// doInstancesInventoryWorker gets inventory from cloud for given cloud account.
// DoResourceInventory gets inventory from cloud for given cloud account.
func (ec2Cfg *ec2ServiceConfig) DoResourceInventory() error {
vpcs, _ := ec2Cfg.getVpcs()

instances, e := ec2Cfg.getInstances()
if e != nil {
awsPluginLogger().V(0).Info("error fetching ec2 instances", "account", ec2Cfg.accountName, "error", e)
} else {
exists := struct{}{}
vpcIDs := make(map[string]struct{})
instanceIDs := make(map[cloudcommon.InstanceID]*ec2.Instance)
vpcNameToID, _ := ec2Cfg.buildMapVpcNameToID()
vpcNameToID := ec2Cfg.buildMapVpcNameToID(vpcs)
vpcPeers, _ := ec2Cfg.buildMapVpcPeers()
for _, instance := range instances {
id := cloudcommon.InstanceID(strings.ToLower(aws.StringValue(instance.InstanceId)))
instanceIDs[id] = instance
vpcIDs[strings.ToLower(*instance.VpcId)] = exists
}
ec2Cfg.resourcesCache.UpdateSnapshot(&ec2ResourcesCacheSnapshot{instanceIDs, vpcIDs, vpcNameToID, vpcPeers})
ec2Cfg.resourcesCache.UpdateSnapshot(&ec2ResourcesCacheSnapshot{instanceIDs, vpcs, vpcIDs, vpcNameToID, vpcPeers})
}
ec2Cfg.inventoryStats.UpdateInventoryPollStats(e)

return e
}

// setInstanceFilters add/updates instances resource filter for the service.
// SetResourceFilters add/updates instances resource filter for the service.
func (ec2Cfg *ec2ServiceConfig) SetResourceFilters(selector *v1alpha1.CloudEntitySelector) {
if filters, found := convertSelectorToEC2InstanceFilters(selector); found {
ec2Cfg.instanceFilters[selector.GetName()] = filters
Expand Down Expand Up @@ -295,14 +317,9 @@ func (ec2Cfg *ec2ServiceConfig) UpdateServiceConfig(newConfig internal.CloudServ
ec2Cfg.apiClient = newEc2ServiceConfig.apiClient
}

func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID() (map[string]string, error) {
func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID(vpcs []*ec2.Vpc) map[string]string {
vpcNameToID := make(map[string]string)
result, err := ec2Cfg.apiClient.describeVpcsWrapper(nil)
if err != nil {
awsPluginLogger().V(0).Info("error describing vpcs", "error", err)
return vpcNameToID, err
}
for _, vpc := range result.Vpcs {
for _, vpc := range vpcs {
if len(vpc.Tags) == 0 {
awsPluginLogger().V(4).Info("vpc name not found", "account", ec2Cfg.accountName, "vpc", vpc)
continue
Expand All @@ -316,7 +333,7 @@ func (ec2Cfg *ec2ServiceConfig) buildMapVpcNameToID() (map[string]string, error)
}
vpcNameToID[vpcName] = *vpc.VpcId
}
return vpcNameToID, nil
return vpcNameToID
}

func (ec2Cfg *ec2ServiceConfig) buildMapVpcPeers() (map[string][]string, error) {
Expand All @@ -333,3 +350,56 @@ func (ec2Cfg *ec2ServiceConfig) buildMapVpcPeers() (map[string][]string, error)
}
return vpcPeers, nil
}

func (ec2Cfg *ec2ServiceConfig) NeedPollWithoutFilter() bool {
return ec2Cfg.pollVpc
}

func (ec2Cfg *ec2ServiceConfig) getVpcs() ([]*ec2.Vpc, error) {
result, err := ec2Cfg.apiClient.describeVpcsWrapper(nil)
if err != nil {
awsPluginLogger().V(0).Info("error describing vpcs", "account", ec2Cfg.accountName, "error", err)
return nil, err
}
return result.Vpcs, nil
}

func (ec2Cfg *ec2ServiceConfig) GetVpcInventory() map[string]*runtimev1alpha1.Vpc {
vpcs := ec2Cfg.GetCachedVpcs()

// Extract namespace from account namespaced name.
tokens := strings.Split(ec2Cfg.accountName, "/")
if len(tokens) != 2 {
awsPluginLogger().V(0).Error(fmt.Errorf("cannot extract namespace from account namespaced name"),
"nil", "account", ec2Cfg.accountName)
return nil
}
namespace := tokens[0]
// Convert to kubernetes object and return a map indexed using VPC ID.
vpcMap := map[string]*runtimev1alpha1.Vpc{}
for _, vpc := range vpcs {
vpcObj := new(runtimev1alpha1.Vpc)
vpcObj.Name = *vpc.VpcId
vpcObj.Namespace = namespace
vpcObj.Info.Id = *vpc.VpcId
if len(vpc.Tags) != 0 {
vpcObj.Info.Tags = make(map[string]string)
for _, tag := range vpc.Tags {
vpcObj.Info.Tags[*(tag.Key)] = *(tag.Value)
}
if value, found := vpcObj.Info.Tags["Name"]; found {
vpcObj.Info.Name = value
}
}
if len(vpc.CidrBlockAssociationSet) != 0 {
vpcObj.Info.Cidrs = make([]string, 0)
for _, cidr := range vpc.CidrBlockAssociationSet {
vpcObj.Info.Cidrs = append(vpcObj.Info.Cidrs, *cidr.CidrBlock)
}
}

vpcMap[*vpc.VpcId] = vpcObj
}

return vpcMap
}
Loading

0 comments on commit f24f620

Please sign in to comment.