-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
86 lines (70 loc) · 2.7 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# This file provides functions to make HTTP requests to the TAMS API.
# The functions include a retry on authentication failures when using renewable credentials.
from typing import AsyncGenerator
from contextlib import asynccontextmanager
import aiohttp
from credentials import Credentials, RenewableCredentials
@asynccontextmanager
async def request(
session: aiohttp.ClientSession,
credentials: Credentials,
method: str,
url: str,
**kwargs
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Execute a request and retry once if there is a credentials failure"""
# Extract the 'headers' as they need to be extended with the auth credentials
in_headers = {}
if "headers" in kwargs:
in_headers = kwargs.pop("headers")
if isinstance(credentials, RenewableCredentials):
await credentials.ensure_credentials()
have_retried = False
while True:
async with session.request(method, url, headers=in_headers | credentials.header(), **kwargs) as resp:
if resp.status != 401 or not isinstance(credentials, RenewableCredentials) or have_retried:
yield resp
break
# Renew the credentials and retry
await credentials.renew_credentials()
have_retried = True
@asynccontextmanager
async def get_request(
session: aiohttp.ClientSession,
credentials: Credentials,
url: str,
**kwargs
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Execute a GET request and retry once if there is a credentials failure"""
async with request(session, credentials, "GET", url, **kwargs) as resp:
yield resp
@asynccontextmanager
async def put_request(
session: aiohttp.ClientSession,
credentials: Credentials,
url: str,
**kwargs
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Execute a PUT request and retry once if there is a credentials failure"""
async with request(session, credentials, "PUT", url, **kwargs) as resp:
yield resp
@asynccontextmanager
async def post_request(
session: aiohttp.ClientSession,
credentials: Credentials,
url: str,
**kwargs
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Execute a POST request and retry once if there is a credentials failure"""
async with request(session, credentials, "POST", url, **kwargs) as resp:
yield resp
@asynccontextmanager
async def delete_request(
session: aiohttp.ClientSession,
credentials: Credentials,
url: str,
**kwargs
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Execute a DELETE request and retry once if there is a credentials failure"""
async with request(session, credentials, "DELETE", url, **kwargs) as resp:
yield resp