-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcf-sg-graph.jq
110 lines (96 loc) · 3.08 KB
/
cf-sg-graph.jq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Module metadata for the cf-sg-graph jq module: name, description, homepage,
# license, author and source repository.
# NOTE(review): the host "github.com" in the URLs below looks like a
# rewritten form of github.com — confirm before relying on these links.
module {
"name": "cf-sg-graph",
"description": "produce a JSON Graph representation of the security rules found in a given CloudFormation template",
"homepage": "https://github.com/stanch/jq-modules#readme",
"license": "MIT",
"author": "Nick",
"repository": {
"type": "git",
"url": "https://github.com/stanch/jq-modules.git"
}
};
# Apply f to every direct child of a JSON container:
#   - arrays:  f is mapped over the elements
#   - objects: f is mapped over the values (keys are kept)
#   - scalars: returned unchanged, f is never applied
def fmap(f):
  type as $kind |
  if $kind == "array" then map(f)
  elif $kind == "object" then map_values(f)
  else .
  end;
# Bottom-up rewrite (catamorphism): first rewrite all children via fmap, then
# apply f to the resulting value. Equivalent to walk(f) in newer jq versions,
# see https://stedolan.github.io/jq/manual/#walk(f)
def cata(f):
  def descend: fmap(descend) | f;
  descend;
# Strip Fn::Sub intrinsic functions everywhere in the input, leaving their
# argument in place. For the array form of Fn::Sub only the first element is
# kept; for any other form the argument is kept as-is.
def eliminate_subs:
  cata(
    # `or` short-circuits, so has() is only ever called on objects
    if type != "object" or (has("Fn::Sub") | not) then
      .
    else
      .["Fn::Sub"] | if type == "array" then .[0] else . end
    end
  );
# Rewrite every Fn::ImportValue intrinsic function in the input into a Ref
# object carrying the same argument, so imports are handled like plain Refs.
def eliminate_imports:
  cata(
    # `or` short-circuits, so has() is only ever called on objects
    if type != "object" or (has("Fn::ImportValue") | not) then
      .
    else
      .["Fn::ImportValue"] | { Ref: . }
    end
  );
# Subject security group of a rule: the Ref under GroupId, falling back to the
# Ref under GroupName when the former is missing (null or false).
def node_ref:
  (.GroupId | .Ref)
  // (.GroupName | .Ref);
# Source of a rule, in order of preference: the Ref under
# SourceSecurityGroupId, the Ref under SourceSecurityGroupName, then CidrIp.
def source_node_ref:
  (.SourceSecurityGroupId | .Ref)
  // (.SourceSecurityGroupName | .Ref)
  // .CidrIp;
# Destination of a rule, in order of preference: the Ref under
# DestinationSecurityGroupId, the Ref under DestinationSecurityGroupName,
# then CidrIp.
def target_node_ref:
  (.DestinationSecurityGroupId | .Ref)
  // (.DestinationSecurityGroupName | .Ref)
  // .CidrIp;
# Pretty-print -1 as * for IP protocols and port numbers.
# `value` is a filter evaluated against the current input. Both the string
# "-1" and the JSON number -1 are rendered as "*" (templates may carry ports
# as numbers rather than strings); any other value is returned unchanged.
def wildcard(value):
  value as $v |
  if ($v == "-1") or ($v == -1) then "*" else $v end;
# Label an edge in the graph with the details about the allowed connections.
#   proto, from, to: filters yielding the rule's protocol and its two port
#   (or, for icmp, type/code) values.
# Output: "<proto> <from> <to>" for icmp (with -1 shown as *), otherwise
# "<proto> <port>" when both ports are equal or "<proto> <from>-<to>".
# Numeric values are converted to strings first, because jq cannot add a
# number to a string; null (absent) and string values are left untouched so
# labels for string-only input are unchanged.
def edge_label(proto; from; to):
  def text: if type == "number" then tostring else . end;
  (proto | text) as $proto |
  (from | text) as $from |
  (to | text) as $to |
  wildcard($proto) + " " +
  (if $proto == "icmp" then
    wildcard($from) + " " + wildcard($to)
  else
    if $from == $to then $from else $from + "-" + $to end
  end);
# Collect every resource whose Type equals `type` into an array of graph
# edges. `source` and `target` are filters evaluated against the resource's
# Properties to obtain the edge endpoints; the label is built from the
# IpProtocol, FromPort and ToPort properties.
def edges(type; source; target):
  [
    .Resources[]
    | select(.Type == type)
    | .Properties
    | {
        source: source,
        target: target,
        label: edge_label(.IpProtocol; .FromPort; .ToPort),
        directed: true,
        metadata: { type: type }
      }
  ];
# Merge edges that share source, target and metadata.type into a single edge.
# The merged edge is the first edge of its group, with its label replaced by
# a space followed by all labels of the group joined with ", ".
def combine_edges:
  group_by(.source, .target, .metadata.type)
  | map(
      (map(.label) | join(", ")) as $labels
      | .[0]
      | .label = (" " + $labels)
    );
# All edges of the graph: egress rules (subject group -> destination) followed
# by ingress rules (source -> subject group), with parallel edges combined.
def edges:
  edges("AWS::EC2::SecurityGroupEgress"; node_ref; target_node_ref) as $egress |
  edges("AWS::EC2::SecurityGroupIngress"; source_node_ref; node_ref) as $ingress |
  ($egress + $ingress) | combine_edges;
# All distinct nodes referenced as source or target of an edge. A node's label
# is the GroupDescription of the resource with that id in the input template
# when available, otherwise the id itself.
def nodes:
  . as $template |
  [edges[] | .source, .target]
  | unique
  | map(
      . as $node |
      ($template.Resources[$node].Properties.GroupDescription // $node) as $caption |
      { id: $node, label: $caption }
    );
# Build the graph: normalise the template by removing Fn::Sub and rewriting
# Fn::ImportValue, then emit a directed graph object with its nodes and edges.
def graph:
  (eliminate_subs | eliminate_imports) as $template |
  ($template | nodes) as $nodes |
  ($template | edges) as $edges |
  {
    graph: {
      directed: true,
      nodes: $nodes,
      edges: $edges
    }
  };