Skip to content

Commit

Permalink
Merge pull request #14 from inloco/refactor/replaces-requests-in-api-…
Browse files Browse the repository at this point in the history
…and-token_manager

refactor(api, token_manager): creates BaseRequest calls and uses on existing routes
  • Loading branch information
joseviccruz authored May 16, 2022
2 parents 9ec0847 + 50949b4 commit f3e4c68
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 297 deletions.
3 changes: 2 additions & 1 deletion incognia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
'feedback_events',
'json_util',
'models',
'token_manager']
'token_manager',
'base_request']
54 changes: 19 additions & 35 deletions incognia/api.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import datetime as dt
import json
from typing import Final, Optional, List

import requests
from typing import Optional, List

from .datetime_util import total_milliseconds_since_epoch
from .endpoints import Endpoints
from .exceptions import IncogniaHTTPError, IncogniaError
from .json_util import encode
from .models import Coordinates, StructuredAddress, TransactionAddress, PaymentValue, PaymentMethod
from .token_manager import TokenManager
from .base_request import BaseRequest, JSON_CONTENT_HEADER


class IncogniaAPI:
JSON_CONTENT_HEADER: Final[dict] = {'Content-type': 'application/json'}

def __init__(self, client_id: str, client_secret: str):
self.__token_manager = TokenManager(client_id, client_secret)
self.__request = BaseRequest()

def __get_authorization_header(self) -> dict:
access_token, token_type = self.__token_manager.get()
Expand All @@ -32,20 +29,17 @@ def register_new_signup(self,

try:
headers = self.__get_authorization_header()
headers.update(self.JSON_CONTENT_HEADER)
headers.update(JSON_CONTENT_HEADER)
body = {
'installation_id': installation_id,
'address_line': address_line,
'structured_address': structured_address,
'address_coordinates': address_coordinates
}
data = encode(body)
response = requests.post(Endpoints.SIGNUPS, headers=headers, data=data)
response.raise_for_status()

return json.loads(response.content.decode('utf-8'))
return self.__request.post(Endpoints.SIGNUPS, headers=headers, data=data)

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None

def get_signup_assessment(self, signup_id: str) -> dict:
Expand All @@ -54,12 +48,9 @@ def get_signup_assessment(self, signup_id: str) -> dict:

try:
headers = self.__get_authorization_header()
response = requests.get(f'{Endpoints.SIGNUPS}/{signup_id}', headers=headers)
response.raise_for_status()
return self.__request.get(f'{Endpoints.SIGNUPS}/{signup_id}', headers=headers)

return json.loads(response.content.decode('utf-8'))

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None

def register_feedback(self,
Expand All @@ -78,7 +69,7 @@ def register_feedback(self,

try:
headers = self.__get_authorization_header()
headers.update(self.JSON_CONTENT_HEADER)
headers.update(JSON_CONTENT_HEADER)
body = {
'event': event,
'timestamp': total_milliseconds_since_epoch(timestamp),
Expand All @@ -90,10 +81,9 @@ def register_feedback(self,
'installation_id': installation_id
}
data = encode(body)
response = requests.post(Endpoints.FEEDBACKS, headers=headers, data=data)
response.raise_for_status()
return self.__request.post(Endpoints.FEEDBACKS, headers=headers, data=data)

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None

def register_payment(self,
Expand All @@ -111,7 +101,7 @@ def register_payment(self,

try:
headers = self.__get_authorization_header()
headers.update(self.JSON_CONTENT_HEADER)
headers.update(JSON_CONTENT_HEADER)
params = None if evaluate is None else {'eval': evaluate}
body = {
'type': 'payment',
Expand All @@ -123,13 +113,10 @@ def register_payment(self,
'payment_methods': payment_methods
}
data = encode(body)
response = requests.post(Endpoints.TRANSACTIONS, headers=headers, params=params,
data=data)
response.raise_for_status()

return json.loads(response.content.decode('utf-8'))
return self.__request.post(Endpoints.TRANSACTIONS, headers=headers, params=params,
data=data)

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None

def register_login(self,
Expand All @@ -144,7 +131,7 @@ def register_login(self,

try:
headers = self.__get_authorization_header()
headers.update(self.JSON_CONTENT_HEADER)
headers.update(JSON_CONTENT_HEADER)
params = None if evaluate is None else {'eval': evaluate}
body = {
'type': 'login',
Expand All @@ -153,11 +140,8 @@ def register_login(self,
'external_id': external_id
}
data = encode(body)
response = requests.post(Endpoints.TRANSACTIONS, headers=headers, params=params,
data=data)
response.raise_for_status()

return json.loads(response.content.decode('utf-8'))
return self.__request.post(Endpoints.TRANSACTIONS, headers=headers, params=params,
data=data)

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None
60 changes: 60 additions & 0 deletions incognia/base_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
import platform
import sys
from typing import Final, Any, Union, Optional

import requests

from incognia.exceptions import IncogniaHTTPError

_LIBRARY_VERSION: Final[str] = sys.modules['incognia'].__version__
_OS_NAME: Final[str] = platform.system()
_OS_VERSION: Final[str] = platform.release()
_OS_ARCH: Final[str] = platform.architecture()[0]
_LANGUAGE_VERSION: Final[str] = platform.python_version()

USER_AGENT_HEADER: Final[dict] = {
'User-Agent': f'incognia-python/{_LIBRARY_VERSION}'
f' ({_OS_NAME} {_OS_VERSION} {_OS_ARCH})'
f' Python/{_LANGUAGE_VERSION}'
}

JSON_CONTENT_HEADER: Final[dict] = {
'Content-Type': 'application/json'
}


class BaseRequest:
def __init__(self, timeout: float = 5.0):
self.__timeout: float = timeout

def timeout(self) -> float:
return self.__timeout

def post(self, url: Union[str, bytes], headers: Any = None, data: Any = None,
params: Any = None,
auth: Optional[Any] = None) -> Optional[dict]:
headers = headers or {}
headers.update(USER_AGENT_HEADER)

try:
response = requests.post(url=url, headers=headers, data=data, params=params,
timeout=self.__timeout,
auth=auth)
response.raise_for_status()
return json.loads(response.content.decode('utf-8')) or None

except requests.HTTPError as e:
raise IncogniaHTTPError(e) from None

def get(self, url: Union[str, bytes], headers: Any = None, data: Any = None) -> Optional[dict]:
headers = headers or {}
headers.update(USER_AGENT_HEADER)

try:
response = requests.get(url=url, headers=headers, data=data, timeout=self.__timeout)
response.raise_for_status()
return json.loads(response.content.decode('utf-8')) or None

except requests.HTTPError as e:
raise IncogniaHTTPError(e) from None
35 changes: 17 additions & 18 deletions incognia/token_manager.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import base64
import datetime as dt
import json
from threading import Lock
from typing import Final, Optional, NamedTuple

import requests

from .base_request import BaseRequest
from .endpoints import Endpoints
from .exceptions import IncogniaHTTPError

_TOKEN_REFRESH_BEFORE_SECONDS: Final[int] = 10


class TokenValues(NamedTuple):
access_token: str
token_type: str


class TokenManager:
TOKEN_REFRESH_BEFORE_SECONDS: Final[int] = 10

def __init__(self, client_id: str, client_secret: str):
self.__client_id: str = client_id
self.__client_secret: str = client_secret
self.__token_values: Optional[TokenValues] = None
self.__expiration_time: Optional[dt.datetime] = None
self.__request: BaseRequest = BaseRequest()
self.__mutex: Lock = Lock()

def __refresh_token(self) -> None:
client_id, client_secret = self.__client_id, self.__client_secret
Expand All @@ -30,26 +31,24 @@ def __refresh_token(self) -> None:
headers = {'Authorization': f'Basic {client_id_and_secret_encoded}'}

try:
response = requests.post(url=Endpoints.TOKEN, headers=headers,
auth=(client_id, client_secret))
response.raise_for_status()

parsed_response = json.loads(response.content.decode('utf-8'))
token_values = TokenValues(parsed_response['access_token'],
parsed_response['token_type'])
expiration_time = dt.datetime.now() + dt.timedelta(
seconds=int(parsed_response['expires_in']))
response = self.__request.post(url=Endpoints.TOKEN, headers=headers,
auth=(client_id, client_secret))
token_values = TokenValues(response['access_token'], response['token_type'])
expiration_time = dt.datetime.now() + dt.timedelta(seconds=int(response['expires_in']))

self.__token_values, self.__expiration_time = token_values, expiration_time

except requests.HTTPError as e:
except IncogniaHTTPError as e:
raise IncogniaHTTPError(e) from None

def __is_expired(self) -> bool:
return (self.__expiration_time - dt.datetime.now()).total_seconds() <= \
self.TOKEN_REFRESH_BEFORE_SECONDS
return (self.__expiration_time - dt.datetime.now()) \
.total_seconds() <= _TOKEN_REFRESH_BEFORE_SECONDS

def get(self) -> TokenValues:
self.__mutex.acquire()
if not self.__expiration_time or self.__is_expired():
self.__refresh_token()
return self.__token_values
token_values = self.__token_values
self.__mutex.release()
return token_values
Loading

0 comments on commit f3e4c68

Please sign in to comment.