Skip to content

Commit

Permalink
♻️ refactor dv-2 dy-sidecar's API client (#3121)
Browse files Browse the repository at this point in the history
  • Loading branch information
GitHK authored Jun 24, 2022
1 parent 8e1c4f2 commit 9f459c3
Show file tree
Hide file tree
Showing 20 changed files with 1,898 additions and 505 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class DynamicSidecarSettings(BaseCustomSettings):
regex=SERVICE_NETWORK_RE,
description="network all dynamic services are connected to",
)
DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES: int = Field(
4, description="maximum attempts to retry a request before giving up"
)
DYNAMIC_SIDECAR_API_REQUEST_TIMEOUT: PositiveFloat = Field(
15.0,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@
SimcoreServiceLabels,
)
from models_library.services_resources import ServiceResourcesDict
from pydantic import BaseModel, Extra, Field, PositiveInt, constr
from pydantic import (
AnyHttpUrl,
BaseModel,
Extra,
Field,
PositiveInt,
constr,
parse_obj_as,
)

from ..constants import (
DYNAMIC_PROXY_SERVICE_PREFIX,
Expand Down Expand Up @@ -235,9 +243,11 @@ def can_save_state(self) -> bool:
# consider adding containers for healthchecks but this is more difficult and it depends on each service

@property
def endpoint(self):
def endpoint(self) -> AnyHttpUrl:
"""endpoint where all the services are exposed"""
return f"http://{self.hostname}:{self.port}"
return parse_obj_as(
AnyHttpUrl, f"http://{self.hostname}:{self.port}" # NOSONAR
)

@property
def are_containers_ready(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from ._errors import BaseClientHTTPError, ClientHttpError, UnexpectedStatusError
from ._public import (
DynamicSidecarClient,
get_dynamic_sidecar_client,
get_dynamic_sidecar_service_health,
setup,
shutdown,
)

__all__: tuple[str, ...] = (
"BaseClientHTTPError",
"ClientHttpError",
"DynamicSidecarClient",
"get_dynamic_sidecar_client",
"get_dynamic_sidecar_service_health",
"setup",
"shutdown",
"UnexpectedStatusError",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import asyncio
import functools
import inspect
import logging
from logging import Logger
from typing import Any, Awaitable, Callable, Optional

from httpx import AsyncClient, ConnectError, HTTPError, PoolTimeout, Response
from httpx._types import TimeoutTypes, URLTypes
from tenacity import RetryCallState
from tenacity._asyncio import AsyncRetrying
from tenacity.before import before_log
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential

from ._errors import ClientHttpError, UnexpectedStatusError, _WrongReturnType

logger = logging.getLogger(__name__)


def _log_requests_in_pool(client: AsyncClient, event_name: str) -> None:
# pylint: disable=protected-access
logger.warning(
"Requests while event '%s': %s",
event_name.upper(),
[
(r.request.method, r.request.url, r.request.headers)
for r in client._transport._pool._requests
],
)


def _log_retry(log: Logger, max_retries: int) -> Callable[[RetryCallState], None]:
def log_it(retry_state: RetryCallState) -> None:
# pylint: disable=protected-access

assert retry_state.outcome # nosec
e = retry_state.outcome.exception()
assert isinstance(e, HTTPError) # nosec
assert e._request # nosec

log.info(
"[%s/%s]Retry. Unexpected %s while requesting '%s %s': %s",
retry_state.attempt_number,
max_retries,
e.__class__.__name__,
e._request.method,
e._request.url,
f"{e=}",
)

return log_it


def retry_on_errors(
request_func: Callable[..., Awaitable[Response]]
) -> Callable[..., Awaitable[Response]]:
"""
Will retry the request on `ConnectError` and `PoolTimeout`.
Also wraps `httpx.HTTPError`
raises:
- `ClientHttpError`
"""
assert asyncio.iscoroutinefunction(request_func)

RETRY_ERRORS = (ConnectError, PoolTimeout)

@functools.wraps(request_func)
async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response:
# pylint: disable=protected-access
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(zelf._request_max_retries),
wait=wait_exponential(min=1),
retry=retry_if_exception_type(RETRY_ERRORS),
before=before_log(logger, logging.DEBUG),
after=_log_retry(logger, zelf._request_max_retries),
reraise=True,
):
with attempt:
r: Response = await request_func(zelf, *args, **kwargs)
return r
except HTTPError as e:
if isinstance(e, PoolTimeout):
_log_requests_in_pool(zelf._client, "pool timeout")
raise ClientHttpError(e) from e

return request_wrapper


def expect_status(expected_code: int):
"""
raises an `UnexpectedStatusError` if the request's status is different
from `expected_code`
NOTE: always apply after `retry_on_errors`
raises:
- `UnexpectedStatusError`
- `ClientHttpError`
"""

def decorator(
request_func: Callable[..., Awaitable[Response]]
) -> Callable[..., Awaitable[Response]]:
assert asyncio.iscoroutinefunction(request_func)

@functools.wraps(request_func)
async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response:
response = await request_func(zelf, *args, **kwargs)
if response.status_code != expected_code:
raise UnexpectedStatusError(response, expected_code)

return response

return request_wrapper

return decorator


class BaseThinClient:
SKIP_METHODS: set[str] = {"close"}

def __init__(
self,
*,
request_max_retries: int,
base_url: Optional[URLTypes] = None,
timeout: Optional[TimeoutTypes] = None,
) -> None:
self._request_max_retries: int = request_max_retries

client_args: dict[str, Any] = {}
if base_url:
client_args["base_url"] = base_url
if timeout:
client_args["timeout"] = timeout
self._client = AsyncClient(**client_args)

# ensure all user defined public methods return `httpx.Response`
# NOTE: ideally these checks should be ran at import time!
public_methods = [
t[1]
for t in inspect.getmembers(self, predicate=inspect.ismethod)
if not (t[0].startswith("_") or t[0] in self.SKIP_METHODS)
]

for method in public_methods:
signature = inspect.signature(method)
if signature.return_annotation != Response:
raise _WrongReturnType(method, signature.return_annotation)

async def close(self) -> None:
_log_requests_in_pool(self._client, "closing")
await self._client.aclose()

async def __aenter__(self):
return self

async def __aexit__(self, exc_t, exc_v, exc_tb):
await self.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Exception hierarchy:
* BaseClientError
x BaseRequestError
+ ClientHttpError
+ UnexpectedStatusError
x WrongReturnType
"""

from httpx import Response


class BaseClientError(Exception):
"""
Used as based for all the raised errors
"""


class _WrongReturnType(BaseClientError):
"""
used internally to signal the user that the defined method
has an invalid return time annotation
"""

def __init__(self, method, return_annotation) -> None:
super().__init__(
(
f"{method=} should return an instance "
f"of {Response}, not '{return_annotation}'!"
)
)


class BaseClientHTTPError(BaseClientError):
"""Base class to wrap all http related client errors"""


class ClientHttpError(BaseClientHTTPError):
"""used to captures all httpx.HttpError"""

def __init__(self, error: Exception) -> None:
super().__init__()
self.error: Exception = error


class UnexpectedStatusError(BaseClientHTTPError):
"""raised when the status of the request is not the one it was expected"""

def __init__(self, response: Response, expecting: int) -> None:
message = (
f"Expected status: {expecting}, got {response.status_code} for: {response.url}: "
f"headers={response.headers}, body='{response.text}'"
)
super().__init__(message)
self.response = response
Loading

0 comments on commit 9f459c3

Please sign in to comment.