-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprune-policy-graph
executable file
·131 lines (112 loc) · 4.6 KB
/
prune-policy-graph
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/python3
# -*- encoding: utf8 -*-
#
# Prune `qrexec-policy-graph` dot graph output to make it more usable for display (it tends to produce a graph with 10-60k edges).
#
# Reads the input graph from stdin and writes the output graph to stdout.
#
# Since `qrexec-policy-graph` data is based on the _actual_ RPC policy running on your system (and does _not_ separately interpret
# the RPC configuration at `/etc/qubes-rpc/policy/`, the resulting graph enables you to review whether it works as expected.
#
# Copyright (C) 2022 David Hobach GPLv3
# 0.7
#
#
# Relevant doc:
# - https://github.com/QubesOS/qubes-core-qrexec/blob/master/qrexec/tools/qrexec_policy_graph.py
# - https://networkx.org/documentation/networkx-1.11/
import sys
import re
import networkx as nx
from networkx.drawing.nx_agraph import read_dot #agraph is ~1000x faster and doesn't use GBs of memory
from networkx.drawing.nx_pydot import write_dot #agraph inserts lot of newlines
#merge_attributes(atts)
#Merge the given list of attributes to a single attribute dict.
def merge_attributes(atts):
ret = {}
for att in atts:
for key in att.keys():
val = att[key]
if key == 'label':
if not ret.get(key):
ret[key] = []
if isinstance(val, list):
for i in val:
if not i in ret[key]:
ret[key].append(i)
else:
if not val in ret[key]:
ret[key].append(val)
elif key == 'color':
if ret.get(key) != 'red': #red always wins
ret[key] = val
else:
raise Exception('Unexpected key found: %s' % key)
return ret
#merge_nodes(nodes)
#Merge the given list of nodes.
def merge_nodes(nodes):
ret = sorted(set(nodes))
return ','.join(ret)
# prune the given graph G by merging all nodes & edges that share a common identifier returned by `id_func(node1, node2, edge_attrs)`
# id_func must return a hashable object
def prune(G, id_func):
O = nx.MultiDiGraph()
id2pre = {} #ID --> { u: u, v: v, a: a }
for u,v,a in G.edges(data=True):
_id = id_func(u, v, a)
#init, if necessary
if not id2pre.get(_id):
id2pre[_id] = {}
id2pre[_id]['u'] = id2pre[_id].get('u',[])
id2pre[_id]['v'] = id2pre[_id].get('v',[])
id2pre[_id]['a'] = id2pre[_id].get('a',[])
#append
id2pre[_id]['u'].append(u)
id2pre[_id]['v'].append(v)
id2pre[_id]['a'].append(a)
for _id in id2pre:
group_u = merge_nodes(id2pre[_id]['u'])
group_v = merge_nodes(id2pre[_id]['v'])
atts = merge_attributes(id2pre[_id]['a'])
O.add_edge(group_u, group_v, **atts)
return O
def main():
print('Reading the RPC policy graph... (this may take some time)', file=sys.stderr)
G = read_dot(sys.stdin)
print('Fixing some details...', file=sys.stderr)
#FIXME: attribute order shouldn't matter --> better id_func functions needed
#fix some details about the qrexec-policy-graph output
to_remove = []
to_rename = {}
disp_pattern = re.compile('^disp[0-9]+$')
disp_template_pattern = re.compile('^@dispvm:(.*)$')
for n in G:
#remove disp* nodes
if disp_pattern.search(str(n)):
to_remove.append(n)
#move @dispvm: to the policies on the edges leading to such a node
else:
match = disp_template_pattern.match(str(n))
if match:
for p in G.predecessors(n):
for att in G[p][n].values(): #G[p][n] is a dict of index --> attribute dict
label = att['label']
att['label'] = 'disp:' + label
to_rename[n] = match.group(1)
G.remove_nodes_from(to_remove)
G = nx.relabel_nodes(G, to_rename, copy=False)
#prune: if more than one source shares common targets and attributes, merge them to a group
print('Pruning sources...', file=sys.stderr)
G = prune(G, lambda u,v,a: str(v) + str(a))
#prune: if more than one target shares common sources and attributes, merge them to a group
print('Pruning targets...', file=sys.stderr)
G = prune(G, lambda u,v,a: str(u) + str(a))
#prune: if more than one attribute shares common sources and targets, merge them to a group
print('Pruning attributes...', file=sys.stderr)
G = prune(G, lambda u,v,a: str(u) + str(v))
print('Writing output...', file=sys.stderr)
write_dot(G, sys.stdout)
print('All done.', file=sys.stderr)
if __name__ == '__main__':
sys.exit(main())