-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
67 lines (51 loc) · 1.61 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import glob
import json
import re
import requests
opa_base_url = 'http://localhost:8181/v1'
# Determine what API an object came from
def get_api(obj):
if 'selfLink' in obj:
selfLink = obj['selfLink']
pattern = r"^https:\/\/([a-z]+).googleapis.com\/([a-z]+)\/.*$"
m = re.fullmatch(pattern, selfLink)
if m:
if m.group(1) != "www":
return m.group(1)
else:
return m.group(2)
return None
def sample_data():
for file in glob.iglob("./sample_data/**/*.json", recursive=True):
with open(file) as f:
yield (file, json.load(f))
def opa_input(api, policy, action, obj):
opa_url = "{}/data/cloud{}/{}/{}".format(
opa_base_url,
api,
policy,
action
)
data = {
"input": obj
}
resp = requests.post(opa_url, data=json.dumps(data), headers={'Content-type': 'application/json'})
return resp.json().get('result')
api_to_policies_map = {
"storage": ["objectversioning"],
"sql": ["acl"]
}
if __name__ == "__main__":
for (file_name, obj) in sample_data():
name = file_name.rsplit('/', 1)[1]
print("TESTING {} ... ".format(name), end='')
api = get_api(obj)
policies = api_to_policies_map[api]
for policy in policies:
valid = opa_input(api, policy, 'is_valid', obj)
if valid:
print('validated')
else:
print('enforcement_needed')
newobj = opa_input(api, policy, 'enforce', obj)
print(json.dumps(newobj, indent=2))