Skip to content

Commit

Permalink
Merge branch 'master' into fix/e2e-open-with-resources
Browse files Browse the repository at this point in the history
  • Loading branch information
odeimaiz authored Jul 24, 2023
2 parents cd8581a + 351f1e4 commit 3e9322f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
from typing import Annotated, Final

from fastapi import APIRouter, Depends, HTTPException, status
from models_library.projects_nodes import NodeID
from pydantic import BaseModel
from pydantic import BaseModel, PositiveInt
from servicelib.fastapi.long_running_tasks.client import (
ProgressMessage,
ProgressPercent,
Expand All @@ -13,11 +16,20 @@
get_tasks_manager,
start_task,
)
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.retry import retry_if_result
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random_exponential

from ...modules.dynamic_sidecar.scheduler import DynamicSidecarsScheduler
from ...utils.routes import NoContentResponse
from ..dependencies.dynamic_sidecar import get_dynamic_sidecar_scheduler

_logger = logging.getLogger(__name__)

_MINUTE: Final[PositiveInt] = 60


class ObservationItem(BaseModel):
is_disabled: bool
Expand All @@ -26,6 +38,23 @@ class ObservationItem(BaseModel):
router = APIRouter()


@retry(
wait=wait_random_exponential(max=10),
stop=stop_after_delay(1 * _MINUTE),
retry=retry_if_result(lambda result: result is False),
reraise=False,
before_sleep=before_sleep_log(_logger, logging.WARNING, exc_info=True),
)
def _toggle_observation_succeeded(
dynamic_sidecars_scheduler: DynamicSidecarsScheduler,
node_uuid: NodeID,
*,
is_disabled: bool,
) -> bool:
# returns True if the `toggle_observation` operation succeeded
return dynamic_sidecars_scheduler.toggle_observation(node_uuid, is_disabled)


@router.patch(
"/services/{node_uuid}/observation",
summary="Enable/disable observation of the service",
Expand All @@ -34,12 +63,14 @@ class ObservationItem(BaseModel):
async def update_service_observation(
node_uuid: NodeID,
observation_item: ObservationItem,
dynamic_sidecars_scheduler: DynamicSidecarsScheduler = Depends(
get_dynamic_sidecar_scheduler
),
dynamic_sidecars_scheduler: Annotated[
DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler)
],
) -> NoContentResponse:
if dynamic_sidecars_scheduler.toggle_observation(
node_uuid, observation_item.is_disabled
if _toggle_observation_succeeded(
dynamic_sidecars_scheduler=dynamic_sidecars_scheduler,
node_uuid=node_uuid,
is_disabled=observation_item.is_disabled,
):
return NoContentResponse()

Expand All @@ -62,10 +93,10 @@ async def update_service_observation(
)
async def delete_service_containers(
node_uuid: NodeID,
tasks_manager: TasksManager = Depends(get_tasks_manager),
dynamic_sidecars_scheduler: DynamicSidecarsScheduler = Depends(
get_dynamic_sidecar_scheduler
),
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
dynamic_sidecars_scheduler: Annotated[
DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler)
],
):
async def _task_remove_service_containers(
task_progress: TaskProgress, node_uuid: NodeID
Expand All @@ -80,13 +111,12 @@ async def _progress_callback(
)

try:
task_id = start_task(
return start_task(
tasks_manager,
task=_task_remove_service_containers,
unique=True,
node_uuid=node_uuid,
)
return task_id
except TaskAlreadyRunningError as e:
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e

Expand All @@ -104,10 +134,10 @@ async def _progress_callback(
)
async def save_service_state(
node_uuid: NodeID,
tasks_manager: TasksManager = Depends(get_tasks_manager),
dynamic_sidecars_scheduler: DynamicSidecarsScheduler = Depends(
get_dynamic_sidecar_scheduler
),
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
dynamic_sidecars_scheduler: Annotated[
DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler)
],
):
async def _task_save_service_state(
task_progress: TaskProgress,
Expand All @@ -123,13 +153,12 @@ async def _progress_callback(
)

try:
task_id = start_task(
return start_task(
tasks_manager,
task=_task_save_service_state,
unique=True,
node_uuid=node_uuid,
)
return task_id
except TaskAlreadyRunningError as e:
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e

Expand All @@ -147,10 +176,10 @@ async def _progress_callback(
)
async def push_service_outputs(
node_uuid: NodeID,
tasks_manager: TasksManager = Depends(get_tasks_manager),
dynamic_sidecars_scheduler: DynamicSidecarsScheduler = Depends(
get_dynamic_sidecar_scheduler
),
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
dynamic_sidecars_scheduler: Annotated[
DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler)
],
):
async def _task_push_service_outputs(
task_progress: TaskProgress, node_uuid: NodeID
Expand All @@ -165,13 +194,12 @@ async def _progress_callback(
)

try:
task_id = start_task(
return start_task(
tasks_manager,
task=_task_push_service_outputs,
unique=True,
node_uuid=node_uuid,
)
return task_id
except TaskAlreadyRunningError as e:
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e

Expand All @@ -189,10 +217,10 @@ async def _progress_callback(
)
async def delete_service_docker_resources(
node_uuid: NodeID,
tasks_manager: TasksManager = Depends(get_tasks_manager),
dynamic_sidecars_scheduler: DynamicSidecarsScheduler = Depends(
get_dynamic_sidecar_scheduler
),
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
dynamic_sidecars_scheduler: Annotated[
DynamicSidecarsScheduler, Depends(get_dynamic_sidecar_scheduler)
],
):
async def _task_cleanup_service_docker_resources(
task_progress: TaskProgress, node_uuid: NodeID
Expand All @@ -202,12 +230,11 @@ async def _task_cleanup_service_docker_resources(
)

try:
task_id = start_task(
return start_task(
tasks_manager,
task=_task_cleanup_service_docker_resources,
unique=True,
node_uuid=node_uuid,
)
return task_id
except TaskAlreadyRunningError as e:
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
37 changes: 25 additions & 12 deletions services/director-v2/tests/unit/test_api_route_dynamic_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
# pylint: disable=unused-variable

import asyncio
from typing import AsyncIterator
from collections.abc import AsyncIterator

import pytest
import respx
from fastapi import status
from httpx import Response
from models_library.service_settings_labels import SimcoreServiceLabels
from pytest import MonkeyPatch
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from requests import Response
from simcore_service_director_v2.models.domains.dynamic_services import (
DynamicServiceCreate,
)
Expand All @@ -29,11 +28,11 @@


@pytest.fixture
def mock_env(
def mock_env( # noqa: PT004
disable_rabbitmq: None,
disable_postgres: None,
mock_env: EnvVarsDict,
monkeypatch: MonkeyPatch,
monkeypatch: pytest.MonkeyPatch,
docker_swarm: None,
) -> None:
monkeypatch.setenv("SC_BOOT_MODE", "default")
Expand All @@ -57,13 +56,23 @@ def dynamic_sidecar_scheduler(client: TestClient) -> DynamicSidecarsScheduler:


@pytest.fixture
async def mock_sidecar_api(scheduler_data: SchedulerData) -> AsyncIterator[None]:
def mock_apply_observation_cycle(mocker: MockerFixture) -> None: # noqa: PT004
module_base = (
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer"
)
mocker.patch(f"{module_base}._apply_observation_cycle", autospec=True)


@pytest.fixture
async def mock_sidecar_api( # noqa: PT004
scheduler_data: SchedulerData,
) -> AsyncIterator[None]:
with respx.mock(
assert_all_called=False,
assert_all_mocked=True,
) as respx_mock:
respx_mock.get(f"{scheduler_data.endpoint}/health", name="is_healthy").respond(
json=dict(is_healthy=True)
json={"is_healthy": True}
)

yield
Expand All @@ -89,13 +98,13 @@ async def observed_service(
can_save,
)
# pylint:disable=protected-access
return dynamic_sidecar_scheduler._scheduler.get_scheduler_data(
return dynamic_sidecar_scheduler._scheduler.get_scheduler_data( # noqa: SLF001
dynamic_service_create.node_uuid
)


@pytest.fixture
def mock_scheduler_service_shutdown_tasks(mocker: MockerFixture) -> None:
def mock_scheduler_service_shutdown_tasks(mocker: MockerFixture) -> None: # noqa: PT004
module_base = "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._events_utils"
mocker.patch(f"{module_base}.service_push_outputs", autospec=True)
mocker.patch(f"{module_base}.service_remove_containers", autospec=True)
Expand All @@ -112,17 +121,20 @@ async def test_update_service_observation_node_not_found(
with pytest.raises(DynamicSidecarNotFoundError):
client.patch(
f"/v2/dynamic_scheduler/services/{scheduler_data.node_uuid}/observation",
json=dict(is_disabled=False),
json={"is_disabled": False},
)


async def test_update_service_observation(
mock_sidecar_api: None, client: TestClient, observed_service: SchedulerData
mock_apply_observation_cycle: None,
mock_sidecar_api: None,
client: TestClient,
observed_service: SchedulerData,
):
def _toggle(*, is_disabled: bool) -> Response:
return client.patch(
f"/v2/dynamic_scheduler/services/{observed_service.node_uuid}/observation",
json=dict(is_disabled=is_disabled),
json={"is_disabled": is_disabled},
)

# trying to lock the service
Expand Down Expand Up @@ -170,6 +182,7 @@ def _toggle(*, is_disabled: bool) -> Response:
],
)
async def test_409_response(
mock_apply_observation_cycle: None,
mock_scheduler_service_shutdown_tasks: None,
client: TestClient,
observed_service: SchedulerData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
# pylint:disable=redefined-outer-name
# pylint:disable=unused-argument

from typing import AsyncIterator
from collections.abc import AsyncIterator
from unittest.mock import AsyncMock

import pytest
from fastapi import FastAPI
from pytest import MonkeyPatch
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict
from simcore_service_director_v2.core.settings import AppSettings
Expand All @@ -27,15 +26,17 @@


@pytest.fixture
def disable_observation(mocker: MockerFixture) -> None:
def disable_observation(mocker: MockerFixture) -> None: # noqa: PT004
mocker.patch(
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._task.DynamicSidecarsScheduler.start",
autospec=True,
)


@pytest.fixture
def mock_are_sidecar_and_proxy_services_present(mocker: MockerFixture) -> None:
def mock_are_sidecar_and_proxy_services_present( # noqa: PT004
mocker: MockerFixture,
) -> None:
mocker.patch(
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer.are_sidecar_and_proxy_services_present",
autospec=True,
Expand All @@ -44,7 +45,7 @@ def mock_are_sidecar_and_proxy_services_present(mocker: MockerFixture) -> None:


@pytest.fixture
def mock_events(mocker: MockerFixture) -> None:
def mock_events(mocker: MockerFixture) -> None: # noqa: PT004
for event_to_mock in (
"CreateSidecars",
"WaitForSidecarAPI",
Expand All @@ -63,11 +64,11 @@ def mock_events(mocker: MockerFixture) -> None:


@pytest.fixture
def mock_env(
def mock_env( # noqa: PT004
disable_postgres: None,
docker_swarm: None,
mock_env: EnvVarsDict,
monkeypatch: MonkeyPatch,
monkeypatch: pytest.MonkeyPatch,
) -> None:
setenvs_from_dict(
monkeypatch,
Expand Down Expand Up @@ -111,7 +112,7 @@ def _is_observation_task_present(
) -> bool:
return (
scheduler_data_from_http_request.service_name
in dynamic_sidecar_scheduler._scheduler._service_observation_task
in dynamic_sidecar_scheduler._scheduler._service_observation_task # noqa: SLF001
)


Expand All @@ -125,7 +126,7 @@ async def test_regression_break_endless_loop_cancellation_edge_case(
can_save: bool | None,
):
# in this situation the scheduler would never end loops forever
await dynamic_sidecar_scheduler._scheduler._add_service(
await dynamic_sidecar_scheduler._scheduler._add_service( # noqa: SLF001
scheduler_data_from_http_request
)

Expand Down Expand Up @@ -153,7 +154,7 @@ async def test_regression_break_endless_loop_cancellation_edge_case(
)

# requires an extra pass to remove the service
for _ in range(2):
for _ in range(3):
await _apply_observation_cycle(
dynamic_sidecar_scheduler, scheduler_data_from_http_request
)
Expand Down

0 comments on commit 3e9322f

Please sign in to comment.