-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path_common.py
58 lines (44 loc) · 1.56 KB
/
_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Common utility functions. All of these are imported into __init__.py"""
import time
from typing import List, Optional, Dict
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
def doze(seconds: int, doze_period_ms: int = 100) -> int:
"""Sleeps for time given in seconds.
Note for Windows users: doze_period_ms less than 15 doesn't work correctly.
Args:
seconds: Seconds to doze for
doze_period_ms: Milliseconds per doze cycle
Returns: Time elapsed in nanoseconds
"""
loops = int(seconds * 1000 / doze_period_ms)
start = time.monotonic_ns()
for _ in range(loops):
time.sleep(doze_period_ms / 1000)
return time.monotonic_ns() - start
def session_with_retries(method_list: Optional[List[str]] = None):
"""
Provisions http session manager with retries.
:return:
"""
retry_strategy = Retry(
total=4,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=method_list or ["HEAD", "GET", "OPTIONS", "TRACE"],
backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
return http
def convert_datadog_tags(tag_dict: Optional[Dict[str, str]]) -> Optional[List[str]]:
"""
Converts tags dictionary to Datadog tag format.
:param tag_dict: Dictionary of tags.
:return: A list of tag_key:tag_value
"""
if not tag_dict:
return None
return [f"{k}:{v}" for k, v in tag_dict.items()]