-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy path_client.py
358 lines (288 loc) · 11.7 KB
/
_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
from __future__ import annotations
import time
from http import HTTPStatus
from random import uniform
from typing import Protocol
import requests
from ._exceptions import APIException
from ._version import __version__
from .actions import ActionsClient
from .certificates import CertificatesClient
from .datacenters import DatacentersClient
from .firewalls import FirewallsClient
from .floating_ips import FloatingIPsClient
from .images import ImagesClient
from .isos import IsosClient
from .load_balancer_types import LoadBalancerTypesClient
from .load_balancers import LoadBalancersClient
from .locations import LocationsClient
from .networks import NetworksClient
from .placement_groups import PlacementGroupsClient
from .primary_ips import PrimaryIPsClient
from .server_types import ServerTypesClient
from .servers import ServersClient
from .ssh_keys import SSHKeysClient
from .volumes import VolumesClient
class BackoffFunction(Protocol):
def __call__(self, retries: int) -> float:
"""
Return a interval in seconds to wait between each API call.
:param retries: Number of calls already made.
"""
def constant_backoff_function(interval: float) -> BackoffFunction:
"""
Return a backoff function, implementing a constant backoff.
:param interval: Constant interval to return.
"""
# pylint: disable=unused-argument
def func(retries: int) -> float:
return interval
return func
def exponential_backoff_function(
*,
base: float,
multiplier: int,
cap: float,
jitter: bool = False,
) -> BackoffFunction:
"""
Return a backoff function, implementing a truncated exponential backoff with
optional full jitter.
:param base: Base for the exponential backoff algorithm.
:param multiplier: Multiplier for the exponential backoff algorithm.
:param cap: Value at which the interval is truncated.
:param jitter: Whether to add jitter.
"""
def func(retries: int) -> float:
interval = base * multiplier**retries # Exponential backoff
interval = min(cap, interval) # Cap backoff
if jitter:
interval = uniform(base, interval) # Add jitter
return interval
return func
class Client:
"""
Client for the Hetzner Cloud API.
The Hetzner Cloud API reference is available at https://docs.hetzner.cloud.
Make sure to follow our API changelog available at
https://docs.hetzner.cloud/changelog (or the RRS feed available at
https://docs.hetzner.cloud/changelog/feed.rss) to be notified about additions,
deprecations and removals.
**Retry mechanism**
The :attr:`Client.request` method will retry failed requests that match certain criteria. The
default retry interval is defined by an exponential backoff algorithm truncated to 60s
with jitter. The default maximal number of retries is 5.
The following rules define when a request can be retried:
- When the client returned a network timeout error.
- When the API returned an HTTP error, with the status code:
- ``502`` Bad Gateway
- ``504`` Gateway Timeout
- When the API returned an application error, with the code:
- ``conflict``
- ``rate_limit_exceeded``
Changes to the retry policy might occur between releases, and will not be considered
breaking changes.
"""
_version = __version__
__user_agent_prefix = "hcloud-python"
_retry_interval = staticmethod(
exponential_backoff_function(base=1.0, multiplier=2, cap=60.0, jitter=True)
)
_retry_max_retries = 5
def __init__(
self,
token: str,
api_endpoint: str = "https://api.hetzner.cloud/v1",
application_name: str | None = None,
application_version: str | None = None,
poll_interval: int | float | BackoffFunction = 1.0,
poll_max_retries: int = 120,
timeout: float | tuple[float, float] | None = None,
):
"""Create a new Client instance
:param token: Hetzner Cloud API token
:param api_endpoint: Hetzner Cloud API endpoint
:param application_name: Your application name
:param application_version: Your application _version
:param poll_interval:
Interval in seconds to use when polling actions from the API.
You may pass a function to compute a custom poll interval.
:param poll_max_retries:
Max retries before timeout when polling actions from the API.
:param timeout: Requests timeout in seconds
"""
self.token = token
self._api_endpoint = api_endpoint
self._application_name = application_name
self._application_version = application_version
self._requests_session = requests.Session()
self._requests_timeout = timeout
if isinstance(poll_interval, (int, float)):
self._poll_interval_func = constant_backoff_function(poll_interval)
else:
self._poll_interval_func = poll_interval
self._poll_max_retries = poll_max_retries
self.datacenters = DatacentersClient(self)
"""DatacentersClient Instance
:type: :class:`DatacentersClient <hcloud.datacenters.client.DatacentersClient>`
"""
self.locations = LocationsClient(self)
"""LocationsClient Instance
:type: :class:`LocationsClient <hcloud.locations.client.LocationsClient>`
"""
self.servers = ServersClient(self)
"""ServersClient Instance
:type: :class:`ServersClient <hcloud.servers.client.ServersClient>`
"""
self.server_types = ServerTypesClient(self)
"""ServerTypesClient Instance
:type: :class:`ServerTypesClient <hcloud.server_types.client.ServerTypesClient>`
"""
self.volumes = VolumesClient(self)
"""VolumesClient Instance
:type: :class:`VolumesClient <hcloud.volumes.client.VolumesClient>`
"""
self.actions = ActionsClient(self)
"""ActionsClient Instance
:type: :class:`ActionsClient <hcloud.actions.client.ActionsClient>`
"""
self.images = ImagesClient(self)
"""ImagesClient Instance
:type: :class:`ImagesClient <hcloud.images.client.ImagesClient>`
"""
self.isos = IsosClient(self)
"""ImagesClient Instance
:type: :class:`IsosClient <hcloud.isos.client.IsosClient>`
"""
self.ssh_keys = SSHKeysClient(self)
"""SSHKeysClient Instance
:type: :class:`SSHKeysClient <hcloud.ssh_keys.client.SSHKeysClient>`
"""
self.floating_ips = FloatingIPsClient(self)
"""FloatingIPsClient Instance
:type: :class:`FloatingIPsClient <hcloud.floating_ips.client.FloatingIPsClient>`
"""
self.primary_ips = PrimaryIPsClient(self)
"""PrimaryIPsClient Instance
:type: :class:`PrimaryIPsClient <hcloud.primary_ips.client.PrimaryIPsClient>`
"""
self.networks = NetworksClient(self)
"""NetworksClient Instance
:type: :class:`NetworksClient <hcloud.networks.client.NetworksClient>`
"""
self.certificates = CertificatesClient(self)
"""CertificatesClient Instance
:type: :class:`CertificatesClient <hcloud.certificates.client.CertificatesClient>`
"""
self.load_balancers = LoadBalancersClient(self)
"""LoadBalancersClient Instance
:type: :class:`LoadBalancersClient <hcloud.load_balancers.client.LoadBalancersClient>`
"""
self.load_balancer_types = LoadBalancerTypesClient(self)
"""LoadBalancerTypesClient Instance
:type: :class:`LoadBalancerTypesClient <hcloud.load_balancer_types.client.LoadBalancerTypesClient>`
"""
self.firewalls = FirewallsClient(self)
"""FirewallsClient Instance
:type: :class:`FirewallsClient <hcloud.firewalls.client.FirewallsClient>`
"""
self.placement_groups = PlacementGroupsClient(self)
"""PlacementGroupsClient Instance
:type: :class:`PlacementGroupsClient <hcloud.placement_groups.client.PlacementGroupsClient>`
"""
def _get_user_agent(self) -> str:
"""Get the user agent of the hcloud-python instance with the user application name (if specified)
:return: The user agent of this hcloud-python instance
"""
user_agents = []
for name, version in [
(self._application_name, self._application_version),
(self.__user_agent_prefix, self._version),
]:
if name is not None:
user_agents.append(name if version is None else f"{name}/{version}")
return " ".join(user_agents)
def _get_headers(self) -> dict:
headers = {
"User-Agent": self._get_user_agent(),
"Authorization": f"Bearer {self.token}",
}
return headers
def request( # type: ignore[no-untyped-def]
self,
method: str,
url: str,
**kwargs,
) -> dict:
"""Perform a request to the Hetzner Cloud API, wrapper around requests.request
:param method: HTTP Method to perform the Request
:param url: URL of the Endpoint
:param timeout: Requests timeout in seconds
:return: Response
"""
kwargs.setdefault("timeout", self._requests_timeout)
url = self._api_endpoint + url
headers = self._get_headers()
retries = 0
while True:
try:
response = self._requests_session.request(
method=method,
url=url,
headers=headers,
**kwargs,
)
return self._read_response(response)
except APIException as exception:
if retries < self._retry_max_retries and self._retry_policy(exception):
time.sleep(self._retry_interval(retries))
retries += 1
continue
raise
except requests.exceptions.Timeout:
if retries < self._retry_max_retries:
time.sleep(self._retry_interval(retries))
retries += 1
continue
raise
def _read_response(self, response: requests.Response) -> dict:
correlation_id = response.headers.get("X-Correlation-Id")
payload = {}
try:
if len(response.content) > 0:
payload = response.json()
except (TypeError, ValueError) as exc:
raise APIException(
code=response.status_code,
message=response.reason,
details={"content": response.content},
correlation_id=correlation_id,
) from exc
if not response.ok:
if not payload or "error" not in payload:
raise APIException(
code=response.status_code,
message=response.reason,
details={"content": response.content},
correlation_id=correlation_id,
)
error: dict = payload["error"]
raise APIException(
code=error["code"],
message=error["message"],
details=error.get("details"),
correlation_id=correlation_id,
)
return payload
def _retry_policy(self, exception: APIException) -> bool:
if isinstance(exception.code, str):
return exception.code in (
"rate_limit_exceeded",
"conflict",
)
if isinstance(exception.code, int):
return exception.code in (
HTTPStatus.BAD_GATEWAY,
HTTPStatus.GATEWAY_TIMEOUT,
)
return False