Skip to content

Commit

Permalink
compute rule update delta in plugins
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Liu <[email protected]>
  • Loading branch information
shenmo3 authored and reachjainrahul committed Jul 12, 2023
1 parent a306aee commit 9c6d262
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 123 deletions.
58 changes: 58 additions & 0 deletions pkg/cloudprovider/plugins/aws/aws_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aws

import (
"fmt"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -137,6 +138,63 @@ func convertFromSecurityGroupPair(cloudGroups []*ec2.UserIdGroupPair, managedSGs
return cloudResourceIDs, desc
}

func convertIngressToIpPermission(rules []*cloudresource.CloudRule, cloudSGNameToObj map[string]*ec2.SecurityGroup) (
[]*ec2.IpPermission, error) {
ipPermissions := make([]*ec2.IpPermission, 0)
for _, obj := range rules {
rule := obj.Rule.(*cloudresource.IngressRule)
if rule == nil {
continue
}
description, err := utils.GenerateCloudDescription(obj.NpNamespacedName)
if err != nil {
return nil, fmt.Errorf("unable to generate rule description, err: %v", err)
}
idGroupPairs := buildEc2UserIDGroupPairs(rule.FromSecurityGroups, cloudSGNameToObj, &description)
ipv4Ranges, ipv6Ranges := convertToEc2IpRanges(rule.FromSrcIP, len(rule.FromSecurityGroups) > 0, &description)
startPort, endPort := convertToIPPermissionPort(rule.FromPort, rule.Protocol)
ipPermission := &ec2.IpPermission{
FromPort: startPort,
ToPort: endPort,
IpProtocol: convertToIPPermissionProtocol(rule.Protocol),
IpRanges: ipv4Ranges,
Ipv6Ranges: ipv6Ranges,
UserIdGroupPairs: idGroupPairs,
}
ipPermissions = append(ipPermissions, ipPermission)
}
return ipPermissions, nil
}

func convertEgressToIpPermission(rules []*cloudresource.CloudRule, cloudSGNameToObj map[string]*ec2.SecurityGroup) (
[]*ec2.IpPermission, error) {
ipPermissions := make([]*ec2.IpPermission, 0)
for _, obj := range rules {
rule := obj.Rule.(*cloudresource.EgressRule)
if rule == nil {
continue
}
description, err := utils.GenerateCloudDescription(obj.NpNamespacedName)
if err != nil {
return nil, fmt.Errorf("unable to generate rule description, err: %v", err)
}

idGroupPairs := buildEc2UserIDGroupPairs(rule.ToSecurityGroups, cloudSGNameToObj, &description)
ipv4Ranges, ipv6Ranges := convertToEc2IpRanges(rule.ToDstIP, len(rule.ToSecurityGroups) > 0, &description)
startPort, endPort := convertToIPPermissionPort(rule.ToPort, rule.Protocol)
ipPermission := &ec2.IpPermission{
FromPort: startPort,
ToPort: endPort,
IpProtocol: convertToIPPermissionProtocol(rule.Protocol),
IpRanges: ipv4Ranges,
Ipv6Ranges: ipv6Ranges,
UserIdGroupPairs: idGroupPairs,
}
ipPermissions = append(ipPermissions, ipPermission)
}
return ipPermissions, nil
}

// convertFromIngressIpPermissionToCloudRule converts AWS ingress rules from ec2.IpPermission to internal securitygroup.CloudRule.
// Each AT Sg can have one or more ANPs and an ANP can have one or more rules. Each rule can have a description.
func convertFromIngressIpPermissionToCloudRule(sgID string, ipPermissions []*ec2.IpPermission,
Expand Down
152 changes: 89 additions & 63 deletions pkg/cloudprovider/plugins/aws/aws_security.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package aws

import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -206,100 +208,51 @@ func (ec2Cfg *ec2ServiceConfig) getCloudSecurityGroupsWithNameFromCloud(vpcIDs [
}

// realizeIngressIPPermissions invokes cloud api and realizes ingress rules on the cloud security group.
func (ec2Cfg *ec2ServiceConfig) realizeIngressIPPermissions(cloudSgObj *ec2.SecurityGroup, rules []*cloudresource.CloudRule,
cloudSGNameToObj map[string]*ec2.SecurityGroup, isDelete bool) error {
newIpPermissions := make([]*ec2.IpPermission, 0)
for _, obj := range rules {
rule := obj.Rule.(*cloudresource.IngressRule)
if rule == nil {
continue
}
description, err := utils.GenerateCloudDescription(obj.NpNamespacedName)
if err != nil {
return fmt.Errorf("unable to generate rule description, err: %v", err)
}
idGroupPairs := buildEc2UserIDGroupPairs(rule.FromSecurityGroups, cloudSGNameToObj, &description)
ipv4Ranges, ipv6Ranges := convertToEc2IpRanges(rule.FromSrcIP, len(rule.FromSecurityGroups) > 0, &description)
startPort, endPort := convertToIPPermissionPort(rule.FromPort, rule.Protocol)
ipPermission := &ec2.IpPermission{
FromPort: startPort,
ToPort: endPort,
IpProtocol: convertToIPPermissionProtocol(rule.Protocol),
IpRanges: ipv4Ranges,
Ipv6Ranges: ipv6Ranges,
UserIdGroupPairs: idGroupPairs,
}
newIpPermissions = append(newIpPermissions, ipPermission)
}

if len(newIpPermissions) == 0 {
func (ec2Cfg *ec2ServiceConfig) realizeIngressIPPermissions(cloudSgObj *ec2.SecurityGroup,
ipPermissions []*ec2.IpPermission, isDelete bool) error {
if len(ipPermissions) == 0 {
return nil
}

if isDelete {
awsPluginLogger().V(1).Info("Delete ingress rules", "rules", newIpPermissions)
awsPluginLogger().V(1).Info("Delete ingress rules", "rules", ipPermissions)
request := &ec2.RevokeSecurityGroupIngressInput{
GroupId: cloudSgObj.GroupId,
IpPermissions: newIpPermissions,
IpPermissions: ipPermissions,
}
_, err := ec2Cfg.apiClient.revokeSecurityGroupIngress(request)
return err
} else {
awsPluginLogger().V(1).Info("Add ingress rules", "rules", newIpPermissions)
awsPluginLogger().V(1).Info("Add ingress rules", "rules", ipPermissions)
request := &ec2.AuthorizeSecurityGroupIngressInput{
GroupId: cloudSgObj.GroupId,
IpPermissions: newIpPermissions,
IpPermissions: ipPermissions,
}
_, err := ec2Cfg.apiClient.authorizeSecurityGroupIngress(request)
return err
}
}

// realizeEgressIPPermissions invokes cloud api and realizes egress rules on the cloud security group.
func (ec2Cfg *ec2ServiceConfig) realizeEgressIPPermissions(cloudSgObj *ec2.SecurityGroup, rules []*cloudresource.CloudRule,
cloudSGNameToObj map[string]*ec2.SecurityGroup, isDelete bool) error {
newIpPermissions := make([]*ec2.IpPermission, 0)
for _, obj := range rules {
rule := obj.Rule.(*cloudresource.EgressRule)
if rule == nil {
continue
}
description, err := utils.GenerateCloudDescription(obj.NpNamespacedName)
if err != nil {
return fmt.Errorf("unable to generate rule description, err: %v", err)
}

idGroupPairs := buildEc2UserIDGroupPairs(rule.ToSecurityGroups, cloudSGNameToObj, &description)
ipv4Ranges, ipv6Ranges := convertToEc2IpRanges(rule.ToDstIP, len(rule.ToSecurityGroups) > 0, &description)
startPort, endPort := convertToIPPermissionPort(rule.ToPort, rule.Protocol)
ipPermission := &ec2.IpPermission{
FromPort: startPort,
ToPort: endPort,
IpProtocol: convertToIPPermissionProtocol(rule.Protocol),
IpRanges: ipv4Ranges,
Ipv6Ranges: ipv6Ranges,
UserIdGroupPairs: idGroupPairs,
}
newIpPermissions = append(newIpPermissions, ipPermission)
}

if len(newIpPermissions) == 0 {
func (ec2Cfg *ec2ServiceConfig) realizeEgressIPPermissions(cloudSgObj *ec2.SecurityGroup,
ipPermissions []*ec2.IpPermission, isDelete bool) error {
if len(ipPermissions) == 0 {
return nil
}

if isDelete {
awsPluginLogger().V(1).Info("Delete egress rules", "rule", newIpPermissions)
awsPluginLogger().V(1).Info("Delete egress rules", "rule", ipPermissions)
request := &ec2.RevokeSecurityGroupEgressInput{
GroupId: cloudSgObj.GroupId,
IpPermissions: newIpPermissions,
IpPermissions: ipPermissions,
}
_, err := ec2Cfg.apiClient.revokeSecurityGroupEgress(request)
return err
} else {
awsPluginLogger().V(1).Info("Add egress rules", "rule", newIpPermissions)
awsPluginLogger().V(1).Info("Add egress rules", "rule", ipPermissions)
request := &ec2.AuthorizeSecurityGroupEgressInput{
GroupId: cloudSgObj.GroupId,
IpPermissions: newIpPermissions,
IpPermissions: ipPermissions,
}
_, err := ec2Cfg.apiClient.authorizeSecurityGroupEgress(request)
return err
Expand Down Expand Up @@ -672,3 +625,76 @@ func getMemberNicCloudResourcesAttachedToOtherSGs(members []cloudresource.CloudR
}
return nicCloudResources
}

// normalizeIpPermissions aligns the format of ipPermissions received from the cloud to match the local format.
// It normalizes the protocol and separates each IP range and address group into separate rules.
func normalizeIpPermissions(ipPermissions []*ec2.IpPermission) []*ec2.IpPermission {
normalizedList := make([]*ec2.IpPermission, 0)
for _, ipPermission := range ipPermissions {
if protocol, ok := protocolNameNumMap[*ipPermission.IpProtocol]; ok {
ipPermission.IpProtocol = aws.String(strconv.Itoa(protocol))
}
for _, ipv4 := range ipPermission.IpRanges {
normalized := &ec2.IpPermission{
FromPort: ipPermission.FromPort,
IpProtocol: ipPermission.IpProtocol,
IpRanges: []*ec2.IpRange{ipv4},
ToPort: ipPermission.ToPort,
}
normalizedList = append(normalizedList, normalized)
}
for _, ipv6 := range ipPermission.Ipv6Ranges {
normalized := &ec2.IpPermission{
FromPort: ipPermission.FromPort,
IpProtocol: ipPermission.IpProtocol,
Ipv6Ranges: []*ec2.Ipv6Range{ipv6},
ToPort: ipPermission.ToPort,
}
normalizedList = append(normalizedList, normalized)
}
for _, group := range ipPermission.UserIdGroupPairs {
normalized := &ec2.IpPermission{
FromPort: ipPermission.FromPort,
IpProtocol: ipPermission.IpProtocol,
ToPort: ipPermission.ToPort,
UserIdGroupPairs: []*ec2.UserIdGroupPair{{GroupId: group.GroupId, Description: group.Description}},
}
normalizedList = append(normalizedList, normalized)
}
}
return normalizedList
}

// dedupIpPermissions identifies and returns a list of unique ipPermissions in local compared to cloud.
func dedupIpPermissions(local, cloud []*ec2.IpPermission) []*ec2.IpPermission {
uniqueIpPermissions := local[:0]
for _, localIpPermission := range local {
found := false
for idx, cloudIpPermission := range cloud {
if reflect.DeepEqual(cloudIpPermission, localIpPermission) {
cloud[idx] = nil
found = true
break
}
}
if !found {
uniqueIpPermissions = append(uniqueIpPermissions, localIpPermission)
}
}
return uniqueIpPermissions
}

// findDupIpPermissions finds and returns the duplicate ipPermissions in local compared to cloud.
func findDupIpPermissions(local, cloud []*ec2.IpPermission) []*ec2.IpPermission {
dupIpPermissions := local[:0]
for _, localIpPermission := range local {
for idx, cloudIpPermission := range cloud {
if reflect.DeepEqual(cloudIpPermission, localIpPermission) {
cloud[idx] = nil
dupIpPermissions = append(dupIpPermissions, cloudIpPermission)
break
}
}
}
return dupIpPermissions
}
38 changes: 31 additions & 7 deletions pkg/cloudprovider/plugins/aws/aws_security_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,61 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou
}

cloudSGObjToAddRules := cloudSGNameToCloudSGObj[appliedToGroupIdentifier.GetCloudName(false)]
cloudSGObjToAddRules.IpPermissions = normalizeIpPermissions(cloudSGObjToAddRules.IpPermissions)
cloudSGObjToAddRules.IpPermissionsEgress = normalizeIpPermissions(cloudSGObjToAddRules.IpPermissionsEgress)

addIngressRules, err := convertIngressToIpPermission(addIRule, cloudSGNameToCloudSGObj)
if err != nil {
return err
}
removeIngressRules, err := convertIngressToIpPermission(rmIRule, cloudSGNameToCloudSGObj)
if err != nil {
return err
}
addEgressRules, err := convertEgressToIpPermission(addERule, cloudSGNameToCloudSGObj)
if err != nil {
return err
}
removeEgressRules, err := convertEgressToIpPermission(rmERule, cloudSGNameToCloudSGObj)
if err != nil {
return err
}

addIngressRules = dedupIpPermissions(addIngressRules, cloudSGObjToAddRules.IpPermissions)
addEgressRules = dedupIpPermissions(addEgressRules, cloudSGObjToAddRules.IpPermissionsEgress)
removeIngressRules = findDupIpPermissions(removeIngressRules, cloudSGObjToAddRules.IpPermissions)
removeEgressRules = findDupIpPermissions(removeEgressRules, cloudSGObjToAddRules.IpPermissionsEgress)

// rollback operation for cloud api failures
rollbackRmIngress := false
rollbackAddIngress := false
rollbackRmEgress := false
defer func() {
if rollbackRmIngress {
_ = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, rmIRule, cloudSGNameToCloudSGObj, false)
_ = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, removeIngressRules, false)
}
if rollbackAddIngress {
_ = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, addIRule, cloudSGNameToCloudSGObj, true)
_ = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, addIngressRules, true)
}
if rollbackRmEgress {
_ = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, rmERule, cloudSGNameToCloudSGObj, false)
_ = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, removeEgressRules, false)
}
}()

// realize security group ingress and egress permissions
if err = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, rmIRule, cloudSGNameToCloudSGObj, true); err != nil {
if err = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, removeIngressRules, true); err != nil {
return err
}
if err = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, addIRule, cloudSGNameToCloudSGObj, false); err != nil {
if err = ec2Service.realizeIngressIPPermissions(cloudSGObjToAddRules, addIngressRules, false); err != nil {
rollbackRmIngress = true
return err
}
if err = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, rmERule, cloudSGNameToCloudSGObj, true); err != nil {
if err = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, removeEgressRules, true); err != nil {
rollbackRmIngress = true
rollbackAddIngress = true
return err
}
if err = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, addERule, cloudSGNameToCloudSGObj, false); err != nil {
if err = ec2Service.realizeEgressIPPermissions(cloudSGObjToAddRules, addEgressRules, false); err != nil {
rollbackRmIngress = true
rollbackAddIngress = true
rollbackRmEgress = true
Expand Down
Loading

0 comments on commit 9c6d262

Please sign in to comment.