From 9f459c3c2038bdf0311406f90cecaf21e28cf429 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Fri, 24 Jun 2022 12:34:56 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor=20dv-2=20dy-sidec?= =?UTF-8?q?ar's=20API=20client=20(#3121)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/settings.py | 3 + .../schemas/dynamic_services/scheduler.py | 16 +- .../dynamic_sidecar/api_client/__init__.py | 19 + .../dynamic_sidecar/api_client/_base.py | 161 ++++++ .../dynamic_sidecar/api_client/_errors.py | 56 +++ .../dynamic_sidecar/api_client/_public.py | 311 ++++++++++++ .../dynamic_sidecar/api_client/_thin.py | 223 ++++++++ .../modules/dynamic_sidecar/client_api.py | 447 ---------------- .../modules/dynamic_sidecar/errors.py | 23 - .../modules/dynamic_sidecar/module_setup.py | 11 +- .../dynamic_sidecar/scheduler/events.py | 28 +- .../dynamic_sidecar/scheduler/events_utils.py | 5 +- .../modules/dynamic_sidecar/scheduler/task.py | 10 +- .../02/test_dynamic_services_routes.py | 2 +- ...ixed_dynamic_sidecar_and_legacy_project.py | 2 +- services/director-v2/tests/unit/conftest.py | 18 +- ...modules_dynamic_sidecar_client_api_base.py | 198 ++++++++ ...dules_dynamic_sidecar_client_api_public.py | 475 ++++++++++++++++++ ...modules_dynamic_sidecar_client_api_thin.py | 383 ++++++++++++++ .../test_modules_dynamic_sidecar_scheduler.py | 12 +- 20 files changed, 1898 insertions(+), 505 deletions(-) create mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/__init__.py create mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py create mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_errors.py create mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py create mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py delete mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/client_api.py create mode 100644 services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py create mode 100644 services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py create mode 100644 services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index c7088a51b2c..beda23d930b 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -183,6 +183,9 @@ class DynamicSidecarSettings(BaseCustomSettings): regex=SERVICE_NETWORK_RE, description="network all dynamic services are connected to", ) + DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES: int = Field( + 4, description="maximum attempts to retry a request before giving up" + ) DYNAMIC_SIDECAR_API_REQUEST_TIMEOUT: PositiveFloat = Field( 15.0, description=( diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py index cb95018c027..932e26892f0 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py @@ -11,7 +11,15 @@ SimcoreServiceLabels, ) from models_library.services_resources import ServiceResourcesDict -from pydantic import BaseModel, Extra, Field, PositiveInt, constr +from pydantic import ( + AnyHttpUrl, + BaseModel, + Extra, + Field, + PositiveInt, + constr, + parse_obj_as, +) from ..constants import ( DYNAMIC_PROXY_SERVICE_PREFIX, @@ -235,9 +243,11 @@ def can_save_state(self) -> bool: # consider adding containers for healthchecks but this is more difficult and it depends on each service @property - def endpoint(self): + def endpoint(self) -> AnyHttpUrl: """endpoint where all the services are exposed""" - return f"http://{self.hostname}:{self.port}" + return parse_obj_as( + AnyHttpUrl, f"http://{self.hostname}:{self.port}" # NOSONAR + ) @property def are_containers_ready(self) -> bool: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/__init__.py new file mode 100644 index 00000000000..a116e134865 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/__init__.py @@ -0,0 +1,19 @@ +from ._errors import BaseClientHTTPError, ClientHttpError, UnexpectedStatusError +from ._public import ( + DynamicSidecarClient, + get_dynamic_sidecar_client, + get_dynamic_sidecar_service_health, + setup, + shutdown, +) + +__all__: tuple[str, ...] = ( + "BaseClientHTTPError", + "ClientHttpError", + "DynamicSidecarClient", + "get_dynamic_sidecar_client", + "get_dynamic_sidecar_service_health", + "setup", + "shutdown", + "UnexpectedStatusError", +) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py new file mode 100644 index 00000000000..499124af5df --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py @@ -0,0 +1,161 @@ +import asyncio +import functools +import inspect +import logging +from logging import Logger +from typing import Any, Awaitable, Callable, Optional + +from httpx import AsyncClient, ConnectError, HTTPError, PoolTimeout, Response +from httpx._types import TimeoutTypes, URLTypes +from tenacity import RetryCallState +from tenacity._asyncio import AsyncRetrying +from tenacity.before import before_log +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_exponential + +from ._errors import ClientHttpError, UnexpectedStatusError, _WrongReturnType + +logger = logging.getLogger(__name__) + + +def _log_requests_in_pool(client: AsyncClient, event_name: str) -> None: + # pylint: disable=protected-access + logger.warning( + "Requests while event '%s': %s", + event_name.upper(), + [ + (r.request.method, r.request.url, r.request.headers) + for r in client._transport._pool._requests + ], + ) + + +def _log_retry(log: Logger, max_retries: int) -> Callable[[RetryCallState], None]: + def log_it(retry_state: RetryCallState) -> None: + # pylint: disable=protected-access + + assert retry_state.outcome # nosec + e = retry_state.outcome.exception() + assert isinstance(e, HTTPError) # nosec + assert e._request # nosec + + log.info( + "[%s/%s]Retry. Unexpected %s while requesting '%s %s': %s", + retry_state.attempt_number, + max_retries, + e.__class__.__name__, + e._request.method, + e._request.url, + f"{e=}", + ) + + return log_it + + +def retry_on_errors( + request_func: Callable[..., Awaitable[Response]] +) -> Callable[..., Awaitable[Response]]: + """ + Will retry the request on `ConnectError` and `PoolTimeout`. + Also wraps `httpx.HTTPError` + raises: + - `ClientHttpError` + """ + assert asyncio.iscoroutinefunction(request_func) + + RETRY_ERRORS = (ConnectError, PoolTimeout) + + @functools.wraps(request_func) + async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response: + # pylint: disable=protected-access + try: + async for attempt in AsyncRetrying( + stop=stop_after_attempt(zelf._request_max_retries), + wait=wait_exponential(min=1), + retry=retry_if_exception_type(RETRY_ERRORS), + before=before_log(logger, logging.DEBUG), + after=_log_retry(logger, zelf._request_max_retries), + reraise=True, + ): + with attempt: + r: Response = await request_func(zelf, *args, **kwargs) + return r + except HTTPError as e: + if isinstance(e, PoolTimeout): + _log_requests_in_pool(zelf._client, "pool timeout") + raise ClientHttpError(e) from e + + return request_wrapper + + +def expect_status(expected_code: int): + """ + raises an `UnexpectedStatusError` if the request's status is different + from `expected_code` + NOTE: always apply after `retry_on_errors` + + raises: + - `UnexpectedStatusError` + - `ClientHttpError` + """ + + def decorator( + request_func: Callable[..., Awaitable[Response]] + ) -> Callable[..., Awaitable[Response]]: + assert asyncio.iscoroutinefunction(request_func) + + @functools.wraps(request_func) + async def request_wrapper(zelf: "BaseThinClient", *args, **kwargs) -> Response: + response = await request_func(zelf, *args, **kwargs) + if response.status_code != expected_code: + raise UnexpectedStatusError(response, expected_code) + + return response + + return request_wrapper + + return decorator + + +class BaseThinClient: + SKIP_METHODS: set[str] = {"close"} + + def __init__( + self, + *, + request_max_retries: int, + base_url: Optional[URLTypes] = None, + timeout: Optional[TimeoutTypes] = None, + ) -> None: + self._request_max_retries: int = request_max_retries + + client_args: dict[str, Any] = {} + if base_url: + client_args["base_url"] = base_url + if timeout: + client_args["timeout"] = timeout + self._client = AsyncClient(**client_args) + + # ensure all user defined public methods return `httpx.Response` + # NOTE: ideally these checks should be ran at import time! + public_methods = [ + t[1] + for t in inspect.getmembers(self, predicate=inspect.ismethod) + if not (t[0].startswith("_") or t[0] in self.SKIP_METHODS) + ] + + for method in public_methods: + signature = inspect.signature(method) + if signature.return_annotation != Response: + raise _WrongReturnType(method, signature.return_annotation) + + async def close(self) -> None: + _log_requests_in_pool(self._client, "closing") + await self._client.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_t, exc_v, exc_tb): + await self.close() diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_errors.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_errors.py new file mode 100644 index 00000000000..e867005af65 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_errors.py @@ -0,0 +1,56 @@ +""" +Exception hierarchy: + +* BaseClientError + x BaseRequestError + + ClientHttpError + + UnexpectedStatusError + x WrongReturnType +""" + +from httpx import Response + + +class BaseClientError(Exception): + """ + Used as based for all the raised errors + """ + + +class _WrongReturnType(BaseClientError): + """ + used internally to signal the user that the defined method + has an invalid return time annotation + """ + + def __init__(self, method, return_annotation) -> None: + super().__init__( + ( + f"{method=} should return an instance " + f"of {Response}, not '{return_annotation}'!" + ) + ) + + +class BaseClientHTTPError(BaseClientError): + """Base class to wrap all http related client errors""" + + +class ClientHttpError(BaseClientHTTPError): + """used to captures all httpx.HttpError""" + + def __init__(self, error: Exception) -> None: + super().__init__() + self.error: Exception = error + + +class UnexpectedStatusError(BaseClientHTTPError): + """raised when the status of the request is not the one it was expected""" + + def __init__(self, response: Response, expecting: int) -> None: + message = ( + f"Expected status: {expecting}, got {response.status_code} for: {response.url}: " + f"headers={response.headers}, body='{response.text}'" + ) + super().__init__(message) + self.response = response diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py new file mode 100644 index 00000000000..688d59e6971 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py @@ -0,0 +1,311 @@ +import logging +from collections import deque +from typing import Any, Optional + +from fastapi import FastAPI, status +from models_library.projects import ProjectID +from models_library.projects_networks import DockerNetworkAlias +from pydantic import AnyHttpUrl +from servicelib.utils import logged_gather + +from ....models.schemas.dynamic_services import SchedulerData +from ....modules.dynamic_sidecar.docker_api import get_or_create_networks_ids +from ....utils.logging_utils import log_decorator +from ..errors import EntrypointContainerNotFoundError, NodeportsDidNotFindNodeError +from ._errors import BaseClientHTTPError, UnexpectedStatusError +from ._thin import ThinDynamicSidecarClient + +logger = logging.getLogger(__name__) + + +class DynamicSidecarClient: + def __init__(self, app: FastAPI): + self.thin_client: ThinDynamicSidecarClient = ThinDynamicSidecarClient(app) + + async def is_healthy(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> bool: + """returns True if service is UP and running else False""" + try: + # this request uses a very short timeout + response = await self.thin_client.get_health(dynamic_sidecar_endpoint) + return response.json()["is_healthy"] + except BaseClientHTTPError: + return False + + @log_decorator(logger=logger) + async def containers_inspect( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> dict[str, Any]: + """ + returns dict containing docker inspect result form + all dynamic-sidecar started containers + """ + response = await self.thin_client.get_containers( + dynamic_sidecar_endpoint, only_status=False + ) + return response.json() + + @log_decorator(logger=logger) + async def containers_docker_status( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> dict[str, dict[str, str]]: + try: + response = await self.thin_client.get_containers( + dynamic_sidecar_endpoint, only_status=True + ) + return response.json() + except UnexpectedStatusError: + return {} + + @log_decorator(logger=logger) + async def start_service_creation( + self, dynamic_sidecar_endpoint: AnyHttpUrl, compose_spec: str + ) -> None: + response = await self.thin_client.post_containers( + dynamic_sidecar_endpoint, compose_spec=compose_spec + ) + logger.info("Spec submit result %s", response.text) + + @log_decorator(logger=logger) + async def begin_service_destruction( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> None: + """runs docker compose down on the started spec""" + response = await self.thin_client.post_containers_down(dynamic_sidecar_endpoint) + logger.info("Compose down result %s", response.text) + + @log_decorator(logger=logger) + async def service_save_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None: + await self.thin_client.post_containers_state_save(dynamic_sidecar_endpoint) + + @log_decorator(logger=logger) + async def service_restore_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None: + await self.thin_client.post_containers_state_restore(dynamic_sidecar_endpoint) + + @log_decorator(logger=logger) + async def service_pull_input_ports( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]] = None, + ) -> int: + port_keys = [] if port_keys is None else port_keys + response = await self.thin_client.post_containers_ports_inputs_pull( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + return int(response.text) + + @log_decorator(logger=logger) + async def service_disable_dir_watcher( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> None: + await self.thin_client.patch_containers_directory_watcher( + dynamic_sidecar_endpoint, is_enabled=False + ) + + @log_decorator(logger=logger) + async def service_enable_dir_watcher( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> None: + await self.thin_client.patch_containers_directory_watcher( + dynamic_sidecar_endpoint, is_enabled=True + ) + + @log_decorator(logger=logger) + async def service_outputs_create_dirs( + self, dynamic_sidecar_endpoint: AnyHttpUrl, outputs_labels: dict[str, Any] + ) -> None: + await self.thin_client.post_containers_ports_outputs_dirs( + dynamic_sidecar_endpoint, outputs_labels=outputs_labels + ) + + @log_decorator(logger=logger) + async def service_pull_output_ports( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]] = None, + ) -> int: + response = await self.thin_client.post_containers_ports_outputs_pull( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + return int(response.text) + + @log_decorator(logger=logger) + async def service_push_output_ports( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]] = None, + ) -> None: + port_keys = [] if port_keys is None else port_keys + try: + await self.thin_client.post_containers_ports_outputs_push( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + except UnexpectedStatusError as e: + if e.response.status_code == status.HTTP_404_NOT_FOUND: + json_error = e.response.json() + if json_error.get("code") == "dynamic_sidecar.nodeports.node_not_found": + raise NodeportsDidNotFindNodeError( + node_uuid=json_error["node_uuid"] + ) from e + raise e + + @log_decorator(logger=logger) + async def get_entrypoint_container_name( + self, dynamic_sidecar_endpoint: AnyHttpUrl, dynamic_sidecar_network_name: str + ) -> str: + """ + While this API raises EntrypointContainerNotFoundError + it should be called again, because in the menwhile the containers + might still be starting. + """ + try: + response = await self.thin_client.get_containers_name( + dynamic_sidecar_endpoint, + dynamic_sidecar_network_name=dynamic_sidecar_network_name, + ) + return response.json() + except UnexpectedStatusError as e: + if e.response.status_code == status.HTTP_404_NOT_FOUND: + raise EntrypointContainerNotFoundError() from e + raise e + + @log_decorator(logger=logger) + async def restart_containers(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None: + """ + runs docker-compose stop and docker-compose start in succession + resulting in a container restart without loosing state + """ + await self.thin_client.post_containers_restart(dynamic_sidecar_endpoint) + + async def _attach_container_to_network( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + container_id: str, + network_id: str, + network_aliases: list[str], + ) -> None: + """attaches a container to a network if not already attached""" + await self.thin_client.post_containers_networks_attach( + dynamic_sidecar_endpoint, + container_id=container_id, + network_id=network_id, + network_aliases=network_aliases, + ) + + async def _detach_container_from_network( + self, dynamic_sidecar_endpoint: AnyHttpUrl, container_id: str, network_id: str + ) -> None: + """detaches a container from a network if not already detached""" + await self.thin_client.post_containers_networks_detach( + dynamic_sidecar_endpoint, container_id=container_id, network_id=network_id + ) + + async def attach_service_containers_to_project_network( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + dynamic_sidecar_network_name: str, + project_network: str, + project_id: ProjectID, + network_alias: DockerNetworkAlias, + ) -> None: + """All containers spawned by the dynamic-sidecar need to be attached to the project network""" + try: + containers_status = await self.containers_docker_status( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint + ) + except BaseClientHTTPError: + # if no containers are found it is ok to skip the operations, + # there are no containers to attach the network to + return + + sorted_container_names = sorted(containers_status.keys()) + + entrypoint_container_name = await self.get_entrypoint_container_name( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, + dynamic_sidecar_network_name=dynamic_sidecar_network_name, + ) + + network_names_to_ids: dict[str, str] = await get_or_create_networks_ids( + [project_network], project_id + ) + network_id = network_names_to_ids[project_network] + + tasks = deque() + + for k, container_name in enumerate(sorted_container_names): + # by default we attach `alias-0`, `alias-1`, etc... + # to all containers + aliases = [f"{network_alias}-{k}"] + if container_name == entrypoint_container_name: + # by definition the entrypoint container will be exposed as the `alias` + aliases.append(network_alias) + + tasks.append( + self._attach_container_to_network( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, + container_id=container_name, + network_id=network_id, + network_aliases=aliases, + ) + ) + + await logged_gather(*tasks) + + async def detach_service_containers_from_project_network( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + project_network: str, + project_id: ProjectID, + ) -> None: + # the network needs to be detached from all started containers + try: + containers_status = await self.containers_docker_status( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint + ) + except BaseClientHTTPError: + # if no containers are found it is ok to skip the operations, + # there are no containers to detach the network from + return + + network_names_to_ids: dict[str, str] = await get_or_create_networks_ids( + [project_network], project_id + ) + network_id = network_names_to_ids[project_network] + + await logged_gather( + *[ + self._detach_container_from_network( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, + container_id=container_name, + network_id=network_id, + ) + for container_name in containers_status + ] + ) + + +async def setup(app: FastAPI) -> None: + logger.debug("dynamic-sidecar api client setup") + app.state.dynamic_sidecar_api_client = DynamicSidecarClient(app) + + +async def shutdown(app: FastAPI) -> None: + logger.debug("dynamic-sidecar api client closing...") + client: Optional[DynamicSidecarClient] + if client := app.state.dynamic_sidecar_api_client: + await client.thin_client.close() + + +def get_dynamic_sidecar_client(app: FastAPI) -> DynamicSidecarClient: + assert app.state.dynamic_sidecar_api_client # nosec + return app.state.dynamic_sidecar_api_client + + +async def get_dynamic_sidecar_service_health( + app: FastAPI, scheduler_data: SchedulerData +) -> None: + api_client = get_dynamic_sidecar_client(app) + service_endpoint = scheduler_data.dynamic_sidecar.endpoint + + # update service health + is_healthy = await api_client.is_healthy(service_endpoint) + scheduler_data.dynamic_sidecar.is_available = is_healthy diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py new file mode 100644 index 00000000000..afbb819a81c --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py @@ -0,0 +1,223 @@ +import json +import logging +from typing import Any, Optional + +from fastapi import FastAPI, status +from httpx import AsyncClient, Response, Timeout +from pydantic import AnyHttpUrl + +from ....core.settings import DynamicSidecarSettings +from ._base import BaseThinClient, expect_status, retry_on_errors + +logger = logging.getLogger(__name__) + + +class ThinDynamicSidecarClient(BaseThinClient): + """ + NOTE: all calls can raise the following errors. + - `UnexpectedStatusError` + - `ClientHttpError` wraps httpx.HttpError errors + """ + + API_VERSION = "v1" + + def __init__(self, app: FastAPI): + settings: DynamicSidecarSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR + ) + + self._client = AsyncClient( + timeout=Timeout( + settings.DYNAMIC_SIDECAR_API_REQUEST_TIMEOUT, + connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, + ) + ) + self._request_max_retries: int = ( + settings.DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES + ) + + # timeouts + self._health_request_timeout = Timeout(1.0, connect=1.0) + self._save_restore_timeout = Timeout( + settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT, + connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, + ) + self._restart_containers_timeout = Timeout( + settings.DYNAMIC_SIDECAR_API_RESTART_CONTAINERS_TIMEOUT, + connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, + ) + self._attach_detach_network_timeout = Timeout( + settings.DYNAMIC_SIDECAR_PROJECT_NETWORKS_ATTACH_DETACH_S, + connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, + ) + + super().__init__(request_max_retries=self._request_max_retries) + + def _get_url( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + postfix: str, + no_api_version: bool = False, + ) -> str: + """formats and returns an url for the request""" + api_version = "" if no_api_version else f"/{self.API_VERSION}" + return f"{dynamic_sidecar_endpoint}{api_version}{postfix}" + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def get_health(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/health", no_api_version=True) + return await self._client.get(url, timeout=self._health_request_timeout) + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def get_containers( + self, dynamic_sidecar_endpoint: AnyHttpUrl, *, only_status: bool + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers") + return await self._client.get(url, params=dict(only_status=only_status)) + + @retry_on_errors + @expect_status(status.HTTP_202_ACCEPTED) + async def post_containers( + self, dynamic_sidecar_endpoint: AnyHttpUrl, *, compose_spec: str + ) -> Response: + # NOTE: this sometimes takes longer that the default timeout, maybe raise timeout here as well! + url = self._get_url(dynamic_sidecar_endpoint, "/containers") + return await self._client.post(url, data=compose_spec) + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def post_containers_down( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers:down") + return await self._client.post(url) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_state_save( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/state:save") + return await self._client.post(url, timeout=self._save_restore_timeout) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_state_restore( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/state:restore") + return await self._client.post(url, timeout=self._save_restore_timeout) + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def post_containers_ports_inputs_pull( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + port_keys: Optional[list[str]] = None, + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/ports/inputs:pull") + port_keys = [] if port_keys is None else port_keys + return await self._client.post( + url, json=port_keys, timeout=self._save_restore_timeout + ) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def patch_containers_directory_watcher( + self, dynamic_sidecar_endpoint: AnyHttpUrl, *, is_enabled: bool + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/directory-watcher") + return await self._client.patch(url, json=dict(is_enabled=is_enabled)) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_ports_outputs_dirs( + self, dynamic_sidecar_endpoint: AnyHttpUrl, *, outputs_labels: dict[str, Any] + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/ports/outputs/dirs") + return await self._client.post(url, json=dict(outputs_labels=outputs_labels)) + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def post_containers_ports_outputs_pull( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + port_keys: Optional[list[str]] = None, + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/ports/outputs:pull") + return await self._client.post( + url, json=port_keys, timeout=self._save_restore_timeout + ) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_ports_outputs_push( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + port_keys: Optional[list[str]] = None, + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers/ports/outputs:push") + return await self._client.post( + url, json=port_keys, timeout=self._save_restore_timeout + ) + + @retry_on_errors + @expect_status(status.HTTP_200_OK) + async def get_containers_name( + self, dynamic_sidecar_endpoint: AnyHttpUrl, *, dynamic_sidecar_network_name: str + ) -> Response: + filters = json.dumps({"network": dynamic_sidecar_network_name}) + url = self._get_url( + dynamic_sidecar_endpoint, f"/containers/name?filters={filters}" + ) + return await self._client.get(url=url) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_restart( + self, dynamic_sidecar_endpoint: AnyHttpUrl + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, "/containers:restart") + return await self._client.post(url, timeout=self._restart_containers_timeout) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_networks_attach( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + container_id: str, + network_id: str, + network_aliases: list[str], + ) -> Response: + url = self._get_url( + dynamic_sidecar_endpoint, f"/containers/{container_id}/networks:attach" + ) + return await self._client.post( + url, + json=dict(network_id=network_id, network_aliases=network_aliases), + timeout=self._attach_detach_network_timeout, + ) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_containers_networks_detach( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + container_id: str, + network_id: str, + ) -> Response: + url = self._get_url( + dynamic_sidecar_endpoint, f"/containers/{container_id}/networks:detach" + ) + return await self._client.post( + url, + json=dict(network_id=network_id), + timeout=self._attach_detach_network_timeout, + ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/client_api.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/client_api.py deleted file mode 100644 index 654757e5dd3..00000000000 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/client_api.py +++ /dev/null @@ -1,447 +0,0 @@ -import json -import logging -from collections import deque -from typing import Any, Optional - -import httpx -from fastapi import FastAPI -from models_library.projects import ProjectID -from models_library.projects_networks import DockerNetworkAlias -from servicelib.utils import logged_gather -from starlette import status - -from ...core.settings import DynamicSidecarSettings -from ...models.schemas.dynamic_services import SchedulerData -from ...modules.dynamic_sidecar.docker_api import get_or_create_networks_ids -from ...utils.logging_utils import log_decorator -from .errors import ( - DynamicSidecarUnexpectedResponseStatus, - EntrypointContainerNotFoundError, - NodeportsDidNotFindNodeError, -) - -# PC -> SAN improvements to discuss -# -# TODO: Use logger, not logging! -# - compose error msgs instead of log functions -# TODO: Single instance of httpx client for all requests?: https://www.python-httpx.org/advanced/#why-use-a-client -# - see services/api-server/src/simcore_service_api_server/utils/client_base.py (-> move to servicelib/fastapi ?) -# TODO: context to unify session's error handling and logging -# TODO: client function names equal/very similar to server handlers -# - -logger = logging.getLogger(__name__) - - -def get_url(dynamic_sidecar_endpoint: str, postfix: str) -> str: - """formats and returns an url for the request""" - url = f"{dynamic_sidecar_endpoint}{postfix}" - return url - - -def log_httpx_http_error(url: str, method: str, formatted_traceback: str) -> None: - # mainly used to debug issues with the API - logging.debug( - ( - "%s -> %s generated:\n %s\nThe above logs can safely " - "be ignored, except when debugging an issue " - "regarding the dynamic-sidecar" - ), - method, - url, - formatted_traceback, - ) - - -class DynamicSidecarClient: - """Will handle connections to the service sidecar""" - - API_VERSION = "v1" - - def __init__(self, app: FastAPI): - settings: DynamicSidecarSettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR - ) - - self._app: FastAPI = app - - self._client = httpx.AsyncClient( - timeout=httpx.Timeout( - settings.DYNAMIC_SIDECAR_API_REQUEST_TIMEOUT, - connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, - ) - ) - - # timeouts - self._health_request_timeout = httpx.Timeout( - settings.DYNAMIC_SIDECAR_STATUS_API_TIMEOUT_S - ) - self._save_restore_timeout = httpx.Timeout( - settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT, - connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, - ) - self._restart_containers_timeout = httpx.Timeout( - settings.DYNAMIC_SIDECAR_API_RESTART_CONTAINERS_TIMEOUT, - connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, - ) - self._attach_detach_network_timeout = httpx.Timeout( - settings.DYNAMIC_SIDECAR_PROJECT_NETWORKS_ATTACH_DETACH_S, - connect=settings.DYNAMIC_SIDECAR_API_CONNECT_TIMEOUT, - ) - - async def is_healthy(self, dynamic_sidecar_endpoint: str) -> bool: - """returns True if service is UP and running else False""" - url = get_url(dynamic_sidecar_endpoint, "/health") - try: - # this request uses a very short timeout - response = await self._client.get( - url=url, timeout=self._health_request_timeout - ) - response.raise_for_status() - - return response.json()["is_healthy"] - except httpx.HTTPError: - return False - - async def close(self) -> None: - await self._client.aclose() - - @log_decorator(logger=logger) - async def containers_inspect(self, dynamic_sidecar_endpoint: str) -> dict[str, Any]: - """ - returns dict containing docker inspect result form - all dynamic-sidecar started containers - """ - url = get_url(dynamic_sidecar_endpoint, f"/{self.API_VERSION}/containers") - - response = await self._client.get(url=url) - if response.status_code != status.HTTP_200_OK: - raise DynamicSidecarUnexpectedResponseStatus(response) - - return response.json() - - @log_decorator(logger=logger) - async def containers_docker_status( - self, dynamic_sidecar_endpoint: str - ) -> dict[str, dict[str, str]]: - url = get_url(dynamic_sidecar_endpoint, f"/{self.API_VERSION}/containers") - - response = await self._client.get(url=url, params=dict(only_status=True)) - if response.status_code != status.HTTP_200_OK: - logging.warning( - "Unexpected response: status=%s, body=%s", - response.status_code, - response.text, - ) - return {} - - return response.json() - - @log_decorator(logger=logger) - async def start_service_creation( - self, dynamic_sidecar_endpoint: str, compose_spec: str - ) -> None: - url = get_url(dynamic_sidecar_endpoint, f"/{self.API_VERSION}/containers") - response = await self._client.post(url, data=compose_spec) - if response.status_code != status.HTTP_202_ACCEPTED: - raise DynamicSidecarUnexpectedResponseStatus(response, "service creation") - - # request was ok - logger.info("Spec submit result %s", response.text) - - @log_decorator(logger=logger) - async def begin_service_destruction(self, dynamic_sidecar_endpoint: str) -> None: - """runs docker compose down on the started spec""" - url = get_url(dynamic_sidecar_endpoint, f"/{self.API_VERSION}/containers:down") - - response = await self._client.post(url) - if response.status_code != status.HTTP_200_OK: - raise DynamicSidecarUnexpectedResponseStatus( - response, "service destruction" - ) - - logger.info("Compose down result %s", response.text) - - @log_decorator(logger=logger) - async def service_save_state(self, dynamic_sidecar_endpoint: str) -> None: - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/state:save") - - response = await self._client.post(url, timeout=self._save_restore_timeout) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus(response, "state saving") - - @log_decorator(logger=logger) - async def service_restore_state(self, dynamic_sidecar_endpoint: str) -> None: - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/state:restore") - - response = await self._client.post(url, timeout=self._save_restore_timeout) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus(response, "state restore") - - @log_decorator(logger=logger) - async def service_pull_input_ports( - self, dynamic_sidecar_endpoint: str, port_keys: Optional[list[str]] = None - ) -> int: - port_keys = [] if port_keys is None else port_keys - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/ports/inputs:pull") - - response = await self._client.post( - url, json=port_keys, timeout=self._save_restore_timeout - ) - if response.status_code != status.HTTP_200_OK: - raise DynamicSidecarUnexpectedResponseStatus(response, "pull input ports") - return int(response.text) - - @log_decorator(logger=logger) - async def service_disable_dir_watcher(self, dynamic_sidecar_endpoint: str) -> None: - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/directory-watcher") - - response = await self._client.patch(url, json=dict(is_enabled=False)) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus( - response, "disable dir watcher" - ) - - @log_decorator(logger=logger) - async def service_enable_dir_watcher(self, dynamic_sidecar_endpoint: str) -> None: - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/directory-watcher") - - response = await self._client.patch(url, json=dict(is_enabled=True)) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus(response, "enable dir watcher") - - @log_decorator(logger=logger) - async def service_outputs_create_dirs( - self, dynamic_sidecar_endpoint: str, outputs_labels: dict[str, Any] - ) -> None: - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/ports/outputs/dirs") - - response = await self._client.post( - url, json=dict(outputs_labels=outputs_labels) - ) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus( - response, "output dir creation" - ) - - @log_decorator(logger=logger) - async def service_pull_output_ports( - self, dynamic_sidecar_endpoint: str, port_keys: Optional[list[str]] = None - ) -> int: - port_keys = [] if port_keys is None else port_keys - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/ports/outputs:pull") - - response = await self._client.post( - url, json=port_keys, timeout=self._save_restore_timeout - ) - if response.status_code != status.HTTP_200_OK: - raise DynamicSidecarUnexpectedResponseStatus(response, "output ports pull") - return int(response.text) - - @log_decorator(logger=logger) - async def service_push_output_ports( - self, dynamic_sidecar_endpoint: str, port_keys: Optional[list[str]] = None - ) -> None: - port_keys = [] if port_keys is None else port_keys - url = get_url(dynamic_sidecar_endpoint, "/v1/containers/ports/outputs:push") - - response = await self._client.post( - url, json=port_keys, timeout=self._save_restore_timeout - ) - if response.status_code == status.HTTP_404_NOT_FOUND: - json_error = response.json() - if json_error.get("code") == "dynamic_sidecar.nodeports.node_not_found": - raise NodeportsDidNotFindNodeError(node_uuid=json_error["node_uuid"]) - - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus(response, "output ports push") - - @log_decorator(logger=logger) - async def get_entrypoint_container_name( - self, dynamic_sidecar_endpoint: str, dynamic_sidecar_network_name: str - ) -> str: - """ - While this API raises EntrypointContainerNotFoundError - it should be called again, because in the menwhile the containers - might still be starting. - """ - filters = json.dumps({"network": dynamic_sidecar_network_name}) - url = get_url( - dynamic_sidecar_endpoint, - f"/{self.API_VERSION}/containers/name?filters={filters}", - ) - - response = await self._client.get(url=url) - if response.status_code == status.HTTP_404_NOT_FOUND: - raise EntrypointContainerNotFoundError() - response.raise_for_status() - - return response.json() - - @log_decorator(logger=logger) - async def restart_containers(self, dynamic_sidecar_endpoint: str) -> None: - """ - runs docker-compose stop and docker-compose start in succession - resulting in a container restart without loosing state - """ - url = get_url( - dynamic_sidecar_endpoint, f"/{self.API_VERSION}/containers:restart" - ) - - response = await self._client.post( - url=url, timeout=self._restart_containers_timeout - ) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus(response, "containers restart") - - async def _attach_container_to_network( - self, - dynamic_sidecar_endpoint: str, - container_id: str, - network_id: str, - network_aliases: list[str], - ) -> None: - """attaches a container to a network if not already attached""" - url = get_url( - dynamic_sidecar_endpoint, f"/v1/containers/{container_id}/networks:attach" - ) - data = dict(network_id=network_id, network_aliases=network_aliases) - - async with httpx.AsyncClient( - timeout=self._attach_detach_network_timeout - ) as client: - response = await client.post(url, json=data) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus( - response, "attach containers to network" - ) - - async def _detach_container_from_network( - self, dynamic_sidecar_endpoint: str, container_id: str, network_id: str - ) -> None: - """detaches a container from a network if not already detached""" - url = get_url( - dynamic_sidecar_endpoint, f"/v1/containers/{container_id}/networks:detach" - ) - data = dict(network_id=network_id) - - async with httpx.AsyncClient( - timeout=self._attach_detach_network_timeout - ) as client: - response = await client.post(url, json=data) - if response.status_code != status.HTTP_204_NO_CONTENT: - raise DynamicSidecarUnexpectedResponseStatus( - response, "detach containers from network" - ) - - async def attach_service_containers_to_project_network( - self, - dynamic_sidecar_endpoint: str, - dynamic_sidecar_network_name: str, - project_network: str, - project_id: ProjectID, - network_alias: DockerNetworkAlias, - ) -> None: - """All containers spawned by the dynamic-sidecar need to be attached to the project network""" - try: - containers_status = await self.containers_docker_status( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint - ) - except httpx.HTTPError: - return - - sorted_container_names = sorted(containers_status.keys()) - - entrypoint_container_name = await self.get_entrypoint_container_name( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, - dynamic_sidecar_network_name=dynamic_sidecar_network_name, - ) - - network_names_to_ids: dict[str, str] = await get_or_create_networks_ids( - [project_network], project_id - ) - network_id = network_names_to_ids[project_network] - - tasks = deque() - - for k, container_name in enumerate(sorted_container_names): - # by default we attach `alias-0`, `alias-1`, etc... - # to all containers - aliases = [f"{network_alias}-{k}"] - if container_name == entrypoint_container_name: - # by definition the entrypoint container will be exposed as the `alias` - aliases.append(network_alias) - - tasks.append( - self._attach_container_to_network( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, - container_id=container_name, - network_id=network_id, - network_aliases=aliases, - ) - ) - - await logged_gather(*tasks) - - async def detach_service_containers_from_project_network( - self, dynamic_sidecar_endpoint: str, project_network: str, project_id: ProjectID - ) -> None: - # the network needs to be detached from all started containers - try: - containers_status = await self.containers_docker_status( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint - ) - except httpx.HTTPError: - return - - network_names_to_ids: dict[str, str] = await get_or_create_networks_ids( - [project_network], project_id - ) - network_id = network_names_to_ids[project_network] - - await logged_gather( - *[ - self._detach_container_from_network( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, - container_id=container_name, - network_id=network_id, - ) - for container_name in containers_status - ] - ) - - -async def setup_api_client(app: FastAPI) -> None: - logger.debug("dynamic-sidecar api client setup") - app.state.dynamic_sidecar_api_client = DynamicSidecarClient(app) - - -async def close_api_client(app: FastAPI) -> None: - logger.debug("dynamic-sidecar api client closing...") - client: Optional[DynamicSidecarClient] - if client := app.state.dynamic_sidecar_api_client: - # pylint: disable=protected-access - logger.debug( - "REQUESTS WHILE CLOSING %s", - [ - (r.request.method, r.request.url, r.request.headers) - for r in client._client._transport._pool._requests - ], - ) - await client.close() - - -def get_dynamic_sidecar_client(app: FastAPI) -> DynamicSidecarClient: - assert app.state.dynamic_sidecar_api_client # nosec - return app.state.dynamic_sidecar_api_client - - -async def update_dynamic_sidecar_health( - app: FastAPI, scheduler_data: SchedulerData -) -> None: - api_client = get_dynamic_sidecar_client(app) - service_endpoint = scheduler_data.dynamic_sidecar.endpoint - - # update service health - is_healthy = await api_client.is_healthy(service_endpoint) - scheduler_data.dynamic_sidecar.is_available = is_healthy diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py index d0ff009df1b..90170b81cb1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py @@ -1,7 +1,4 @@ -from typing import Optional - from aiodocker.exceptions import DockerError -from httpx import Response from models_library.projects_nodes import NodeID from pydantic.errors import PydanticErrorMixin @@ -27,13 +24,6 @@ def __init__(self, node_uuid: NodeID): super().__init__(f"node {node_uuid} not found") -class DynamicSchedulerException(DirectorException): - """ - Used to signal that something was wrong with during - the service's observation. - """ - - class EntrypointContainerNotFoundError(DirectorException): """Raised while the entrypoint container was nto yet started""" @@ -42,19 +32,6 @@ class LegacyServiceIsNotSupportedError(DirectorException): """This API is not implemented by the director-v0""" -class DynamicSidecarUnexpectedResponseStatus(DirectorException): - """Used to signal that there was an issue with a request""" - - def __init__(self, response: Response, msg: Optional[str] = None): - formatted_tag = f"[during {msg}]" if msg is not None else "" - message = ( - f"Unexpected response {formatted_tag}: status={response.status_code}, " - f"url={response.url}, body={response.text}" - ) - super().__init__(message) - self.response = response - - class NodeportsDidNotFindNodeError(PydanticErrorMixin, DirectorException): code = "dynamic_scheduler.output_ports_pulling.node_not_found" msg_template = ( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/module_setup.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/module_setup.py index 6202c3e194c..a38b3fcbde5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/module_setup.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/module_setup.py @@ -1,17 +1,16 @@ from fastapi import FastAPI -from .client_api import close_api_client, setup_api_client -from .scheduler import setup_scheduler, shutdown_scheduler +from . import api_client, scheduler def setup(app: FastAPI) -> None: async def on_startup() -> None: - await setup_api_client(app) - await setup_scheduler(app) + await api_client.setup(app) + await scheduler.setup_scheduler(app) async def on_shutdown() -> None: - await shutdown_scheduler(app) - await close_api_client(app) + await scheduler.shutdown_scheduler(app) + await api_client.shutdown(app) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py index d1cd703296d..be58547e560 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py @@ -2,7 +2,6 @@ import logging from typing import Any, Coroutine, Final, Optional, Type, cast -import httpx from fastapi import FastAPI from fastapi.encoders import jsonable_encoder from models_library.aiodocker_api import AioDockerServiceSpec @@ -28,7 +27,11 @@ from ...catalog import CatalogClient from ...db.repositories.projects import ProjectsRepository from ...db.repositories.projects_networks import ProjectsNetworksRepository -from ..client_api import DynamicSidecarClient, get_dynamic_sidecar_client +from ..api_client import ( + BaseClientHTTPError, + DynamicSidecarClient, + get_dynamic_sidecar_client, +) from ..docker_api import ( are_all_services_present, constrain_service_to_node, @@ -49,11 +52,7 @@ get_dynamic_sidecar_spec, merge_settings_before_use, ) -from ..errors import ( - DynamicSidecarUnexpectedResponseStatus, - EntrypointContainerNotFoundError, - NodeportsDidNotFindNodeError, -) +from ..errors import EntrypointContainerNotFoundError, NodeportsDidNotFindNodeError from .abc import DynamicSchedulerEvent from .events_utils import ( all_containers_running, @@ -232,7 +231,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: ] = await dynamic_sidecar_client.containers_inspect( dynamic_sidecar_endpoint ) - except (httpx.HTTPError, DynamicSidecarUnexpectedResponseStatus): + except BaseClientHTTPError: # After the service creation it takes a bit of time for the container to start # If the same message appears in the log multiple times in a row (for the same # service) something might be wrong with the service. @@ -508,10 +507,12 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: await dynamic_sidecar_client.begin_service_destruction( dynamic_sidecar_endpoint=scheduler_data.dynamic_sidecar.endpoint ) - # NOTE: ANE: need to use more specific exception here - except Exception as e: # pylint: disable=broad-except + except BaseClientHTTPError as e: logger.warning( - "Could not contact dynamic-sidecar to begin destruction of %s\n%s", + ( + "Could not contact dynamic-sidecar to begin destruction of " + "%s\n%s. Will continue service removal!" + ), scheduler_data.service_name, f"{e}", ) @@ -557,8 +558,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: logger.warning("%s", f"{err}") logger.info("Ports data pushed by dynamic-sidecar") - # NOTE: ANE: need to use more specific exception here - except Exception as e: # pylint: disable=broad-except + except BaseClientHTTPError as e: logger.warning( ( "Could not contact dynamic-sidecar to save service " @@ -570,6 +570,8 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: # ensure dynamic-sidecar does not get removed # user data can be manually saved and manual # cleanup of the dynamic-sidecar is required + # TODO: ANE: maybe have a mechanism stop the dynamic sidecar + # and make the director warn about hanging sidecars? raise e # remove the 2 services diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events_utils.py index fa0bf13f740..6bc42d698bb 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events_utils.py @@ -3,18 +3,19 @@ from typing import Any, AsyncIterator, Deque, Dict, List, Optional, Type from fastapi import FastAPI +from pydantic import AnyHttpUrl from ....api.dependencies.database import get_base_repository from ....models.schemas.dynamic_services import DockerContainerInspect from ....models.schemas.dynamic_services.scheduler import DockerStatus from ....modules.db.repositories import BaseRepository from ....modules.director_v0 import DirectorV0Client -from ..client_api import DynamicSidecarClient +from ..api_client import DynamicSidecarClient @asynccontextmanager async def disabled_directory_watcher( - dynamic_sidecar_client: DynamicSidecarClient, dynamic_sidecar_endpoint: str + dynamic_sidecar_client: DynamicSidecarClient, dynamic_sidecar_endpoint: AnyHttpUrl ) -> AsyncIterator[None]: try: # disable file system event watcher while writing diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py index c35c77eecd3..2b7a9a4746c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py @@ -20,7 +20,6 @@ from typing import Optional from uuid import UUID -import httpx from fastapi import FastAPI from models_library.projects_networks import DockerNetworkAlias from models_library.projects_nodes_io import NodeID @@ -37,10 +36,11 @@ RunningDynamicServiceDetails, SchedulerData, ) -from ..client_api import ( +from ..api_client import ( + ClientHttpError, DynamicSidecarClient, get_dynamic_sidecar_client, - update_dynamic_sidecar_health, + get_dynamic_sidecar_service_health, ) from ..docker_api import ( are_all_services_present, @@ -88,7 +88,7 @@ async def _apply_observation_cycle( node_uuid=scheduler_data.node_uuid, can_save=scheduler_data.dynamic_sidecar.can_save_state, ) - await update_dynamic_sidecar_health(app, scheduler_data) + await get_dynamic_sidecar_service_health(app, scheduler_data) for dynamic_scheduler_event in REGISTERED_EVENTS: if await dynamic_scheduler_event.will_trigger( @@ -229,7 +229,7 @@ async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDeta ] = await dynamic_sidecar_client.containers_docker_status( dynamic_sidecar_endpoint=scheduler_data.dynamic_sidecar.endpoint ) - except httpx.HTTPError: + except ClientHttpError: # error fetching docker_statues, probably someone should check return RunningDynamicServiceDetails.from_scheduler_data( node_uuid=node_uuid, diff --git a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py index a986c759725..6c4f64710a1 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py +++ b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py @@ -192,7 +192,7 @@ def mock_project_repository(mocker: MockerFixture) -> None: @pytest.fixture def mock_dynamic_sidecar_api_calls(mocker: MockerFixture) -> None: class_path = ( - f"{DIRECTOR_V2_MODULES}.dynamic_sidecar.client_api.DynamicSidecarClient" + f"{DIRECTOR_V2_MODULES}.dynamic_sidecar.api_client.DynamicSidecarClient" ) for function_name, return_value in [ ("service_save_state", None), diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index f32dda21d82..e12414b8fba 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -215,7 +215,7 @@ def mock_dynamic_sidecar_client(mocker: MockerFixture) -> None: ("service_push_output_ports", None), ]: mocker.patch( - f"simcore_service_director_v2.modules.dynamic_sidecar.client_api.DynamicSidecarClient.{method_name}", + f"simcore_service_director_v2.modules.dynamic_sidecar.api_client.DynamicSidecarClient.{method_name}", return_value=return_value, ) diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 8cf04bfca0f..9f4c459d672 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -2,14 +2,24 @@ # pylint: disable=unused-argument import json +import logging import random import urllib.parse -from typing import Any, AsyncIterable, AsyncIterator, Callable, Iterator, Mapping +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Callable, + Iterable, + Iterator, + Mapping, +) import pytest import respx import traitlets.config from _dask_helpers import DaskGatewayServer +from _pytest.logging import LogCaptureFixture from _pytest.monkeypatch import MonkeyPatch from dask.distributed import Scheduler, Worker from dask_gateway_server.app import DaskGateway @@ -361,3 +371,9 @@ def mocked_catalog_service_api( ).respond(json=fake_service_specifications) yield respx_mock + + +@pytest.fixture() +def caplog_info_level(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]: + with caplog.at_level(logging.INFO): + yield caplog diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py new file mode 100644 index 00000000000..8908b1e9eff --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py @@ -0,0 +1,198 @@ +# pylint:disable=redefined-outer-name + +import pytest +from _pytest.logging import LogCaptureFixture +from httpx import ( + ConnectError, + HTTPError, + PoolTimeout, + Request, + RequestError, + Response, + codes, +) +from pydantic import AnyHttpUrl, parse_obj_as +from respx import MockRouter +from simcore_service_director_v2.modules.dynamic_sidecar.api_client._base import ( + BaseThinClient, + expect_status, + retry_on_errors, +) +from simcore_service_director_v2.modules.dynamic_sidecar.api_client._errors import ( + ClientHttpError, + UnexpectedStatusError, + _WrongReturnType, +) + +# UTILS + + +class TestThickClient(BaseThinClient): + @retry_on_errors + async def get_provided_url(self, provided_url: str) -> Response: + return await self._client.get(provided_url) + + @retry_on_errors + async def get_retry_for_status(self) -> Response: + return await self._client.get("http://missing-host:1111") + + +# FIXTURES + + +@pytest.fixture +def thick_client() -> TestThickClient: + return TestThickClient(request_max_retries=1) + + +@pytest.fixture +def test_url() -> AnyHttpUrl: + return parse_obj_as(AnyHttpUrl, "http://missing-host:1111") + + +# TESTS + + +async def test_base_with_async_context_manager(test_url: AnyHttpUrl) -> None: + async with TestThickClient(request_max_retries=1) as client: + with pytest.raises(ClientHttpError): + await client.get_provided_url(test_url) + + +async def test_connection_error( + thick_client: TestThickClient, test_url: AnyHttpUrl +) -> None: + with pytest.raises(ClientHttpError) as exe_info: + await thick_client.get_provided_url(test_url) + + assert isinstance(exe_info.value, ClientHttpError) + assert isinstance(exe_info.value.error, ConnectError) + + +@pytest.mark.parametrize("retry_count", [2, 1]) +async def test_retry_on_errors( + retry_count: int, test_url: AnyHttpUrl, caplog_info_level: LogCaptureFixture +) -> None: + client = TestThickClient(request_max_retries=retry_count) + + with pytest.raises(ClientHttpError): + await client.get_provided_url(test_url) + + # check if the right amount of messages was captured by the logs + assert len(caplog_info_level.messages) == retry_count + for i, log_message in enumerate(caplog_info_level.messages): + assert log_message.startswith( + f"[{i+1}/{retry_count}]Retry. Unexpected ConnectError" + ) + + +@pytest.mark.parametrize("error_class", [ConnectError, PoolTimeout]) +@pytest.mark.parametrize("retry_count", [1, 2]) +async def test_retry_on_errors_by_error_type( + error_class: type[RequestError], + caplog_info_level: LogCaptureFixture, + retry_count: int, + test_url: AnyHttpUrl, +) -> None: + class ATestClient(BaseThinClient): + # pylint: disable=no-self-use + @retry_on_errors + async def raises_request_error(self) -> Response: + raise error_class( + "mock_connect_error", + request=Request(method="GET", url=test_url), + ) + + client = ATestClient(request_max_retries=retry_count) + + with pytest.raises(ClientHttpError): + await client.raises_request_error() + + log_count = retry_count + 1 if error_class == PoolTimeout else retry_count + assert len(caplog_info_level.messages) == log_count + + if error_class == PoolTimeout: + for i, retry_message in enumerate(caplog_info_level.messages[:-1]): + assert retry_message.startswith( + f"[{i+1}/{retry_count}]Retry. Unexpected PoolTimeout" + ) + connections_message = caplog_info_level.messages[-1] + assert connections_message == "Requests while event 'POOL TIMEOUT': []" + else: + for i, log_message in enumerate(caplog_info_level.messages): + assert log_message.startswith( + f"[{i+1}/{retry_count}]Retry. Unexpected ConnectError" + ) + + +async def test_retry_on_errors_raises_client_http_error() -> None: + class ATestClient(BaseThinClient): + # pylint: disable=no-self-use + @retry_on_errors + async def raises_http_error(self) -> Response: + raise HTTPError("mock_http_error") + + client = ATestClient(request_max_retries=1) + + with pytest.raises(ClientHttpError): + await client.raises_http_error() + + +async def test_methods_do_not_return_response() -> None: + class OKTestClient(BaseThinClient): + async def public_method_ok(self) -> Response: # type: ignore + """this method will be ok even if no code is used""" + + # OK + OKTestClient(request_max_retries=1) + + class FailWrongAnnotationTestClient(BaseThinClient): + async def public_method_wrong_annotation(self) -> None: + """this method will raise an error""" + + with pytest.raises(_WrongReturnType): + FailWrongAnnotationTestClient(request_max_retries=1) + + class FailNoAnnotationTestClient(BaseThinClient): + async def public_method_no_annotation(self): + """this method will raise an error""" + + with pytest.raises(_WrongReturnType): + FailNoAnnotationTestClient(request_max_retries=1) + + +async def test_expect_state_decorator( + test_url: AnyHttpUrl, respx_mock: MockRouter +) -> None: + + url_get_200_ok = f"{test_url}/ok" + get_wrong_state = f"{test_url}/wrong-state" + error_status = codes.NOT_FOUND + + class ATestClient(BaseThinClient): + @expect_status(codes.OK) + async def get_200_ok(self) -> Response: + return await self._client.get(url_get_200_ok) + + @expect_status(error_status) + async def get_wrong_state(self) -> Response: + return await self._client.get(get_wrong_state) + + respx_mock.get(url_get_200_ok).mock(return_value=Response(codes.OK)) + respx_mock.get(get_wrong_state).mock(return_value=Response(codes.OK)) + + test_client = ATestClient(request_max_retries=1) + + # OK + response = await test_client.get_200_ok() + assert response.status_code == codes.OK + + # RAISES EXPECTED ERROR + with pytest.raises(UnexpectedStatusError) as err_info: + await test_client.get_wrong_state() + + assert err_info.value.response.status_code == codes.OK + assert ( + f"{err_info.value}" + == f"Expected status: {error_status}, got {codes.OK} for: {get_wrong_state}: headers=Headers({{}}), body=''" + ) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py new file mode 100644 index 00000000000..964009c0951 --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py @@ -0,0 +1,475 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +from contextlib import contextmanager +from typing import Any, AsyncIterable, Callable, Iterator, Optional, Type +from unittest.mock import AsyncMock + +import pytest +from _pytest.logging import LogCaptureFixture +from _pytest.monkeypatch import MonkeyPatch +from fastapi import FastAPI, status +from httpx import HTTPError, Response +from pydantic import AnyHttpUrl, parse_obj_as +from pytest_mock import MockerFixture +from simcore_service_director_v2.core.settings import AppSettings +from simcore_service_director_v2.modules.dynamic_sidecar.api_client._errors import ( + ClientHttpError, + UnexpectedStatusError, +) +from simcore_service_director_v2.modules.dynamic_sidecar.api_client._public import ( + DynamicSidecarClient, + get_dynamic_sidecar_client, + setup, + shutdown, +) +from simcore_service_director_v2.modules.dynamic_sidecar.errors import ( + EntrypointContainerNotFoundError, + NodeportsDidNotFindNodeError, +) + +# FIXTURES + + +@pytest.fixture +def dynamic_sidecar_endpoint() -> AnyHttpUrl: + return parse_obj_as(AnyHttpUrl, "http://missing-host:1111") + + +@pytest.fixture +def mock_env(monkeypatch: MonkeyPatch, mock_env: None) -> None: + monkeypatch.setenv("S3_ACCESS_KEY", "") + monkeypatch.setenv("S3_SECRET_KEY", "") + monkeypatch.setenv("S3_BUCKET_NAME", "") + monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") + + monkeypatch.setenv("POSTGRES_HOST", "") + monkeypatch.setenv("POSTGRES_USER", "") + monkeypatch.setenv("POSTGRES_PASSWORD", "") + monkeypatch.setenv("POSTGRES_DB", "") + + # reduce number of retries to make more reliable + monkeypatch.setenv("DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES", "1") + monkeypatch.setenv("S3_ENDPOINT", "") + + +@pytest.fixture +async def dynamic_sidecar_client(mock_env: None) -> AsyncIterable[DynamicSidecarClient]: + app = FastAPI() + app.state.settings = AppSettings.create_from_envs() + + await setup(app) + yield get_dynamic_sidecar_client(app) + await shutdown(app) + + +@pytest.fixture +def retry_count() -> int: + return 2 + + +@pytest.fixture +def raise_retry_count( + monkeypatch: MonkeyPatch, retry_count: int, mock_env: None +) -> None: + monkeypatch.setenv( + "DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES", f"{retry_count}" + ) + + +@pytest.fixture +def get_patched_client( + dynamic_sidecar_client: DynamicSidecarClient, mocker: MockerFixture +) -> Callable: + @contextmanager + def wrapper( + method: str, + return_value: Optional[Any] = None, + side_effect: Optional[Callable] = None, + ) -> Iterator[DynamicSidecarClient]: + mocker.patch( + f"simcore_service_director_v2.modules.dynamic_sidecar.api_client._thin.ThinDynamicSidecarClient.{method}", + return_value=return_value, + side_effect=side_effect, + ) + yield dynamic_sidecar_client + + return wrapper + + +# TESTS + + +@pytest.mark.parametrize("is_healthy", [True, False]) +async def test_is_healthy_api_ok( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl, is_healthy: bool +) -> None: + mock_json = {"is_healthy": is_healthy} + with get_patched_client( + "get_health", + return_value=Response(status_code=status.HTTP_200_OK, json=mock_json), + ) as client: + assert await client.is_healthy(dynamic_sidecar_endpoint) == is_healthy + + +async def test_is_healthy_times_out( + raise_retry_count: None, + dynamic_sidecar_client: DynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + caplog_info_level: LogCaptureFixture, + retry_count: int, +) -> None: + assert await dynamic_sidecar_client.is_healthy(dynamic_sidecar_endpoint) is False + for i, log_message in enumerate(caplog_info_level.messages): + assert log_message.startswith( + f"[{i+1}/{retry_count}]Retry. Unexpected ConnectError" + ) + + +@pytest.mark.parametrize( + "side_effect", + [ + pytest.param( + UnexpectedStatusError( + Response( + status_code=status.HTTP_400_BAD_REQUEST, + content="some mocked error", + request=AsyncMock(), + ), + status.HTTP_200_OK, + ), + id="UnexpectedStatusError", + ), + pytest.param( + ClientHttpError(HTTPError("another mocked error")), id="HTTPError" + ), + ], +) +async def test_is_healthy_api_error( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + side_effect: Exception, +) -> None: + with get_patched_client( + "get_health", + side_effect=side_effect, + ) as client: + assert await client.is_healthy(dynamic_sidecar_endpoint) == False + + +async def test_containers_inspect( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + mock_json = {"ok": "data"} + with get_patched_client( + "get_containers", + return_value=Response(status_code=status.HTTP_200_OK, json=mock_json), + ) as client: + assert await client.containers_inspect(dynamic_sidecar_endpoint) == mock_json + + +async def test_containers_docker_status_api_ok( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + mock_json = {"container_id": {"ok": "data"}} + with get_patched_client( + "get_containers", + return_value=Response(status_code=status.HTTP_200_OK, json=mock_json), + ) as client: + assert ( + await client.containers_docker_status(dynamic_sidecar_endpoint) == mock_json + ) + + +async def test_containers_docker_status_api_error( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + with get_patched_client( + "get_containers", + side_effect=UnexpectedStatusError( + Response( + status_code=status.HTTP_400_BAD_REQUEST, + content="some mocked error", + request=AsyncMock(), + ), + status.HTTP_200_OK, + ), + ) as client: + assert await client.containers_docker_status(dynamic_sidecar_endpoint) == {} + + +async def test_start_service_creation( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + docker_compose_reply = "a mocked docker compose plain string reply" + with get_patched_client( + "post_containers", + return_value=Response( + status_code=status.HTTP_200_OK, content=docker_compose_reply + ), + ) as client: + assert ( + await client.start_service_creation( + dynamic_sidecar_endpoint, compose_spec="mock compose spec" + ) + == None + ) + + +async def test_begin_service_destruction( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + docker_compose_reply = "a mocked docker compose plain string reply" + with get_patched_client( + "post_containers_down", + return_value=Response( + status_code=status.HTTP_200_OK, content=docker_compose_reply + ), + ) as client: + assert await client.begin_service_destruction(dynamic_sidecar_endpoint) == None + + +async def test_service_save_state( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + with get_patched_client( + "post_containers_state_save", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert await client.service_save_state(dynamic_sidecar_endpoint) == None + + +async def test_service_restore_state( + get_patched_client: Callable, dynamic_sidecar_endpoint: AnyHttpUrl +) -> None: + with get_patched_client( + "post_containers_state_restore", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert await client.service_restore_state(dynamic_sidecar_endpoint) == None + + +@pytest.mark.parametrize("port_keys", [None, ["1", [""], [""]]]) +async def test_service_pull_input_ports( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]], +) -> None: + with get_patched_client( + "post_containers_ports_inputs_pull", + return_value=Response(status_code=status.HTTP_200_OK, content="42"), + ) as client: + assert ( + await client.service_pull_input_ports(dynamic_sidecar_endpoint, port_keys) + == 42 + ) + + +async def test_service_disable_dir_watcher( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, +) -> None: + with get_patched_client( + "patch_containers_directory_watcher", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + await client.service_disable_dir_watcher(dynamic_sidecar_endpoint) == None + ) + + +async def test_service_enable_dir_watcher( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, +) -> None: + with get_patched_client( + "patch_containers_directory_watcher", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert await client.service_enable_dir_watcher(dynamic_sidecar_endpoint) == None + + +@pytest.mark.parametrize("outputs_labels", [{}, {"ok": "data"}]) +async def test_service_outputs_create_dirs( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + outputs_labels: dict[str, Any], +) -> None: + with get_patched_client( + "post_containers_ports_outputs_dirs", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + await client.service_outputs_create_dirs( + dynamic_sidecar_endpoint, outputs_labels + ) + == None + ) + + +@pytest.mark.parametrize("port_keys", [None, ["1", [""], [""]]]) +async def test_service_pull_output_ports( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]], +) -> None: + with get_patched_client( + "post_containers_ports_outputs_pull", + return_value=Response(status_code=status.HTTP_200_OK, content="42"), + ) as client: + assert ( + await client.service_pull_output_ports(dynamic_sidecar_endpoint, port_keys) + == 42 + ) + + +@pytest.mark.parametrize("port_keys", [None, ["1", [""], [""]]]) +async def test_service_push_output_ports_ok( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]], +) -> None: + with get_patched_client( + "post_containers_ports_outputs_push", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + await client.service_push_output_ports(dynamic_sidecar_endpoint, port_keys) + == None + ) + + +@pytest.mark.parametrize("port_keys", [None, ["1", [""], [""]]]) +@pytest.mark.parametrize( + "side_effect, expected_error", + [ + pytest.param( + UnexpectedStatusError( + Response( + status_code=status.HTTP_404_NOT_FOUND, + json={ + "code": "dynamic_sidecar.nodeports.node_not_found", + "node_uuid": "mock_node_uuid", + }, + request=AsyncMock(), + ), + status.HTTP_204_NO_CONTENT, + ), + NodeportsDidNotFindNodeError, + id="NodeportsDidNotFindNodeError", + ), + pytest.param( + UnexpectedStatusError( + Response( + status_code=status.HTTP_404_NOT_FOUND, + json={"code": "other"}, + request=AsyncMock(), + ), + status.HTTP_204_NO_CONTENT, + ), + UnexpectedStatusError, + id="UnexpectedStatusError", + ), + ], +) +async def test_service_push_output_ports_api_fail( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + port_keys: Optional[list[str]], + side_effect: UnexpectedStatusError, + expected_error: Type[Exception], +) -> None: + with get_patched_client( + "post_containers_ports_outputs_push", side_effect=side_effect + ) as client: + with pytest.raises(expected_error): + await client.service_push_output_ports(dynamic_sidecar_endpoint, port_keys) + + +@pytest.mark.parametrize("dynamic_sidecar_network_name", ["a_test_network"]) +async def test_get_entrypoint_container_name_ok( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + dynamic_sidecar_network_name: str, +) -> None: + with get_patched_client( + "get_containers_name", + return_value=Response(status_code=status.HTTP_200_OK, json="a_test_container"), + ) as client: + assert ( + await client.get_entrypoint_container_name( + dynamic_sidecar_endpoint, dynamic_sidecar_network_name + ) + == "a_test_container" + ) + + +@pytest.mark.parametrize("dynamic_sidecar_network_name", ["a_test_network"]) +async def test_get_entrypoint_container_name_api_not_found( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + dynamic_sidecar_network_name: str, +) -> None: + with get_patched_client( + "get_containers_name", + side_effect=UnexpectedStatusError( + Response(status_code=status.HTTP_404_NOT_FOUND, request=AsyncMock()), + status.HTTP_204_NO_CONTENT, + ), + ) as client: + with pytest.raises(EntrypointContainerNotFoundError): + await client.get_entrypoint_container_name( + dynamic_sidecar_endpoint, dynamic_sidecar_network_name + ) + + +async def test_restart_containers( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, +) -> None: + with get_patched_client( + "post_containers_restart", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert await client.restart_containers(dynamic_sidecar_endpoint) == None + + +@pytest.mark.parametrize("network_aliases", [[], ["an-alias"], ["alias-1", "alias-2"]]) +async def test_attach_container_to_network( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + network_aliases: list[str], +) -> None: + with get_patched_client( + "post_containers_networks_attach", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + # pylint:disable=protected-access + await client._attach_container_to_network( + dynamic_sidecar_endpoint, + container_id="container_id", + network_id="network_id", + network_aliases=network_aliases, + ) + == None + ) + + +async def test_detach_container_from_network( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, +) -> None: + with get_patched_client( + "post_containers_networks_detach", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + # pylint:disable=protected-access + await client._detach_container_from_network( + dynamic_sidecar_endpoint, + container_id="container_id", + network_id="network_id", + ) + == None + ) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py new file mode 100644 index 00000000000..243a404a5cf --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py @@ -0,0 +1,383 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +import json +from typing import Any, Callable, Optional + +import pytest +from _pytest.monkeypatch import MonkeyPatch +from fastapi import FastAPI, status +from httpx import Response +from pydantic import AnyHttpUrl, parse_obj_as +from respx import MockRouter, Route +from respx.types import SideEffectTypes +from simcore_service_director_v2.core.settings import AppSettings +from simcore_service_director_v2.modules.dynamic_sidecar.api_client._thin import ( + ThinDynamicSidecarClient, +) + +# NOTE: typing and callables cannot +MockRequestType = Callable[ + [str, str, Optional[Response], Optional[SideEffectTypes]], Route +] + + +# UTILS + + +def assert_responses(mocked: Response, result: Optional[Response]) -> None: + assert result is not None + assert mocked.status_code == result.status_code + assert mocked.headers == result.headers + assert mocked.text == result.text + + +# FIXTURES + + +@pytest.fixture +def mocked_app(monkeypatch: MonkeyPatch, mock_env: None) -> FastAPI: + monkeypatch.setenv("S3_ENDPOINT", "") + monkeypatch.setenv("S3_ACCESS_KEY", "") + monkeypatch.setenv("S3_SECRET_KEY", "") + monkeypatch.setenv("S3_BUCKET_NAME", "") + monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") + + monkeypatch.setenv("POSTGRES_HOST", "") + monkeypatch.setenv("POSTGRES_USER", "") + monkeypatch.setenv("POSTGRES_PASSWORD", "") + monkeypatch.setenv("POSTGRES_DB", "") + + # reduce number of retries to make more reliable + monkeypatch.setenv("DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES", "1") + + app = FastAPI() + app.state.settings = AppSettings.create_from_envs() + return app + + +@pytest.fixture +def thin_client(mocked_app: FastAPI) -> ThinDynamicSidecarClient: + return ThinDynamicSidecarClient(mocked_app) + + +@pytest.fixture +def dynamic_sidecar_endpoint() -> AnyHttpUrl: + return parse_obj_as(AnyHttpUrl, "http://missing-host:1111") + + +@pytest.fixture +def mock_request( + dynamic_sidecar_endpoint: AnyHttpUrl, respx_mock: MockRouter +) -> MockRequestType: + def request_mock( + method: str, + path: str, + return_value: Optional[Response] = None, + side_effect: Optional[SideEffectTypes] = None, + ) -> Route: + print(f"Mocking {path=}") + return respx_mock.request( + method=method, url=f"{dynamic_sidecar_endpoint}{path}" + ).mock(return_value=return_value, side_effect=side_effect) + + return request_mock + + +# TESTS + + +async def test_get_health( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_200_OK) + mock_request("GET", "/health", mock_response, None) + + response = await thin_client.get_health(dynamic_sidecar_endpoint) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("only_status", [False, True]) +async def test_get_containers( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + only_status: bool, +) -> None: + mock_response = Response(status.HTTP_200_OK) + mock_request( + "GET", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers?only_status={str(only_status).lower()}", + mock_response, + None, + ) + + response = await thin_client.get_containers( + dynamic_sidecar_endpoint, only_status=only_status + ) + assert_responses(mock_response, response) + + +async def test_post_containers( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_202_ACCEPTED) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers", + mock_response, + None, + ) + + response = await thin_client.post_containers( + dynamic_sidecar_endpoint, compose_spec="some_fake_compose_as_str" + ) + assert_responses(mock_response, response) + + +async def test_post_containers_down( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_200_OK) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers:down", + mock_response, + None, + ) + + response = await thin_client.post_containers_down(dynamic_sidecar_endpoint) + assert_responses(mock_response, response) + + +async def test_post_containers_state_save( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/state:save", + mock_response, + None, + ) + + response = await thin_client.post_containers_state_save(dynamic_sidecar_endpoint) + assert_responses(mock_response, response) + + +async def test_post_containers_state_restore( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/state:restore", + mock_response, + None, + ) + + response = await thin_client.post_containers_state_restore(dynamic_sidecar_endpoint) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("port_keys", [None, ["1", "2"], []]) +async def test_post_containers_ports_inputs_pull( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + port_keys: Optional[list[str]], +) -> None: + mock_response = Response(status.HTTP_200_OK) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/ports/inputs:pull", + mock_response, + None, + ) + + response = await thin_client.post_containers_ports_inputs_pull( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("is_enabled", [False, True]) +async def test_post_patch_containers_directory_watcher( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + is_enabled: bool, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "PATCH", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/directory-watcher", + mock_response, + None, + ) + + response = await thin_client.patch_containers_directory_watcher( + dynamic_sidecar_endpoint, is_enabled=is_enabled + ) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("outputs_labels", [{}, {"some": "data"}]) +async def test_post_containers_ports_outputs_dirs( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + outputs_labels: dict[str, Any], +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/ports/outputs/dirs", + mock_response, + None, + ) + + response = await thin_client.post_containers_ports_outputs_dirs( + dynamic_sidecar_endpoint, outputs_labels=outputs_labels + ) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("port_keys", [None, ["1", "2"], []]) +async def test_post_containers_ports_outputs_pull( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + port_keys: Optional[list[str]], +) -> None: + mock_response = Response(status.HTTP_200_OK) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/ports/outputs:pull", + mock_response, + None, + ) + + response = await thin_client.post_containers_ports_outputs_pull( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("port_keys", [None, ["1", "2"], []]) +async def test_post_containers_ports_outputs_push( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + port_keys: Optional[list[str]], +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/ports/outputs:push", + mock_response, + None, + ) + + response = await thin_client.post_containers_ports_outputs_push( + dynamic_sidecar_endpoint, port_keys=port_keys + ) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("dynamic_sidecar_network_name", ["test_nw_name"]) +async def test_get_containers_name( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + dynamic_sidecar_network_name: str, +) -> None: + mock_response = Response(status.HTTP_200_OK) + encoded_filters = json.dumps(dict(network=dynamic_sidecar_network_name)) + mock_request( + "GET", + ( + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}" + f"/containers/name?filters={encoded_filters}" + ), + mock_response, + None, + ) + + response = await thin_client.get_containers_name( + dynamic_sidecar_endpoint, + dynamic_sidecar_network_name=dynamic_sidecar_network_name, + ) + assert_responses(mock_response, response) + + +async def test_post_containers_restart( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers:restart", + mock_response, + None, + ) + + response = await thin_client.post_containers_restart(dynamic_sidecar_endpoint) + assert_responses(mock_response, response) + + +@pytest.mark.parametrize("network_aliases", [[], ["an_alias"], ["multuple_aliases"]]) +async def test_post_containers_networks_attach( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + network_aliases: list[str], +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + container_id = "a_container_id" + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/{container_id}/networks:attach", + mock_response, + None, + ) + + response = await thin_client.post_containers_networks_attach( + dynamic_sidecar_endpoint, + container_id=container_id, + network_id="network_id", + network_aliases=network_aliases, + ) + assert_responses(mock_response, response) + + +async def test_post_containers_networks_detach( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + container_id = "a_container_id" + mock_request( + "POST", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/containers/{container_id}/networks:detach", + mock_response, + None, + ) + + response = await thin_client.post_containers_networks_detach( + dynamic_sidecar_endpoint, container_id=container_id, network_id="network_id" + ) + assert_responses(mock_response, response) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py index f10e1461182..c00fa0f46f2 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py @@ -25,7 +25,6 @@ SchedulerData, ServiceState, ) -from simcore_service_director_v2.modules.dynamic_sidecar.client_api import get_url from simcore_service_director_v2.modules.dynamic_sidecar.errors import ( DynamicSidecarError, DynamicSidecarNotFoundError, @@ -52,6 +51,10 @@ pytest_simcore_ops_services_selection = ["adminer"] +def get_url(dynamic_sidecar_endpoint: str, postfix: str) -> str: + return f"{dynamic_sidecar_endpoint}{postfix}" + + # UTILS @contextmanager def _mock_containers_docker_status( @@ -129,6 +132,8 @@ def mock_env( monkeypatch.setenv("POSTGRES_USER", "test") monkeypatch.setenv("POSTGRES_PASSWORD", "test") monkeypatch.setenv("POSTGRES_DB", "test") + # NOTE: makes retries go faster + monkeypatch.setenv("DYNAMIC_SIDECAR_API_CLIENT_REQUEST_MAX_RETRIES", "1") @pytest.fixture @@ -184,7 +189,7 @@ def scheduler_data(scheduler_data_from_http_request: SchedulerData) -> Scheduler @pytest.fixture -def mocked_client_api(scheduler_data: SchedulerData) -> Iterator[MockRouter]: +def mocked_api_client(scheduler_data: SchedulerData) -> Iterator[MockRouter]: service_endpoint = scheduler_data.dynamic_sidecar.endpoint with respx.mock as mock: mock.get(get_url(service_endpoint, "/health"), name="is_healthy").respond( @@ -270,7 +275,8 @@ async def test_scheduler_add_remove( manually_trigger_scheduler: Callable[[], Awaitable[None]], scheduler: DynamicSidecarsScheduler, scheduler_data: SchedulerData, - mocked_client_api: MockRouter, + mocked_api_client: MockRouter, + docker_swarm: None, mocked_dynamic_scheduler_events: None, mock_update_label: None, ) -> None: