-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
AwsEC2GetPublicSGRules.py
135 lines (109 loc) · 4.06 KB
/
AwsEC2GetPublicSGRules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import demistomock as demisto
from CommonServerPython import *
from CommonServerUserPython import *
import json
import copy
def get_dict_value(data, key):
""" Returns dict value for a given key (case insensitive) """
for key_name in data.keys():
if key_name.lower() == key.lower():
return data[key_name]
return None
def get_ec2_sg_public_rules(group_id, ip_permissions, checked_protocol=None, checked_from_port=None,
checked_to_port=None, region=None, include_ipv6='no'):
"""
Get the list of public
which can be passed on to the following command:
aws-ec2-revoke-security-group-ingress-rule
"""
# If the SG only has one rule, we have to convert the dict to a list with one element
if (isinstance(ip_permissions, dict)):
ip_permissions = [ip_permissions]
public_rules = []
for rule in ip_permissions:
# Check protocol
protocol = get_dict_value(rule, 'IpProtocol')
if protocol != '-1':
if checked_protocol.lower() != protocol.lower():
continue
bad_rule = {
'groupId': group_id,
'ipProtocol': protocol
}
if region:
bad_rule.update(region=region)
# Check the ports
from_port = get_dict_value(rule, 'FromPort')
to_port = get_dict_value(rule, 'ToPort')
if from_port and to_port:
if from_port < checked_from_port and to_port < checked_from_port:
continue
elif from_port > checked_to_port and to_port > checked_to_port:
continue
bad_rule.update({
'fromPort': from_port,
'toPort': to_port
})
# Process IPV4
ip_ranges = get_dict_value(rule, 'ipv4Ranges')
if not ip_ranges:
ip_ranges = get_dict_value(rule, 'IpRanges')
if ip_ranges:
for ip_range in ip_ranges:
cidr_ip = get_dict_value(ip_range, 'CidrIp')
if cidr_ip == '0.0.0.0/0':
tmp = copy.copy(bad_rule)
tmp['cidrIp'] = '0.0.0.0/0'
public_rules.append(tmp)
# Process IPv6
if include_ipv6 == 'yes':
ip_ranges = get_dict_value(rule, 'Ipv6Ranges')
if ip_ranges:
for ip_range in ip_ranges:
cidr_ip = get_dict_value(ip_range, 'CidrIpv6')
if cidr_ip == '::/0':
tmp = copy.copy(bad_rule)
tmp['cidrIp'] = '::/0'
public_rules.append(tmp)
return public_rules
def main(args):
ip_perms = args.get('ipPermissions')
if isinstance(ip_perms, str):
try:
ip_perms = json.loads(ip_perms)
except json.JSONDecodeError:
return_error('Unable to parse ipPermissions. Invalid JSON string.')
# If checked from_port or to_port is not specified
# it will default to 0-65535 (all port)
if args.get('fromPort'):
from_port = int(args.get('fromPort'))
else:
from_port = 0
if args.get('toPort'):
to_port = int(args.get('toPort'))
else:
to_port = 65535
public_rules = get_ec2_sg_public_rules(
group_id=args.get('groupId'),
ip_permissions=ip_perms,
checked_protocol=args.get('protocol'),
checked_from_port=from_port,
checked_to_port=to_port,
region=args.get('region'),
include_ipv6=args.get('includeIPv6')
)
readable_output = tableToMarkdown('Public Security Group Rules', public_rules,
['groupId', 'ipProtocol', 'fromPort', 'toPort', 'cidrIp', 'region']
)
context = {
'AWS': {
'EC2': {
'SecurityGroup': {
'PublicRules': public_rules
}
}
}
}
return_outputs(readable_output, context, raw_response=public_rules)
if __name__ in ('builtins', '__builtin__'):
main(demisto.args())