-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
94 lines (78 loc) · 3.15 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import time
import requests
def get_base_url(cloud: str, env: str):
    """ Resolves the Sigma API base URL for a cloud / environment pair
    :cloud: Cloud provider name; 'gcp', 'aws' and 'azure' are recognised
    :env: Environment name; 'dev', 'staging', 'prod' or 'production'
    :returns: Base URL without a trailing slash, or '' when the
        combination is not recognised
    """
    # Both arguments are echoed to stdout, environment first.
    print(env)
    print(cloud)

    # 'dev' always points at the local server, whatever the cloud is.
    if env == 'dev':
        return 'http://127.0.0.1:8081'

    # 'production' is accepted as an alias of 'prod'.
    stage = 'prod' if env == 'production' else env

    known_urls = {
        ('gcp', 'staging'): 'https://api.staging.sigmacomputing.io',
        ('gcp', 'prod'): 'https://api.sigmacomputing.com',
        ('aws', 'staging'): 'https://staging-aws-api.sigmacomputing.io',
        ('aws', 'prod'): 'https://aws-api.sigmacomputing.com',
        ('azure', 'staging'): 'https://api.staging.us.azure.sigmacomputing.io',
        ('azure', 'prod'): 'https://api.us.azure.sigmacomputing.com',
    }
    return known_urls.get((cloud, stage), '')
def get_access_token(base_url, client_id, client_secret):
    """ Gets the access token from Sigma
    :base_url: Base URL of the Sigma API; '/v2/auth/token' is appended to it
    :client_id: Client ID generated from Sigma
    :client_secret: Client secret generated from Sigma
    :returns: Access token
    :raises requests.exceptions.HTTPError: when the token endpoint answers
        with a 4xx/5xx status
    """
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    response = requests.post(f"{base_url}/v2/auth/token", data=payload)
    # Surface a rejected request as an HTTP error carrying the status code,
    # rather than as an opaque KeyError / JSON decode error further down.
    response.raise_for_status()
    data = response.json()
    return data["access_token"]
def get_headers(access_token):
    """ Builds the authorization headers for API requests
    :access_token: Generated access token
    :returns: Dict holding a single 'Authorization' bearer header
    """
    bearer_value = "Bearer " + access_token
    headers = {}
    headers["Authorization"] = bearer_value
    return headers
class SigmaClient:
    """ Thin wrapper around `requests` for calling the Sigma API

    The constructor resolves the base URL and fetches an access token, so
    creating a client performs a network request. Every call sends the
    stored Authorization header, refreshes it once when the server answers
    401, and retries on connection errors.
    """

    def __init__(self, env, cloud, client_id, client_secret):
        """ Creates a client and authenticates it
        :env: Environment name passed to get_base_url
        :cloud: Cloud provider name passed to get_base_url
        :client_id: Client ID generated from Sigma
        :client_secret: Client secret generated from Sigma
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = get_base_url(cloud, env)
        self.headers = self._get_headers()

    def _get_headers(self):
        """ Requests a fresh access token and returns the matching headers """
        token = get_access_token(self.base_url, self.client_id, self.client_secret)
        return get_headers(token)

    def post(self, path, **kwargs):
        """ Sends a POST to `path`; kwargs are forwarded to requests.post """
        return self._exec(requests.post, path, **kwargs)

    def get(self, path, **kwargs):
        """ Sends a GET to `path`; kwargs are forwarded to requests.get """
        return self._exec(requests.get, path, **kwargs)

    def put(self, path, **kwargs):
        """ Sends a PUT to `path`; kwargs are forwarded to requests.put """
        return self._exec(requests.put, path, **kwargs)

    def delete(self, path, **kwargs):
        """ Sends a DELETE to `path`; kwargs are forwarded to requests.delete """
        return self._exec(requests.delete, path, **kwargs)

    def patch(self, path, **kwargs):
        """ Sends a PATCH to `path`; kwargs are forwarded to requests.patch """
        return self._exec(requests.patch, path, **kwargs)

    def _exec(self, func, path, retries=5, exc=None, **kwargs):
        """ Runs one API call with token refresh and connection retries
        :func: Callable invoked as func(url, headers=..., **kwargs)
        :path: Path appended to the base URL after a '/'
        :retries: Number of extra attempts after a connection error
        :exc: Exception to raise when `retries` is already negative
        :kwargs: Extra arguments for `func`; an optional 'headers' mapping
            is merged with the client's Authorization header, which wins
            on conflicting keys
        :returns: The response object returned by `func`
        :raises requests.exceptions.ConnectionError: when every attempt
            failed with a connection error
        :raises ValueError: when `retries` is negative and `exc` is None
        """
        if retries < 0:
            if exc is None:
                raise ValueError("retries must be non-negative")
            raise exc

        url = f'{self.base_url}/{path}'
        # Work on a copy: the caller's dict must not receive the
        # Authorization header, and the custom headers have to survive
        # every retry instead of being lost after the first attempt.
        extra_headers = dict(kwargs.pop('headers', None) or {})

        last_error = exc
        for _ in range(retries + 1):
            try:
                response = func(
                    url, headers={**extra_headers, **self.headers}, **kwargs
                )
                if response.status_code == 401:
                    # Token rejected: fetch a new one and replay the call once.
                    self.headers = self._get_headers()
                    response = func(
                        url, headers={**extra_headers, **self.headers}, **kwargs
                    )
                return response
            except requests.exceptions.ConnectionError as e:
                last_error = e
                time.sleep(1)
        raise last_error