Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ director-v2 uses one dy-sidecar API client per sidecar #4085

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
from fastapi import Request

from ...core.settings import DynamicSidecarSettings
from ...modules.dynamic_sidecar.api_client import DynamicSidecarClient
from ...modules.dynamic_sidecar.scheduler import DynamicSidecarsScheduler


def get_dynamic_sidecar_client(request: Request) -> DynamicSidecarClient:
return request.app.state.dynamic_sidecar_api_client


def get_dynamic_sidecar_scheduler(request: Request) -> DynamicSidecarsScheduler:
return request.app.state.dynamic_sidecar_scheduler

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from contextlib import asynccontextmanager
from enum import Enum
from typing import AsyncIterator, Optional
from typing import AsyncIterator

import typer
from fastapi import FastAPI, status
Expand Down Expand Up @@ -91,8 +91,6 @@ async def async_project_save_state(project_id: ProjectID, save_attempts: int) ->
project_at_db = await projects_repository.get_project(project_id)

typer.echo(f"Saving project '{project_at_db.uuid}' - '{project_at_db.name}'")

dynamic_sidecar_client = api_client.get_dynamic_sidecar_client(app)
nodes_failed_to_save: list[NodeIDStr] = []
for node_uuid, node_content in project_at_db.workbench.items():
# onl dynamic-sidecars are used
Expand All @@ -106,7 +104,7 @@ async def async_project_save_state(project_id: ProjectID, save_attempts: int) ->
try:
await _save_node_state(
app,
dynamic_sidecar_client,
api_client.get_dynamic_sidecar_client(app, node_uuid),
save_attempts,
node_uuid,
node_content.label,
Expand Down Expand Up @@ -146,7 +144,7 @@ class RenderData(BaseModel):

async def _get_dy_service_state(
client: AsyncClient, node_uuid: NodeIDStr
) -> Optional[DynamicServiceGet]:
) -> DynamicServiceGet | None:
try:
result = await client.get(
f"http://localhost:8000/v2/dynamic_services/{node_uuid}", # NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
DynamicSidecarClient,
get_dynamic_sidecar_client,
get_dynamic_sidecar_service_health,
remove_dynamic_sidecar_client,
setup,
shutdown,
)
Expand All @@ -13,6 +14,7 @@
"DynamicSidecarClient",
"get_dynamic_sidecar_client",
"get_dynamic_sidecar_service_health",
"remove_dynamic_sidecar_client",
"setup",
"shutdown",
"UnexpectedStatusError",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
from collections import deque
from functools import cached_property
from typing import Any, Final, Optional
from typing import Any, Final

from fastapi import FastAPI, status
from httpx import AsyncClient
from models_library.projects import ProjectID
from models_library.projects_networks import DockerNetworkAlias
from models_library.projects_nodes_io import NodeID
from pydantic import AnyHttpUrl, PositiveFloat
from servicelib.fastapi.long_running_tasks.client import (
Client,
Expand All @@ -16,6 +17,7 @@
TaskId,
periodic_task_result,
)
from servicelib.logging_utils import log_context
from servicelib.utils import logged_gather
from simcore_service_director_v2.core.settings import DynamicSidecarSettings

Expand Down Expand Up @@ -257,8 +259,8 @@ async def _await_for_result(
task_id: TaskId,
dynamic_sidecar_endpoint: AnyHttpUrl,
task_timeout: PositiveFloat,
progress_callback: Optional[ProgressCallback] = None,
) -> Optional[Any]:
progress_callback: ProgressCallback | None = None,
) -> Any | None:
async with periodic_task_result(
self._get_client(dynamic_sidecar_endpoint),
task_id,
Expand All @@ -273,7 +275,7 @@ async def create_containers(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
compose_spec: str,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
response = await self._thin_client.post_containers_tasks(
dynamic_sidecar_endpoint, compose_spec=compose_spec
Expand All @@ -290,7 +292,7 @@ async def create_containers(
async def stop_service(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
response = await self._thin_client.post_containers_tasks_down(
dynamic_sidecar_endpoint
Expand Down Expand Up @@ -320,7 +322,7 @@ async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> N
async def save_service_state(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
response = await self._thin_client.post_containers_tasks_state_save(
dynamic_sidecar_endpoint
Expand All @@ -337,7 +339,7 @@ async def save_service_state(
async def pull_service_input_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
port_keys: Optional[list[str]] = None,
port_keys: list[str] | None = None,
) -> int:
response = await self._thin_client.post_containers_tasks_ports_inputs_pull(
dynamic_sidecar_endpoint, port_keys
Expand All @@ -355,7 +357,7 @@ async def pull_service_input_ports(
async def pull_service_output_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
port_keys: Optional[list[str]] = None,
port_keys: list[str] | None = None,
) -> None:
response = await self._thin_client.post_containers_tasks_ports_outputs_pull(
dynamic_sidecar_endpoint, port_keys
Expand All @@ -372,7 +374,7 @@ async def pull_service_output_ports(
async def push_service_output_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
response = await self._thin_client.post_containers_tasks_ports_outputs_push(
dynamic_sidecar_endpoint
Expand Down Expand Up @@ -401,28 +403,43 @@ async def restart_containers(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None


async def setup(app: FastAPI) -> None:
logger.debug("dynamic-sidecar api client setup")
app.state.dynamic_sidecar_api_client = DynamicSidecarClient(app)
with log_context(logger, logging.DEBUG, "dynamic-sidecar api client setup"):
app.state.dynamic_sidecar_api_clients: dict[str, DynamicSidecarClient] = {}


async def shutdown(app: FastAPI) -> None:
logger.debug("dynamic-sidecar api client closing...")
client: Optional[DynamicSidecarClient]
if client := app.state.dynamic_sidecar_api_client:
await client._thin_client.close() # pylint: disable=protected-access
with log_context(logger, logging.DEBUG, "dynamic-sidecar api client closing..."):
await logged_gather(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
*(
x._thin_client.close() # pylint: disable=protected-access
for x in app.state.dynamic_sidecar_api_clients.values()
),
reraise=False,
)


def get_dynamic_sidecar_client(
app: FastAPI, node_id: str | NodeID
) -> DynamicSidecarClient:
str_node_id = f"{node_id}"

if str_node_id not in app.state.dynamic_sidecar_api_clients:
app.state.dynamic_sidecar_api_clients[str_node_id] = DynamicSidecarClient(app)

return app.state.dynamic_sidecar_api_clients[str_node_id]


def get_dynamic_sidecar_client(app: FastAPI) -> DynamicSidecarClient:
assert app.state.dynamic_sidecar_api_client # nosec
return app.state.dynamic_sidecar_api_client
def remove_dynamic_sidecar_client(app: FastAPI, node_id: NodeID) -> None:
app.state.dynamic_sidecar_api_clients.pop(f"{node_id}", None)


async def get_dynamic_sidecar_service_health(
app: FastAPI, scheduler_data: SchedulerData, *, with_retry: bool = True
) -> bool:
api_client = get_dynamic_sidecar_client(app)
service_endpoint = scheduler_data.endpoint
api_client = get_dynamic_sidecar_client(app, scheduler_data.node_uuid)

# update service health
is_healthy = await api_client.is_healthy(service_endpoint, with_retry=with_retry)
is_healthy = await api_client.is_healthy(
scheduler_data.endpoint, with_retry=with_retry
)
return is_healthy
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pylint: disable=relative-beyond-top-level

import logging
from typing import Any, Final, Optional, cast
from typing import Any, Final, cast
from uuid import uuid4

from fastapi import FastAPI
Expand Down Expand Up @@ -137,7 +137,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
)

node_uuid_str = NodeIDStr(scheduler_data.node_uuid)
node: Optional[Node] = project.workbench.get(node_uuid_str)
node: Node | None = project.workbench.get(node_uuid_str)
boot_options = (
node.boot_options
if node is not None and node.boot_options is not None
Expand Down Expand Up @@ -320,7 +320,9 @@ async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool

@classmethod
async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
dynamic_sidecar_client = get_dynamic_sidecar_client(app)
dynamic_sidecar_client = get_dynamic_sidecar_client(
app, scheduler_data.node_uuid
)
dynamic_sidecar_endpoint = scheduler_data.endpoint
dynamic_sidecar_settings: DynamicSidecarSettings = (
app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
Expand Down Expand Up @@ -425,7 +427,9 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
dynamic_sidecar_settings: DynamicSidecarSettings = (
app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
)
dynamic_sidecar_client = get_dynamic_sidecar_client(app)
dynamic_sidecar_client = get_dynamic_sidecar_client(
app, scheduler_data.node_uuid
)
dynamic_sidecar_endpoint = scheduler_data.endpoint

# check values have been set by previous step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from collections import deque
from typing import Any, Deque, Final, Optional, cast
from typing import Any, Deque, Final, cast

from fastapi import FastAPI
from models_library.projects_networks import ProjectsNetworks
Expand Down Expand Up @@ -43,6 +43,7 @@
DynamicSidecarClient,
get_dynamic_sidecar_client,
get_dynamic_sidecar_service_health,
remove_dynamic_sidecar_client,
)
from ...docker_api import (
get_projects_networks_containers,
Expand Down Expand Up @@ -70,7 +71,7 @@ def get_director_v0_client(app: FastAPI) -> DirectorV0Client:


def parse_containers_inspect(
containers_inspect: Optional[dict[str, Any]]
containers_inspect: dict[str, Any] | None
) -> list[DockerContainerInspect]:
results: Deque[DockerContainerInspect] = deque()

Expand Down Expand Up @@ -103,7 +104,7 @@ async def service_remove_containers(
app: FastAPI,
node_uuid: NodeID,
dynamic_sidecar_client: DynamicSidecarClient,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)

Expand All @@ -126,7 +127,7 @@ async def service_save_state(
app: FastAPI,
node_uuid: NodeID,
dynamic_sidecar_client: DynamicSidecarClient,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
await dynamic_sidecar_client.save_service_state(
Expand All @@ -138,7 +139,7 @@ async def service_push_outputs(
app: FastAPI,
node_uuid: NodeID,
dynamic_sidecar_client: DynamicSidecarClient,
progress_callback: Optional[ProgressCallback] = None,
progress_callback: ProgressCallback | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
await dynamic_sidecar_client.push_service_output_ports(
Expand All @@ -151,7 +152,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
app: FastAPI,
node_uuid: NodeID,
dynamic_sidecar_settings: DynamicSidecarSettings,
set_were_state_and_outputs_saved: Optional[bool] = None,
set_were_state_and_outputs_saved: bool | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)

Expand Down Expand Up @@ -234,7 +235,9 @@ async def attempt_pod_removal_and_data_saving(
)

async def _remove_containers_save_state_and_outputs() -> None:
dynamic_sidecar_client: DynamicSidecarClient = get_dynamic_sidecar_client(app)
dynamic_sidecar_client: DynamicSidecarClient = get_dynamic_sidecar_client(
app, scheduler_data.node_uuid
)

await service_remove_containers(
app, scheduler_data.node_uuid, dynamic_sidecar_client
Expand All @@ -258,8 +261,6 @@ async def _remove_containers_save_state_and_outputs() -> None:
)

if can_really_save and scheduler_data.dynamic_sidecar.were_containers_created:
dynamic_sidecar_client = get_dynamic_sidecar_client(app)

logger.info("Calling into dynamic-sidecar to save: state and output ports")
try:
tasks = [
Expand Down Expand Up @@ -323,6 +324,9 @@ async def _remove_containers_save_state_and_outputs() -> None:
TaskProgress.create(), app, scheduler_data.node_uuid, dynamic_sidecar_settings
)

# remove sidecar's api client
remove_dynamic_sidecar_client(app, scheduler_data.node_uuid)

# instrumentation
message = InstrumentationRabbitMessage(
metrics="service_stopped",
Expand All @@ -341,7 +345,7 @@ async def _remove_containers_save_state_and_outputs() -> None:
async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None:
logger.debug("Attaching project networks for %s", scheduler_data.service_name)

dynamic_sidecar_client = get_dynamic_sidecar_client(app)
dynamic_sidecar_client = get_dynamic_sidecar_client(app, scheduler_data.node_uuid)
dynamic_sidecar_endpoint = scheduler_data.endpoint

projects_networks_repository: ProjectsNetworksRepository = cast(
Expand Down Expand Up @@ -396,7 +400,7 @@ async def prepare_services_environment(
app: FastAPI, scheduler_data: SchedulerData
) -> None:
app_settings: AppSettings = app.state.settings
dynamic_sidecar_client = get_dynamic_sidecar_client(app)
dynamic_sidecar_client = get_dynamic_sidecar_client(app, scheduler_data.node_uuid)
dynamic_sidecar_endpoint = scheduler_data.endpoint
dynamic_sidecar_settings: DynamicSidecarSettings = (
app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
Expand Down
Loading