From 51b214b342837ef8c68b77df31f7049c724cd4ea Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:11:20 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20expose=20`service=5Frun=5Fid`=20?= =?UTF-8?q?as=20an=20env=20var=20for=20both=20comp=20and=20new=20style=20d?= =?UTF-8?q?ynamic=20services=20(#6942)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrei Neagu --- .../licensed_items_checkouts.py | 4 +- .../service_runs.py | 5 +- .../api_schemas_webserver/resource_usage.py | 4 +- .../src/models_library/rabbitmq_messages.py | 3 +- .../src/models_library/resource_tracker.py | 1 - .../src/models_library/services.py | 4 +- .../src/models_library/services_types.py | 34 +++++++++-- .../tests/test_services_types.py | 40 +++++++++++++ .../licensed_items_checkouts.py | 4 +- .../webserver/licenses/licensed_items.py | 4 +- .../simcore_service_agent/models/volumes.py | 4 +- services/agent/tests/unit/conftest.py | 12 ++-- .../agent/tests/unit/test_services_backup.py | 6 +- .../tests/unit/test_services_docker_utils.py | 6 +- .../unit/test_services_volumes_manager.py | 12 ++-- .../models/dynamic_services_scheduler.py | 8 +-- .../modules/comp_scheduler/_scheduler_base.py | 6 +- .../modules/comp_scheduler/_scheduler_dask.py | 9 ++- .../modules/comp_scheduler/_utils.py | 10 ---- .../modules/dask_client.py | 3 + .../dynamic_sidecar/docker_compose_specs.py | 4 ++ .../docker_service_specs/sidecar.py | 12 ++-- .../scheduler/_core/_event_create_sidecars.py | 4 +- .../scheduler/_core/_events_user_services.py | 1 + .../modules/dynamic_sidecar/volumes.py | 38 ++++++------ .../modules/osparc_variables/substitutions.py | 6 ++ .../simcore_service_director_v2/utils/dask.py | 3 + .../utils/osparc_variables.py | 2 +- .../utils/rabbitmq.py | 7 ++- services/director-v2/tests/unit/conftest.py | 27 +++++++-- .../tests/unit/test_modules_dask_client.py | 25 ++++++++ ...odules_dynamic_sidecar_volumes_resolver.py | 48 +++++++++------ .../unit/test_modules_osparc_variables.py | 10 +++- .../tests/unit/test_utils_comp_scheduler.py | 30 ---------- .../comp_scheduler/test_scheduler_dask.py | 2 + ...es_dynamic_sidecar_docker_service_specs.py | 60 ++++++++----------- .../tests/unit/with_dbs/test_utils_dask.py | 3 + .../core/docker_utils.py | 10 ++-- .../core/errors.py | 2 +- .../core/settings.py | 4 +- .../modules/mounted_fs.py | 42 +++++++------ services/dynamic-sidecar/tests/conftest.py | 14 ++--- .../tests/unit/test_core_docker_utils.py | 22 +++---- .../tests/unit/test_core_errors.py | 4 +- .../tests/unit/test_core_validation.py | 4 +- .../tests/unit/test_modules_mounted_fs.py | 25 +++++--- .../unit/test_modules_outputs_manager.py | 4 +- .../unit/test_modules_outputs_watcher.py | 4 +- ...test_modules_system_monitor__disk_usage.py | 4 +- .../api/rpc/_licensed_items_checkouts.py | 4 +- .../models/credit_transactions.py | 10 ++-- .../models/licensed_items_checkouts.py | 6 +- .../models/service_runs.py | 12 ++-- ...ackground_task_periodic_heartbeat_check.py | 6 +- .../services/licensed_items_checkouts.py | 5 +- .../services/modules/db/service_runs_db.py | 10 ++-- .../services/utils.py | 5 +- .../licenses/_licensed_checkouts_api.py | 4 +- .../licenses/_rpc.py | 4 +- 59 files changed, 396 insertions(+), 270 deletions(-) create mode 100644 packages/models-library/tests/test_services_types.py diff --git a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/licensed_items_checkouts.py b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/licensed_items_checkouts.py index 00e136d79ef..f14117b3439 100644 --- a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/licensed_items_checkouts.py +++ b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/licensed_items_checkouts.py @@ -3,10 +3,10 @@ from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict, PositiveInt @@ -18,7 +18,7 @@ class LicensedItemCheckoutGet(BaseModel): wallet_id: WalletID user_id: UserID product_name: ProductName - service_run_id: ServiceRunId + service_run_id: ServiceRunID started_at: datetime stopped_at: datetime | None num_of_seats: int diff --git a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py index 72001f8b550..e16ba7ce108 100644 --- a/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py +++ b/packages/models-library/src/models_library/api_schemas_resource_usage_tracker/service_runs.py @@ -6,14 +6,15 @@ from ..projects import ProjectID from ..projects_nodes_io import NodeID -from ..resource_tracker import CreditTransactionStatus, ServiceRunId, ServiceRunStatus +from ..resource_tracker import CreditTransactionStatus, ServiceRunStatus from ..services import ServiceKey, ServiceVersion +from ..services_types import ServiceRunID from ..users import UserID from ..wallets import WalletID class ServiceRunGet(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID wallet_id: WalletID | None wallet_name: str | None user_id: UserID diff --git a/packages/models-library/src/models_library/api_schemas_webserver/resource_usage.py b/packages/models-library/src/models_library/api_schemas_webserver/resource_usage.py index 3eea55c6d67..506db2aee3c 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/resource_usage.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/resource_usage.py @@ -11,12 +11,12 @@ PricingPlanId, PricingUnitCostUpdate, PricingUnitId, - ServiceRunId, ServiceRunStatus, SpecificInfo, UnitExtraInfo, ) from ..services import ServiceKey, ServiceVersion +from ..services_types import ServiceRunID from ..users import UserID from ..wallets import WalletID from ._base import InputSchema, OutputSchema @@ -27,7 +27,7 @@ class ServiceRunGet( BaseModel ): # NOTE: this is already in use so I didnt modidy inheritance from OutputSchema - service_run_id: ServiceRunId + service_run_id: ServiceRunID wallet_id: WalletID | None wallet_name: str | None user_id: UserID diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index dd891758603..0cd6bd0874b 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -15,6 +15,7 @@ from .projects_state import RunningState from .services import ServiceKey, ServiceType, ServiceVersion from .services_resources import ServiceResourcesDict +from .services_types import ServiceRunID from .users import UserID from .utils.enums import StrAutoEnum from .wallets import WalletID @@ -178,7 +179,7 @@ class RabbitResourceTrackingMessageType(StrAutoEnum): class RabbitResourceTrackingBaseMessage(RabbitMessageBase): channel_name: Literal["io.simcore.service.tracking"] = "io.simcore.service.tracking" - service_run_id: str = Field( + service_run_id: ServiceRunID = Field( ..., description="uniquely identitifies the service run" ) created_at: datetime.datetime = Field( diff --git a/packages/models-library/src/models_library/resource_tracker.py b/packages/models-library/src/models_library/resource_tracker.py index 20e35b7e614..629633aa8c8 100644 --- a/packages/models-library/src/models_library/resource_tracker.py +++ b/packages/models-library/src/models_library/resource_tracker.py @@ -20,7 +20,6 @@ _logger = logging.getLogger(__name__) -ServiceRunId: TypeAlias = str PricingPlanId: TypeAlias = PositiveInt PricingUnitId: TypeAlias = PositiveInt PricingUnitCostId: TypeAlias = PositiveInt diff --git a/packages/models-library/src/models_library/services.py b/packages/models-library/src/models_library/services.py index 23874571f96..cd20682f52d 100644 --- a/packages/models-library/src/models_library/services.py +++ b/packages/models-library/src/models_library/services.py @@ -7,9 +7,9 @@ from .services_metadata_published import ServiceInputsDict, ServiceMetaDataPublished from .services_types import ( DynamicServiceKey, - RunID, ServiceKey, ServicePortKey, + ServiceRunID, ServiceVersion, ) @@ -21,7 +21,6 @@ "BootOptions", "DynamicServiceKey", "LATEST_INTEGRATION_VERSION", - "RunID", "ServiceInput", "ServiceInputsDict", "ServiceKey", @@ -29,6 +28,7 @@ "ServiceMetaDataPublished", "ServiceOutput", "ServicePortKey", + "ServiceRunID", "ServiceType", "ServiceVersion", ) diff --git a/packages/models-library/src/models_library/services_types.py b/packages/models-library/src/models_library/services_types.py index 03c0bb4bf5d..b6689fdf888 100644 --- a/packages/models-library/src/models_library/services_types.py +++ b/packages/models-library/src/models_library/services_types.py @@ -1,11 +1,17 @@ -from typing import Annotated, Any, TypeAlias +from typing import TYPE_CHECKING, Annotated, Any, Self, TypeAlias from uuid import uuid4 import arrow -from pydantic import GetCoreSchemaHandler, StringConstraints, ValidationInfo +from pydantic import ( + GetCoreSchemaHandler, + PositiveInt, + StringConstraints, + ValidationInfo, +) from pydantic_core import CoreSchema, core_schema from .basic_regex import PROPERTY_KEY_RE, SIMPLE_VERSION_RE +from .projects_nodes_io import NodeID from .services_regex import ( COMPUTATIONAL_SERVICE_KEY_RE, DYNAMIC_SERVICE_KEY_RE, @@ -13,6 +19,10 @@ SERVICE_ENCODED_KEY_RE, SERVICE_KEY_RE, ) +from .users import UserID + +if TYPE_CHECKING: + from .projects import ProjectID ServicePortKey: TypeAlias = Annotated[str, StringConstraints(pattern=PROPERTY_KEY_RE)] @@ -35,7 +45,7 @@ ServiceVersion: TypeAlias = Annotated[str, StringConstraints(pattern=SIMPLE_VERSION_RE)] -class RunID(str): +class ServiceRunID(str): """ Used to assign a unique identifier to the run of a service. @@ -44,12 +54,15 @@ class RunID(str): and old volumes for different runs. Avoids overwriting data that left dropped on the node (due to an error) and gives the osparc-agent an opportunity to back it up. + The resource-usage-tracker tracker uses these RunIDs to keep track of + resource usage from computational and dynamic services. """ __slots__ = () @classmethod - def create(cls) -> "RunID": + def get_resource_tracking_run_id_for_dynamic(cls) -> Self: + """used for dynamic services""" # NOTE: there was a legacy version of this RunID # legacy version: # '0ac3ed64-665b-42d2-95f7-e59e0db34242' @@ -59,6 +72,17 @@ def create(cls) -> "RunID": run_id_format = f"{utc_int_timestamp}_{uuid4()}" return cls(run_id_format) + @classmethod + def get_resource_tracking_run_id_for_computational( + cls, + user_id: UserID, + project_id: "ProjectID", + node_id: NodeID, + iteration: PositiveInt, + ) -> Self: + """used by computational services""" + return cls(f"comp_{user_id}_{project_id}_{node_id}_{iteration}") + @classmethod def __get_pydantic_core_schema__( cls, @@ -68,7 +92,7 @@ def __get_pydantic_core_schema__( return core_schema.no_info_after_validator_function(cls, handler(str)) @classmethod - def validate(cls, v: "RunID | str", _: ValidationInfo) -> "RunID": + def validate(cls, v: "ServiceRunID | str", _: ValidationInfo) -> "ServiceRunID": if isinstance(v, cls): return v if isinstance(v, str): diff --git a/packages/models-library/tests/test_services_types.py b/packages/models-library/tests/test_services_types.py new file mode 100644 index 00000000000..206c531a78f --- /dev/null +++ b/packages/models-library/tests/test_services_types.py @@ -0,0 +1,40 @@ +import pytest +from models_library.projects import ProjectID +from models_library.projects_nodes import NodeID +from models_library.services_types import ServiceRunID +from models_library.users import UserID +from pydantic import PositiveInt + + +@pytest.mark.parametrize( + "user_id, project_id, node_id, iteration, expected_result", + [ + ( + 2, + ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"), + NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"), + 5, + "comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5", + ) + ], +) +def test_run_id_get_resource_tracking_run_id( + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + iteration: PositiveInt, + expected_result: str, +): + resource_tracking_service_run_id = ( + ServiceRunID.get_resource_tracking_run_id_for_computational( + user_id, project_id, node_id, iteration + ) + ) + assert isinstance(resource_tracking_service_run_id, ServiceRunID) + assert resource_tracking_service_run_id == expected_result + + +def test_get_resource_tracking_run_id_for_dynamic(): + assert isinstance( + ServiceRunID.get_resource_tracking_run_id_for_dynamic(), ServiceRunID + ) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/licensed_items_checkouts.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/licensed_items_checkouts.py index 62032c63383..ed8c85dfd37 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/licensed_items_checkouts.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/licensed_items_checkouts.py @@ -12,11 +12,11 @@ from models_library.licensed_items import LicensedItemID from models_library.products import ProductName from models_library.rabbitmq_basic_types import RPCMethodName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import NonNegativeInt, TypeAdapter @@ -88,7 +88,7 @@ async def checkout_licensed_item( wallet_id: WalletID, product_name: ProductName, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, user_id: UserID, user_email: str, ) -> LicensedItemCheckoutGet: diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/licenses/licensed_items.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/licenses/licensed_items.py index cb20f00be0a..f767882d247 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/licenses/licensed_items.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/licenses/licensed_items.py @@ -11,10 +11,10 @@ from models_library.licensed_items import LicensedItemID from models_library.products import ProductName from models_library.rabbitmq_basic_types import RPCMethodName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import TypeAdapter @@ -77,7 +77,7 @@ async def checkout_licensed_item_for_wallet( wallet_id: WalletID, licensed_item_id: LicensedItemID, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, ) -> LicensedItemCheckoutGet: result = await rabbitmq_rpc_client.request( WEBSERVER_RPC_NAMESPACE, diff --git a/services/agent/src/simcore_service_agent/models/volumes.py b/services/agent/src/simcore_service_agent/models/volumes.py index cf227bf69e9..68f20cae559 100644 --- a/services/agent/src/simcore_service_agent/models/volumes.py +++ b/services/agent/src/simcore_service_agent/models/volumes.py @@ -6,14 +6,14 @@ ) from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from models_library.users import UserID from pydantic import BaseModel, ConfigDict, Field, TypeAdapter class DynamicServiceVolumeLabels(BaseModel): node_uuid: NodeID - run_id: RunID + run_id: ServiceRunID source: str study_id: ProjectID swarm_stack_name: str diff --git a/services/agent/tests/unit/conftest.py b/services/agent/tests/unit/conftest.py index 1a49ce6ba57..4b23619f5a0 100644 --- a/services/agent/tests/unit/conftest.py +++ b/services/agent/tests/unit/conftest.py @@ -15,7 +15,7 @@ from fastapi.testclient import TestClient from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from models_library.users import UserID from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from settings_library.rabbit import RabbitSettings @@ -56,8 +56,8 @@ def test_client(initialized_app: FastAPI) -> TestClient: @pytest.fixture -def run_id() -> RunID: - return RunID.create() +def service_run_id() -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_dynamic() @pytest.fixture @@ -77,7 +77,7 @@ def volumes_path(tmp_path: Path) -> Path: @pytest.fixture async def create_dynamic_sidecar_volume( - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, swarm_stack_name: str, user_id: UserID, @@ -89,13 +89,13 @@ async def create_dynamic_sidecar_volume( async with aiodocker.Docker() as docker_client: async def _(node_id: NodeID, in_use: bool, volume_name: str) -> str: - source = get_source(run_id, node_id, volumes_path / volume_name) + source = get_source(service_run_id, node_id, volumes_path / volume_name) volume = await docker_client.volumes.create( { "Name": source, "Labels": { "node_uuid": f"{node_id}", - "run_id": run_id, + "run_id": service_run_id, "source": source, "study_id": f"{project_id}", "swarm_stack_name": swarm_stack_name, diff --git a/services/agent/tests/unit/test_services_backup.py b/services/agent/tests/unit/test_services_backup.py index c986550da51..d544a25dfa5 100644 --- a/services/agent/tests/unit/test_services_backup.py +++ b/services/agent/tests/unit/test_services_backup.py @@ -11,7 +11,7 @@ from fastapi import FastAPI from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from pydantic import NonNegativeInt from simcore_service_agent.core.settings import ApplicationSettings from simcore_service_agent.services.backup import backup_volume @@ -48,7 +48,7 @@ async def test_backup_volume( volume_content: Path, project_id: ProjectID, swarm_stack_name: str, - run_id: RunID, + service_run_id: ServiceRunID, downlaoded_from_s3: Path, create_dynamic_sidecar_volumes: Callable[[NodeID, bool], Awaitable[set[str]]], initialized_app: FastAPI, @@ -80,7 +80,7 @@ async def test_backup_volume( async with session.client("s3", endpoint_url=f"{settings.AGENT_VOLUMES_CLEANUP_S3_ENDPOINT}") as s3_client: # type: ignore list_response = await s3_client.list_objects_v2( Bucket=settings.AGENT_VOLUMES_CLEANUP_S3_BUCKET, - Prefix=f"{swarm_stack_name}/{project_id}/{node_id}/{run_id}", + Prefix=f"{swarm_stack_name}/{project_id}/{node_id}/{service_run_id}", ) synced_keys: list[str] = [o["Key"] for o in list_response["Contents"]] diff --git a/services/agent/tests/unit/test_services_docker_utils.py b/services/agent/tests/unit/test_services_docker_utils.py index 40f86529edb..f4a19c9b9aa 100644 --- a/services/agent/tests/unit/test_services_docker_utils.py +++ b/services/agent/tests/unit/test_services_docker_utils.py @@ -10,7 +10,7 @@ from aiodocker.docker import Docker from fastapi import FastAPI from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from pytest_mock import MockerFixture from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES from simcore_service_agent.services.docker_utils import ( @@ -43,9 +43,9 @@ def test__reverse_string(): ], ) def test__does_volume_require_backup( - run_id: RunID, volume_path_part: str, expected: bool + service_run_id: ServiceRunID, volume_path_part: str, expected: bool ) -> None: - volume_name = get_source(run_id, uuid4(), Path("/apath") / volume_path_part) + volume_name = get_source(service_run_id, uuid4(), Path("/apath") / volume_path_part) print(volume_name) assert _does_volume_require_backup(volume_name) is expected diff --git a/services/agent/tests/unit/test_services_volumes_manager.py b/services/agent/tests/unit/test_services_volumes_manager.py index 0dfc29ceb83..4ac429aeca9 100644 --- a/services/agent/tests/unit/test_services_volumes_manager.py +++ b/services/agent/tests/unit/test_services_volumes_manager.py @@ -14,7 +14,7 @@ from aiodocker.docker import Docker from fastapi import FastAPI from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from servicelib.rabbitmq.rpc_interfaces.agent.errors import ( NoServiceVolumesFoundRPCError, ) @@ -30,12 +30,14 @@ @dataclass class MockedVolumesProxy: - run_id: RunID + service_run_id: ServiceRunID volumes: set[str] = field(default_factory=set) def add_unused_volumes_for_service(self, node_id: NodeID) -> None: for folder_name in VOLUMES_TO_CREATE: - volume_name = get_source(self.run_id, node_id, Path("/apath") / folder_name) + volume_name = get_source( + self.service_run_id, node_id, Path("/apath") / folder_name + ) self.volumes.add(volume_name) def remove_volume(self, volume_name: str) -> None: @@ -47,9 +49,9 @@ def get_unused_dynamc_sidecar_volumes(self) -> set[str]: @pytest.fixture async def mock_docker_utils( - mocker: pytest_mock.MockerFixture, run_id: RunID + mocker: pytest_mock.MockerFixture, service_run_id: ServiceRunID ) -> MockedVolumesProxy: - proxy = MockedVolumesProxy(run_id) + proxy = MockedVolumesProxy(service_run_id) async def _remove_volume( app: FastAPI, docker: Docker, *, volume_name: str, requires_backup: bool diff --git a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py index 560517b538c..56c2a27c9c2 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py @@ -29,7 +29,7 @@ PathMappingsLabel, SimcoreServiceLabels, ) -from models_library.services import RunID +from models_library.services import ServiceRunID from models_library.services_resources import ServiceResourcesDict from models_library.wallets import WalletInfo from pydantic import ( @@ -379,8 +379,8 @@ class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels): ..., description="Name of the current dynamic-sidecar being observed", ) - run_id: RunID = Field( - default_factory=RunID.create, + run_id: ServiceRunID = Field( + default_factory=ServiceRunID.get_resource_tracking_run_id_for_dynamic, description=( "Uniquely identify the dynamic sidecar session (a.k.a. 2 " "subsequent exact same services will have a different run_id)" @@ -486,7 +486,7 @@ def from_http_request( request_scheme: str, request_simcore_user_agent: str, can_save: bool, - run_id: RunID | None = None, + run_id: ServiceRunID | None = None, ) -> "SchedulerData": # This constructor method sets current product names_helper = DynamicSidecarNamesHelper.make(service.node_uuid) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index b959c9c8014..22b8677611f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -26,6 +26,7 @@ from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.projects_state import RunningState from models_library.services import ServiceType +from models_library.services_types import ServiceRunID from models_library.users import UserID from networkx.classes.reportviews import InDegreeView from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE @@ -66,7 +67,6 @@ TASK_TO_START_STATES, WAITING_FOR_START_STATES, create_service_resources_from_task, - get_resource_tracking_run_id, ) _logger = logging.getLogger(__name__) @@ -295,7 +295,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool: *( publish_service_resource_tracking_heartbeat( self.rabbitmq_client, - get_resource_tracking_run_id( + ServiceRunID.get_resource_tracking_run_id_for_computational( user_id, t.project_id, t.node_id, iteration ), ) @@ -348,7 +348,7 @@ async def _process_started_tasks( *( publish_service_resource_tracking_started( self.rabbitmq_client, - service_run_id=get_resource_tracking_run_id( + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational( user_id, t.project_id, t.node_id, iteration ), wallet_id=run_metadata.get("wallet_id"), diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py index 153378e9ee5..cc33c129f1b 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py @@ -18,6 +18,7 @@ from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.rabbitmq_messages import SimcorePlatformStatus +from models_library.services_types import ServiceRunID from models_library.users import UserID from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE from servicelib.logging_utils import log_catch @@ -48,7 +49,6 @@ from ..db.repositories.comp_runs import CompRunsRepository from ..db.repositories.comp_tasks import CompTasksRepository from ._scheduler_base import BaseCompScheduler -from ._utils import get_resource_tracking_run_id _logger = logging.getLogger(__name__) @@ -129,6 +129,9 @@ async def _start_tasks( hardware_info=task.hardware_info, callback=wake_up_callback, metadata=comp_run.metadata, + resource_tracking_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational( + user_id, project_id, node_id, comp_run.iteration + ), ) for node_id, task in scheduled_tasks.items() ), @@ -319,7 +322,9 @@ async def _process_task_result( # resource tracking await publish_service_resource_tracking_stopped( self.rabbitmq_client, - get_resource_tracking_run_id(user_id, project_id, node_id, iteration), + ServiceRunID.get_resource_tracking_run_id_for_computational( + user_id, project_id, node_id, iteration + ), simcore_platform_status=simcore_platform_status, ) # instrumentation diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_utils.py index 9d2722e3b6c..dc414376db0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_utils.py @@ -2,19 +2,15 @@ from fastapi import FastAPI from models_library.docker import DockerGenericTag -from models_library.projects import ProjectID -from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.services_resources import ( ResourceValue, ServiceResourcesDict, ServiceResourcesDictHelpers, ) -from models_library.users import UserID from servicelib.redis import RedisClientSDK from settings_library.redis import RedisDatabase -from ...models.comp_runs import Iteration from ...models.comp_tasks import CompTaskAtDB from ..redis import get_redis_client_manager @@ -55,12 +51,6 @@ } -def get_resource_tracking_run_id( - user_id: UserID, project_id: ProjectID, node_id: NodeID, iteration: Iteration -) -> str: - return f"comp_{user_id}_{project_id}_{node_id}_{iteration}" - - def create_service_resources_from_task(task: CompTaskAtDB) -> ServiceResourcesDict: assert task.image.node_requirements # nosec return ServiceResourcesDictHelpers.create_from_single_service( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 181c6c22a6d..731388f8ae9 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -47,6 +47,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo +from models_library.services import ServiceRunID from models_library.users import UserID from pydantic import TypeAdapter, ValidationError from pydantic.networks import AnyUrl @@ -293,6 +294,7 @@ async def send_computation_tasks( remote_fct: ContainerRemoteFct | None = None, metadata: RunMetadataDict, hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ) -> list[PublishedComputationTask]: """actually sends the function remote_fct to be remotely executed. if None is kept then the default function that runs container will be started. @@ -396,6 +398,7 @@ async def send_computation_tasks( node_id=node_id, node_image=node_image, metadata=metadata, + resource_tracking_run_id=resource_tracking_run_id, ) task_owner = dask_utils.compute_task_owner( user_id, project_id, node_id, metadata.get("project_metadata", {}) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py index 98ba1ea2f40..54f791a0a53 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py @@ -20,6 +20,7 @@ ResourceValue, ServiceResourcesDict, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.utils.docker_compose import replace_env_vars_in_compose_spec from pydantic import ByteSize @@ -278,6 +279,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 node_id: NodeID, simcore_user_agent: str, swarm_stack_name: str, + service_run_id: ServiceRunID, ) -> str: """ returns a docker-compose spec used by @@ -350,6 +352,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 product_name=product_name, project_id=project_id, node_id=node_id, + service_run_id=service_run_id, ) add_egress_configuration( @@ -388,6 +391,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 product_name=product_name, project_id=project_id, node_id=node_id, + service_run_id=service_run_id, ) stringified_service_spec: str = replace_env_vars_in_compose_spec( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index 4ee83bee16f..35fa3ae9ae5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -247,7 +247,7 @@ async def _get_mounts( DynamicSidecarVolumesPathsResolver.mount_shared_store( swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, has_quota_support=has_quota_support, @@ -279,7 +279,7 @@ async def _get_mounts( swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, path=path_to_mount, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, volume_size_limit=volume_size_limits.get(f"{path_to_mount}"), @@ -305,7 +305,7 @@ async def _get_mounts( swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, path=path_to_mount, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, efs_settings=dynamic_sidecar_settings.DYNAMIC_SIDECAR_EFS_SETTINGS, @@ -319,7 +319,7 @@ async def _get_mounts( swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, path=path_to_mount, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, r_clone_settings=dynamic_sidecar_settings.DYNAMIC_SIDECAR_R_CLONE_SETTINGS, @@ -331,7 +331,7 @@ async def _get_mounts( swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, path=path_to_mount, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, volume_size_limit=volume_size_limits.get(f"{path_to_mount}"), @@ -372,7 +372,7 @@ async def _get_mounts( user_preferences_path=scheduler_data.user_preferences_path, swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, node_uuid=scheduler_data.node_uuid, - run_id=scheduler_data.run_id, + service_run_id=scheduler_data.run_id, project_id=scheduler_data.project_id, user_id=scheduler_data.user_id, has_quota_support=has_quota_support, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py index c6a4cba08f3..8352cab1f8a 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -17,7 +17,7 @@ ProgressType, ) from models_library.service_settings_labels import SimcoreServiceSettingsLabel -from models_library.services import RunID +from models_library.services import ServiceRunID from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient from simcore_postgres_database.models.comp_tasks import NodeClass @@ -237,7 +237,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: # Each time a new dynamic-sidecar service is created # generate a new `run_id` to avoid resource collisions - scheduler_data.run_id = RunID.create() + scheduler_data.run_id = ServiceRunID.get_resource_tracking_run_id_for_dynamic() rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py index a6976796560..289cfc162c9 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py @@ -103,6 +103,7 @@ async def submit_compose_sepc(app: FastAPI, scheduler_data: SchedulerData) -> No node_id=scheduler_data.node_uuid, simcore_user_agent=scheduler_data.request_simcore_user_agent, swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, + service_run_id=scheduler_data.run_id, ) _logger.debug( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py index 7f55dc68498..bf375b29eed 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py @@ -7,7 +7,7 @@ ) from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services import RunID +from models_library.services import ServiceRunID from models_library.users import UserID from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES from settings_library.efs import ( @@ -120,7 +120,7 @@ def volume_name(cls, path: Path) -> str: return f"{path}".replace(os.sep, "_") @classmethod - def source(cls, path: Path, node_uuid: NodeID, run_id: RunID) -> str: + def source(cls, path: Path, node_uuid: NodeID, service_run_id: ServiceRunID) -> str: """Returns a valid and unique volume name that is composed out of identifiers, namely - relative target path - node_uuid @@ -138,7 +138,7 @@ def source(cls, path: Path, node_uuid: NodeID, run_id: RunID) -> str: reversed_volume_name = cls.volume_name(path)[::-1] # ensure prefix size does not change - prefix = f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{run_id}_{node_uuid}" + prefix = f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{service_run_id}_{node_uuid}" assert len(prefix) == CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME - 1 # nosec unique_name = f"{prefix}_{reversed_volume_name}" @@ -150,7 +150,7 @@ def mount_entry( swarm_stack_name: str, path: Path, node_uuid: NodeID, - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, user_id: UserID, volume_size_limit: str | None, @@ -159,13 +159,13 @@ def mount_entry( Creates specification for mount to be added to containers created as part of a service """ return { - "Source": cls.source(path, node_uuid, run_id), + "Source": cls.source(path, node_uuid, service_run_id), "Target": cls.target(path), "Type": "volume", "VolumeOptions": { "Labels": { - "source": cls.source(path, node_uuid, run_id), - "run_id": f"{run_id}", + "source": cls.source(path, node_uuid, service_run_id), + "run_id": f"{service_run_id}", "node_uuid": f"{node_uuid}", "study_id": f"{project_id}", "user_id": f"{user_id}", @@ -182,7 +182,7 @@ def mount_entry( @classmethod def mount_shared_store( cls, - run_id: RunID, + service_run_id: ServiceRunID, node_uuid: NodeID, project_id: ProjectID, user_id: UserID, @@ -194,7 +194,7 @@ def mount_shared_store( swarm_stack_name=swarm_stack_name, path=DY_SIDECAR_SHARED_STORE_PATH, node_uuid=node_uuid, - run_id=run_id, + service_run_id=service_run_id, project_id=project_id, user_id=user_id, volume_size_limit="1M" if has_quota_support else None, @@ -204,7 +204,7 @@ def mount_shared_store( def mount_user_preferences( cls, user_preferences_path: Path, - run_id: RunID, + service_run_id: ServiceRunID, node_uuid: NodeID, project_id: ProjectID, user_id: UserID, @@ -216,7 +216,7 @@ def mount_user_preferences( swarm_stack_name=swarm_stack_name, path=user_preferences_path, node_uuid=node_uuid, - run_id=run_id, + service_run_id=service_run_id, project_id=project_id, user_id=user_id, # NOTE: the contents of this volume will be zipped and much @@ -231,19 +231,19 @@ def mount_r_clone( swarm_stack_name: str, path: Path, node_uuid: NodeID, - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, user_id: UserID, r_clone_settings: RCloneSettings, ) -> dict[str, Any]: return { - "Source": cls.source(path, node_uuid, run_id), + "Source": cls.source(path, node_uuid, service_run_id), "Target": cls.target(path), "Type": "volume", "VolumeOptions": { "Labels": { - "source": cls.source(path, node_uuid, run_id), - "run_id": f"{run_id}", + "source": cls.source(path, node_uuid, service_run_id), + "run_id": f"{service_run_id}", "node_uuid": f"{node_uuid}", "study_id": f"{project_id}", "user_id": f"{user_id}", @@ -264,20 +264,20 @@ def mount_efs( swarm_stack_name: str, path: Path, node_uuid: NodeID, - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, user_id: UserID, efs_settings: AwsEfsSettings, storage_directory_name: str, ) -> dict[str, Any]: return { - "Source": cls.source(path, node_uuid, run_id), + "Source": cls.source(path, node_uuid, service_run_id), "Target": cls.target(path), "Type": "volume", "VolumeOptions": { "Labels": { - "source": cls.source(path, node_uuid, run_id), - "run_id": f"{run_id}", + "source": cls.source(path, node_uuid, service_run_id), + "run_id": f"{service_run_id}", "node_uuid": f"{node_uuid}", "study_id": f"{project_id}", "user_id": f"{user_id}", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py index a855295d9e4..2249937341d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py @@ -18,6 +18,7 @@ from models_library.projects_nodes_io import NodeID from models_library.service_settings_labels import ComposeSpecLabelDict from models_library.services import ServiceKey, ServiceVersion +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.utils.specs_substitution import SpecsSubstitutionsResolver from pydantic import BaseModel @@ -120,6 +121,7 @@ def create(cls, app: FastAPI): ("OSPARC_VARIABLE_NODE_ID", "node_id"), ("OSPARC_VARIABLE_PRODUCT_NAME", "product_name"), ("OSPARC_VARIABLE_STUDY_UUID", "project_id"), + ("OSPARC_VARIABLE_SERVICE_RUN_ID", "run_id"), ("OSPARC_VARIABLE_USER_ID", "user_id"), ("OSPARC_VARIABLE_API_HOST", "api_server_base_url"), ]: @@ -181,6 +183,7 @@ async def resolve_and_substitute_session_variables_in_model( product_name: str, project_id: ProjectID, node_id: NodeID, + service_run_id: ServiceRunID, ) -> TBaseModel: result: TBaseModel = model try: @@ -200,6 +203,7 @@ async def resolve_and_substitute_session_variables_in_model( product_name=product_name, project_id=project_id, node_id=node_id, + run_id=service_run_id, api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, ), ) @@ -221,6 +225,7 @@ async def resolve_and_substitute_session_variables_in_specs( product_name: str, project_id: ProjectID, node_id: NodeID, + service_run_id: ServiceRunID, ) -> dict[str, Any]: table = OsparcSessionVariablesTable.get_from_app_state(app) resolver = SpecsSubstitutionsResolver(specs, upgrade=False) @@ -241,6 +246,7 @@ async def resolve_and_substitute_session_variables_in_specs( product_name=product_name, project_id=project_id, node_id=node_id, + run_id=service_run_id, api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, ), ) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 13967b0c5da..be897b9f0d6 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -26,6 +26,7 @@ from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.services import ServiceKey, ServiceVersion +from models_library.services_types import ServiceRunID from models_library.users import UserID from pydantic import AnyUrl, ByteSize, TypeAdapter, ValidationError from servicelib.logging_utils import log_catch, log_context @@ -342,6 +343,7 @@ async def compute_task_envs( node_id: NodeID, node_image: Image, metadata: RunMetadataDict, + resource_tracking_run_id: ServiceRunID, ) -> ContainerEnvsDict: product_name = metadata.get("product_name", UNDEFINED_DOCKER_LABEL) task_envs = node_image.envs @@ -360,6 +362,7 @@ async def compute_task_envs( product_name=product_name, project_id=project_id, node_id=node_id, + service_run_id=resource_tracking_run_id, ) # NOTE: see https://github.com/ITISFoundation/osparc-simcore/issues/3638 # we currently do not validate as we are using illegal docker key names with underscores diff --git a/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py b/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py index 2715858965b..28ce84c605d 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py @@ -25,7 +25,7 @@ def _get_or_raise(context: ContextDict) -> Any: try: return context[parameter_name] except KeyError as err: - msg = "Parameter {keyname} missing from substitution context" + msg = f"{parameter_name=} missing from substitution context" raise CaptureError(msg) from err # For context["foo"] -> return operator.methodcaller("__getitem__", keyname) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py index 70249d3c1da..6f6e1693193 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py @@ -15,6 +15,7 @@ ) from models_library.services import ServiceKey, ServiceType, ServiceVersion from models_library.services_resources import ServiceResourcesDict +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import NonNegativeFloat @@ -70,7 +71,7 @@ async def publish_service_stopped_metrics( async def publish_service_resource_tracking_started( # pylint: disable=too-many-arguments # noqa: PLR0913 rabbitmq_client: RabbitMQClient, - service_run_id: str, + service_run_id: ServiceRunID, *, wallet_id: WalletID | None, wallet_name: str | None, @@ -127,7 +128,7 @@ async def publish_service_resource_tracking_started( # pylint: disable=too-many async def publish_service_resource_tracking_stopped( rabbitmq_client: RabbitMQClient, - service_run_id: str, + service_run_id: ServiceRunID, *, simcore_platform_status: SimcorePlatformStatus, ) -> None: @@ -138,7 +139,7 @@ async def publish_service_resource_tracking_stopped( async def publish_service_resource_tracking_heartbeat( - rabbitmq_client: RabbitMQClient, service_run_id: str + rabbitmq_client: RabbitMQClient, service_run_id: ServiceRunID ) -> None: message = RabbitResourceTrackingHeartbeatMessage(service_run_id=service_run_id) await rabbitmq_client.publish(message.channel_name, message) diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 7f4bb33b47c..7a6ab8e439d 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -22,9 +22,17 @@ from models_library.generated_models.docker_rest_api import ( ServiceSpec as DockerServiceSpec, ) +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID from models_library.service_settings_labels import SimcoreServiceLabels -from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion +from models_library.services import ( + ServiceKey, + ServiceKeyVersion, + ServiceRunID, + ServiceVersion, +) from models_library.services_enums import ServiceState +from models_library.users import UserID from models_library.utils._original_fastapi_encoders import jsonable_encoder from pydantic import TypeAdapter from pytest_mock.plugin import MockerFixture @@ -71,8 +79,17 @@ def dynamic_sidecar_port() -> PortInt: @pytest.fixture -def run_id() -> RunID: - return RunID.create() +def service_run_id() -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_dynamic() + + +@pytest.fixture +def resource_tracking_run_id( + user_id: UserID, project_id: ProjectID, node_id: NodeID +) -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_computational( + user_id, project_id, node_id, iteration=42 + ) @pytest.fixture @@ -104,7 +121,7 @@ def scheduler_data_from_http_request( request_scheme: str, request_simcore_user_agent: str, can_save: bool, - run_id: RunID, + service_run_id: ServiceRunID, ) -> SchedulerData: return SchedulerData.from_http_request( service=dynamic_service_create, @@ -114,7 +131,7 @@ def scheduler_data_from_http_request( request_scheme=request_scheme, request_simcore_user_agent=request_simcore_user_agent, can_save=can_save, - run_id=run_id, + run_id=service_run_id, ) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index f8e1ccd6c61..2bb2787ddc3 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -45,6 +45,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo +from models_library.services_types import ServiceRunID from models_library.users import UserID from pydantic import AnyUrl, ByteSize, TypeAdapter from pytest_mock.plugin import MockerFixture @@ -442,6 +443,7 @@ async def test_send_computation_task( task_labels: ContainerLabelsDict, empty_hardware_info: HardwareInfo, faker: Faker, + resource_tracking_run_id: ServiceRunID, ): _DASK_EVENT_NAME = faker.pystr() @@ -503,6 +505,7 @@ def fake_sidecar_fct( ), metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert node_id_to_job_ids assert len(node_id_to_job_ids) == 1 @@ -559,6 +562,7 @@ async def test_computation_task_is_persisted_on_dask_scheduler( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): """rationale: When a task is submitted to the dask backend, a dask future is returned. @@ -594,6 +598,7 @@ def fake_sidecar_fct( remote_fct=fake_sidecar_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -649,6 +654,7 @@ async def test_abort_computation_tasks( faker: Faker, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): _DASK_EVENT_NAME = faker.pystr() @@ -687,6 +693,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -738,6 +745,7 @@ async def test_failed_task_returns_exceptions( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # NOTE: this must be inlined so that the test works, # the dask-worker must be able to import the function @@ -758,6 +766,7 @@ def fake_failing_sidecar_fct( remote_fct=fake_failing_sidecar_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -800,6 +809,7 @@ async def test_send_computation_task_with_missing_resources_raises( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # remove the workers that can handle gpu scheduler_info = dask_client.backend.client.scheduler_info() @@ -826,6 +836,7 @@ async def test_send_computation_task_with_missing_resources_raises( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -844,6 +855,7 @@ async def test_send_computation_task_with_hardware_info_raises( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # NOTE: running on the default cluster will raise missing resources with pytest.raises(MissingComputationalResourcesError): @@ -855,6 +867,7 @@ async def test_send_computation_task_with_hardware_info_raises( remote_fct=None, metadata=comp_run_metadata, hardware_info=hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -872,6 +885,7 @@ async def test_too_many_resources_send_computation_task( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # create an image that needs a huge amount of CPU image = Image( @@ -895,6 +909,7 @@ async def test_too_many_resources_send_computation_task( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -911,6 +926,7 @@ async def test_disconnected_backend_raises_exception( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # DISCONNECT THE CLUSTER await dask_spec_local_cluster.close() # type: ignore @@ -923,6 +939,7 @@ async def test_disconnected_backend_raises_exception( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -942,6 +959,7 @@ async def test_changed_scheduler_raises_exception( unused_tcp_port_factory: Callable, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # change the scheduler (stop the current one and start another at the same address) scheduler_address = URL(dask_spec_local_cluster.scheduler_address) @@ -971,6 +989,7 @@ async def test_changed_scheduler_raises_exception( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -988,6 +1007,7 @@ async def test_get_tasks_status( fail_remote_fct: bool, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): # NOTE: this must be inlined so that the test works, # the dask-worker must be able to import the function @@ -1015,6 +1035,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -1069,6 +1090,7 @@ async def test_dask_sub_handlers( fake_task_handlers: TaskHandlers, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, + resource_tracking_run_id: ServiceRunID, ): dask_client.register_handlers(fake_task_handlers) _DASK_START_EVENT = "start" @@ -1098,6 +1120,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -1142,6 +1165,7 @@ async def test_get_cluster_details( comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, faker: Faker, + resource_tracking_run_id: ServiceRunID, ): cluster_details = await dask_client.get_cluster_details() assert cluster_details @@ -1178,6 +1202,7 @@ def fake_sidecar_fct( ), metadata=comp_run_metadata, hardware_info=empty_hardware_info, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_volumes_resolver.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_volumes_resolver.py index b617c3da637..4acacd3a4e4 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_volumes_resolver.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_volumes_resolver.py @@ -13,7 +13,7 @@ CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME, ) from models_library.projects import ProjectID -from models_library.services import RunID +from models_library.services import ServiceRunID from models_library.users import UserID from simcore_service_director_v2.modules.dynamic_sidecar.volumes import ( DynamicSidecarVolumesPathsResolver, @@ -36,8 +36,8 @@ def state_paths() -> list[Path]: @pytest.fixture -def run_id() -> RunID: - return RunID.create() +def service_run_id() -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_dynamic() @pytest.fixture @@ -49,7 +49,7 @@ def project_id(faker: Faker) -> ProjectID: def expected_volume_config( swarm_stack_name: str, node_uuid: UUID, - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, user_id: UserID, ) -> Callable[[str, str], dict[str, Any]]: @@ -62,7 +62,7 @@ def _callable(source: str, target: str) -> dict[str, Any]: "DriverConfig": None, "Labels": { "source": source, - "run_id": f"{run_id}", + "run_id": service_run_id, "study_id": f"{project_id}", "user_id": f"{user_id}", "swarm_stack_name": swarm_stack_name, @@ -79,7 +79,7 @@ def test_expected_paths( node_uuid: UUID, state_paths: list[Path], expected_volume_config: Callable[[str, str], dict[str, Any]], - run_id: RunID, + service_run_id: ServiceRunID, project_id: ProjectID, user_id: UserID, ) -> None: @@ -87,26 +87,38 @@ def test_expected_paths( inputs_path = Path(fake.file_path(depth=3)).parent assert DynamicSidecarVolumesPathsResolver.mount_entry( - swarm_stack_name, inputs_path, node_uuid, run_id, project_id, user_id, None + swarm_stack_name, + inputs_path, + node_uuid, + service_run_id, + project_id, + user_id, + None, ) == expected_volume_config( - source=f"dyv_{run_id}_{node_uuid}_{f'{inputs_path}'.replace('/', '_')[::-1]}", + source=f"dyv_{service_run_id}_{node_uuid}_{f'{inputs_path}'.replace('/', '_')[::-1]}", target=str(Path("/dy-volumes") / inputs_path.relative_to("/")), ) outputs_path = Path(fake.file_path(depth=3)).parent assert DynamicSidecarVolumesPathsResolver.mount_entry( - swarm_stack_name, outputs_path, node_uuid, run_id, project_id, user_id, None + swarm_stack_name, + outputs_path, + node_uuid, + service_run_id, + project_id, + user_id, + None, ) == expected_volume_config( - source=f"dyv_{run_id}_{node_uuid}_{f'{outputs_path}'.replace('/', '_')[::-1]}", + source=f"dyv_{service_run_id}_{node_uuid}_{f'{outputs_path}'.replace('/', '_')[::-1]}", target=str(Path("/dy-volumes") / outputs_path.relative_to("/")), ) for path in state_paths: name_from_path = f"{path}".replace(os.sep, "_")[::-1] assert DynamicSidecarVolumesPathsResolver.mount_entry( - swarm_stack_name, path, node_uuid, run_id, project_id, user_id, None + swarm_stack_name, path, node_uuid, service_run_id, project_id, user_id, None ) == expected_volume_config( - source=f"dyv_{run_id}_{node_uuid}_{name_from_path}", + source=f"dyv_{service_run_id}_{node_uuid}_{name_from_path}", target=str(Path("/dy-volumes/") / path.relative_to("/")), ) @@ -130,7 +142,7 @@ async def test_unique_name_creation_and_removal(faker: Faker): unique_volume_name = DynamicSidecarVolumesPathsResolver.source( path=Path("/some/random/path/to/a/workspace/folder"), node_uuid=faker.uuid4(cast_to=None), - run_id=RunID.create(), + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_dynamic(), ) await assert_creation_and_removal(unique_volume_name) @@ -138,20 +150,20 @@ async def test_unique_name_creation_and_removal(faker: Faker): def test_volumes_get_truncated_as_expected(faker: Faker): node_uuid = faker.uuid4(cast_to=None) - run_id = RunID.create() - assert node_uuid != run_id + service_run_id = ServiceRunID.get_resource_tracking_run_id_for_dynamic() + assert node_uuid != service_run_id unique_volume_name = DynamicSidecarVolumesPathsResolver.source( path=Path( f"/home/user/a-{'-'.join(['very' for _ in range(34)])}-long-home-path/workspace" ), node_uuid=node_uuid, - run_id=run_id, + service_run_id=service_run_id, ) # if below fails the agent will have issues please check constant_part = unique_volume_name[: CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME - 1] - assert constant_part == f"dyv_{run_id}_{node_uuid}" + assert constant_part == f"dyv_{service_run_id}_{node_uuid}" assert len(unique_volume_name) == 255 - assert f"{run_id}" in unique_volume_name + assert f"{service_run_id}" in unique_volume_name assert f"{node_uuid}" in unique_volume_name diff --git a/services/director-v2/tests/unit/test_modules_osparc_variables.py b/services/director-v2/tests/unit/test_modules_osparc_variables.py index 635904292b8..61427034a45 100644 --- a/services/director-v2/tests/unit/test_modules_osparc_variables.py +++ b/services/director-v2/tests/unit/test_modules_osparc_variables.py @@ -18,6 +18,7 @@ from fastapi import FastAPI from models_library.service_settings_labels import ComposeSpecLabelDict from models_library.services import ServiceKey, ServiceVersion +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.utils.specs_substitution import SubstitutionValue from models_library.utils.string_substitution import OSPARC_IDENTIFIER_PREFIX @@ -48,7 +49,9 @@ def session_context(faker: Faker) -> ContextDict: return ContextDict( app=FastAPI(), - service_key=TypeAdapter(ServiceKey).validate_python("simcore/services/dynamic/foo"), + service_key=TypeAdapter(ServiceKey).validate_python( + "simcore/services/dynamic/foo" + ), service_version=TypeAdapter(ServiceVersion).validate_python("1.2.3"), compose_spec=generate_fake_docker_compose(faker), product_name=faker.word(), @@ -101,7 +104,8 @@ async def request_user_email(app: FastAPI, user_id: UserID) -> SubstitutionValue # All values extracted from the context MUST be SubstitutionValue assert { - key: TypeAdapter(SubstitutionValue).validate_python(value) for key, value in environs.items() + key: TypeAdapter(SubstitutionValue).validate_python(value) + for key, value in environs.items() } for osparc_variable_name, context_name in [ @@ -170,6 +174,7 @@ async def test_resolve_and_substitute_session_variables_in_specs( "user_role": "${OSPARC_VARIABLE_USER_ROLE}", "api_key": "${OSPARC_VARIABLE_API_KEY}", "api_secret": "${OSPARC_VARIABLE_API_SECRET}", + "service_run_id": "${OSPARC_VARIABLE_SERVICE_RUN_ID}", } print("SPECS\n", specs) @@ -180,6 +185,7 @@ async def test_resolve_and_substitute_session_variables_in_specs( product_name="a_product", project_id=faker.uuid4(cast_to=None), node_id=faker.uuid4(cast_to=None), + service_run_id=ServiceRunID("some_run_id"), ) print("REPLACED SPECS\n", replaced_specs) diff --git a/services/director-v2/tests/unit/test_utils_comp_scheduler.py b/services/director-v2/tests/unit/test_utils_comp_scheduler.py index 05c899a5e40..e589d4a933f 100644 --- a/services/director-v2/tests/unit/test_utils_comp_scheduler.py +++ b/services/director-v2/tests/unit/test_utils_comp_scheduler.py @@ -5,18 +5,13 @@ import pytest from models_library.docker import DockerGenericTag -from models_library.projects import ProjectID -from models_library.projects_nodes import NodeID from models_library.projects_state import RunningState -from models_library.users import UserID from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB from simcore_service_director_v2.modules.comp_scheduler._utils import ( COMPLETED_STATES, SCHEDULED_STATES, TASK_TO_START_STATES, - Iteration, create_service_resources_from_task, - get_resource_tracking_run_id, ) @@ -50,31 +45,6 @@ def test_scheduler_knows_all_the_states(): ) == set(RunningState) -@pytest.mark.parametrize( - "user_id, project_id, node_id, iteration, expected_result", - [ - ( - 2, - ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"), - NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"), - 5, - "comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5", - ) - ], -) -def test_get_resource_tracking_run_id( - user_id: UserID, - project_id: ProjectID, - node_id: NodeID, - iteration: Iteration, - expected_result: str, -): - assert ( - get_resource_tracking_run_id(user_id, project_id, node_id, iteration) - == expected_result - ) - - @pytest.mark.parametrize( "task", [ diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index d9559b6c75e..fb06b116c70 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -255,6 +255,7 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[DaskClientTaskState] callback=mock.ANY, metadata=mock.ANY, hardware_info=mock.ANY, + resource_tracking_run_id=mock.ANY, ) for p in expected_pending_tasks ], @@ -654,6 +655,7 @@ async def _return_random_task_result(job_id) -> TaskOutputData: callback=mock.ANY, metadata=mock.ANY, hardware_info=mock.ANY, + resource_tracking_run_id=mock.ANY, ) mocked_dask_client.send_computation_tasks.reset_mock() mocked_dask_client.get_tasks_status.assert_has_calls( diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 99596264831..6d62f1ca952 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -24,7 +24,7 @@ SimcoreServiceLabels, SimcoreServiceSettingsLabel, ) -from models_library.services import RunID, ServiceKeyVersion +from models_library.services import ServiceKeyVersion, ServiceRunID from models_library.wallets import WalletInfo from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict @@ -52,9 +52,7 @@ @pytest.fixture def mock_s3_settings() -> S3Settings: - return S3Settings.model_validate( - S3Settings.model_config["json_schema_extra"]["examples"][0] - ) + return S3Settings.model_validate(S3Settings.model_json_schema()["examples"][0]) @pytest.fixture @@ -127,14 +125,12 @@ def simcore_service_labels() -> SimcoreServiceLabels: @pytest.fixture def hardware_info() -> HardwareInfo: - return HardwareInfo.model_validate( - HardwareInfo.model_config["json_schema_extra"]["examples"][0] - ) + return HardwareInfo.model_validate(HardwareInfo.model_json_schema()["examples"][0]) @pytest.fixture def expected_dynamic_sidecar_spec( - run_id: RunID, + service_run_id: ServiceRunID, osparc_product_name: str, request_simcore_user_agent: str, hardware_info: HardwareInfo, @@ -157,7 +153,7 @@ def expected_dynamic_sidecar_spec( "container_http_entry": "rt-web", "hostname": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa", "port": 1222, - "run_id": run_id, + "run_id": service_run_id, "dynamic_sidecar": { "containers_inspect": [], "dynamic_sidecar_id": None, @@ -187,9 +183,9 @@ def expected_dynamic_sidecar_spec( "state_exclude": ["/tmp/strip_me/*"], # noqa: S108 "state_paths": ["/tmp/save_1", "/tmp_save_2"], # noqa: S108 }, - "callbacks_mapping": CallbacksMapping.model_config[ - "json_schema_extra" - ]["examples"][3], + "callbacks_mapping": CallbacksMapping.model_json_schema()[ + "examples" + ][3], "product_name": osparc_product_name, "project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", "proxy_service_name": "dy-proxy_75c7f3f4-18f9-4678-8610-54a2ade78eaa", @@ -197,12 +193,8 @@ def expected_dynamic_sidecar_spec( "request_scheme": "http", "request_simcore_user_agent": request_simcore_user_agent, "restart_policy": "on-inputs-downloaded", - "wallet_info": WalletInfo.model_config["json_schema_extra"][ - "examples" - ][0], - "pricing_info": PricingInfo.model_config["json_schema_extra"][ - "examples" - ][0], + "wallet_info": WalletInfo.model_json_schema()["examples"][0], + "pricing_info": PricingInfo.model_json_schema()["examples"][0], "hardware_info": hardware_info, "service_name": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa", "service_port": 65534, @@ -238,7 +230,7 @@ def expected_dynamic_sidecar_spec( "Env": { "DYNAMIC_SIDECAR_COMPOSE_NAMESPACE": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa", "DY_SIDECAR_NODE_ID": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", - "DY_SIDECAR_RUN_ID": run_id, + "DY_SIDECAR_RUN_ID": service_run_id, "DY_SIDECAR_PATH_INPUTS": "/tmp/inputs", # noqa: S108 "DY_SIDECAR_PATH_OUTPUTS": "/tmp/outputs", # noqa: S108 "DY_SIDECAR_PROJECT_ID": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", @@ -321,7 +313,7 @@ def expected_dynamic_sidecar_spec( "Type": "bind", }, { - "Source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_erots-derahs_", + "Source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_erots-derahs_", "Target": "/dy-volumes/shared-store", "Type": "volume", "VolumeOptions": { @@ -329,8 +321,8 @@ def expected_dynamic_sidecar_spec( "Labels": { "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "run_id": run_id, - "source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_erots-derahs_", + "run_id": service_run_id, + "source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_erots-derahs_", "swarm_stack_name": "test_swarm_name", "user_id": "234", }, @@ -338,14 +330,14 @@ def expected_dynamic_sidecar_spec( }, { "Target": "/dy-volumes/tmp/inputs", - "Source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stupni_pmt_", + "Source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stupni_pmt_", "Type": "volume", "VolumeOptions": { "Labels": { "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "run_id": run_id, - "source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stupni_pmt_", + "run_id": service_run_id, + "source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stupni_pmt_", "swarm_stack_name": "test_swarm_name", "user_id": "234", }, @@ -353,14 +345,14 @@ def expected_dynamic_sidecar_spec( }, { "Target": "/dy-volumes/tmp/outputs", - "Source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stuptuo_pmt_", + "Source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stuptuo_pmt_", "Type": "volume", "VolumeOptions": { "Labels": { "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "run_id": run_id, - "source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stuptuo_pmt_", + "run_id": service_run_id, + "source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_stuptuo_pmt_", "swarm_stack_name": "test_swarm_name", "user_id": "234", }, @@ -368,14 +360,14 @@ def expected_dynamic_sidecar_spec( }, { "Target": "/dy-volumes/tmp/save_1", - "Source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_1_evas_pmt_", + "Source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_1_evas_pmt_", "Type": "volume", "VolumeOptions": { "Labels": { "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "run_id": run_id, - "source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_1_evas_pmt_", + "run_id": service_run_id, + "source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_1_evas_pmt_", "swarm_stack_name": "test_swarm_name", "user_id": "234", }, @@ -383,14 +375,14 @@ def expected_dynamic_sidecar_spec( }, { "Target": "/dy-volumes/tmp_save_2", - "Source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_2_evas_pmt_", + "Source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_2_evas_pmt_", "Type": "volume", "VolumeOptions": { "Labels": { "node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa", "study_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe", - "run_id": run_id, - "source": f"dyv_{run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_2_evas_pmt_", + "run_id": service_run_id, + "source": f"dyv_{service_run_id}_75c7f3f4-18f9-4678-8610-54a2ade78eaa_2_evas_pmt_", "swarm_stack_name": "test_swarm_name", "user_id": "234", }, diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py index d02836de9e2..7fcaeca385d 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py @@ -34,6 +34,7 @@ from models_library.docker import to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID, SimCoreFileLink, SimcoreS3FileID +from models_library.services import ServiceRunID from models_library.users import UserID from pydantic import ByteSize, TypeAdapter from pydantic.networks import AnyUrl @@ -647,6 +648,7 @@ async def test_compute_task_envs( run_metadata: RunMetadataDict, input_task_envs: ContainerEnvsDict, expected_computed_task_envs: ContainerEnvsDict, + resource_tracking_run_id: ServiceRunID, ): sleeper_task: CompTaskAtDB = published_project.tasks[1] sleeper_task.image.envs = input_task_envs @@ -658,5 +660,6 @@ async def test_compute_task_envs( node_id=sleeper_task.node_id, node_image=sleeper_task.image, metadata=run_metadata, + resource_tracking_run_id=resource_tracking_run_id, ) assert task_envs == expected_computed_task_envs diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py index de02572e238..eee04ad6be3 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py @@ -10,7 +10,7 @@ from models_library.docker import DockerGenericTag from models_library.generated_models.docker_rest_api import ContainerState from models_library.generated_models.docker_rest_api import Status2 as ContainerStatus -from models_library.services import RunID +from models_library.services import ServiceRunID from pydantic import PositiveInt from servicelib.utils import logged_gather from starlette import status as http_status @@ -40,9 +40,11 @@ async def docker_client() -> AsyncGenerator[aiodocker.Docker, None]: await docker.close() -async def get_volume_by_label(label: str, run_id: RunID) -> dict[str, Any]: +async def get_volume_by_label( + label: str, service_run_id: ServiceRunID +) -> dict[str, Any]: async with docker_client() as docker: - filters = {"label": [f"source={label}", f"run_id={run_id}"]} + filters = {"label": [f"source={label}", f"run_id={service_run_id}"]} params = {"filters": clean_filters(filters)} data = await docker._query_json( # pylint: disable=protected-access # noqa: SLF001 "volumes", method="GET", params=params @@ -53,7 +55,7 @@ async def get_volume_by_label(label: str, run_id: RunID) -> dict[str, Any]: raise VolumeNotFoundError( volume_count=len(volumes), source_label=label, - run_id=run_id, + service_run_id=service_run_id, volume_names=" ".join(v.get("Name", "UNKNOWN") for v in volumes), status_code=http_status.HTTP_404_NOT_FOUND, ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/errors.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/errors.py index b9a449ecb36..fc67b7072f8 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/errors.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/errors.py @@ -8,7 +8,7 @@ class BaseDynamicSidecarError(OsparcErrorMixin, Exception): class VolumeNotFoundError(BaseDynamicSidecarError): msg_template = ( "Expected 1 got {volume_count} volumes labels with " - "source_label={source_label}, run_id={run_id}: Found {volume_names}" + "source_label={source_label}, service_run_id={service_run_id}: Found {volume_names}" ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py index 795015e1520..1f42b5b848e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py @@ -10,7 +10,7 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services import DynamicServiceKey, RunID, ServiceVersion +from models_library.services import DynamicServiceKey, ServiceRunID, ServiceVersion from models_library.users import UserID from pydantic import ( AliasChoices, @@ -159,7 +159,7 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): DY_SIDECAR_USER_ID: UserID DY_SIDECAR_PROJECT_ID: ProjectID DY_SIDECAR_NODE_ID: NodeID - DY_SIDECAR_RUN_ID: RunID + DY_SIDECAR_RUN_ID: ServiceRunID DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS: bool DY_SIDECAR_SERVICE_KEY: DynamicServiceKey | None = None diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py index b07bfd87bc5..78ddbf41199 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py @@ -5,7 +5,7 @@ from fastapi import FastAPI from models_library.projects_nodes_io import NodeID -from models_library.services import RunID +from models_library.services import ServiceRunID from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES from ..core.docker_utils import get_volume_by_label @@ -36,7 +36,7 @@ class MountedVolumes: def __init__( self, - run_id: RunID, + service_run_id: ServiceRunID, node_id: NodeID, inputs_path: Path, outputs_path: Path, @@ -46,7 +46,7 @@ def __init__( compose_namespace: str, dy_volumes: Path, ) -> None: - self.run_id: RunID = run_id + self.service_run_id: ServiceRunID = service_run_id self.node_id: NodeID = node_id self.inputs_path: Path = inputs_path self.outputs_path: Path = outputs_path @@ -62,14 +62,14 @@ def __init__( def volume_name_inputs(self) -> str: """Same name as the namespace, to easily track components""" return ( - f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.run_id}_{self.node_id}" + f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.service_run_id}_{self.node_id}" f"_{_name_from_full_path(self.inputs_path)[::-1]}" ) @cached_property def volume_name_outputs(self) -> str: return ( - f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.run_id}_{self.node_id}" + f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.service_run_id}_{self.node_id}" f"_{_name_from_full_path(self.outputs_path)[::-1]}" ) @@ -78,14 +78,14 @@ def volume_user_preferences(self) -> str | None: if self.user_preferences_path is None: return None return ( - f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.run_id}_{self.node_id}" + f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.service_run_id}_{self.node_id}" f"_{_name_from_full_path(self.user_preferences_path)[::-1]}" ) def volume_name_state_paths(self) -> Generator[str, None, None]: for state_path in self.state_paths: yield ( - f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.run_id}_{self.node_id}" + f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.service_run_id}_{self.node_id}" f"_{_name_from_full_path(state_path)[::-1]}" ) @@ -116,39 +116,45 @@ def _ensure_directories(self) -> None: _ensure_path(path) @staticmethod - async def _get_bind_path_from_label(label: str, run_id: RunID) -> Path: - volume_details = await get_volume_by_label(label=label, run_id=run_id) + async def _get_bind_path_from_label( + label: str, service_run_id: ServiceRunID + ) -> Path: + volume_details = await get_volume_by_label( + label=label, service_run_id=service_run_id + ) return Path(volume_details["Mountpoint"]) - async def get_inputs_docker_volume(self, run_id: RunID) -> str: + async def get_inputs_docker_volume(self, service_run_id: ServiceRunID) -> str: bind_path: Path = await self._get_bind_path_from_label( - self.volume_name_inputs, run_id + self.volume_name_inputs, service_run_id ) return f"{bind_path}:{self.inputs_path}" - async def get_outputs_docker_volume(self, run_id: RunID) -> str: + async def get_outputs_docker_volume(self, service_run_id: ServiceRunID) -> str: bind_path: Path = await self._get_bind_path_from_label( - self.volume_name_outputs, run_id + self.volume_name_outputs, service_run_id ) return f"{bind_path}:{self.outputs_path}" - async def get_user_preferences_path_volume(self, run_id: RunID) -> str | None: + async def get_user_preferences_path_volume( + self, service_run_id: ServiceRunID + ) -> str | None: if self.volume_user_preferences is None: return None bind_path: Path = await self._get_bind_path_from_label( - self.volume_user_preferences, run_id + self.volume_user_preferences, service_run_id ) return f"{bind_path}:{self.user_preferences_path}" async def iter_state_paths_to_docker_volumes( - self, run_id: RunID + self, service_run_id: ServiceRunID ) -> AsyncGenerator[str, None]: for volume_state_path, state_path in zip( self.volume_name_state_paths(), self.state_paths, strict=True ): bind_path: Path = await self._get_bind_path_from_label( - volume_state_path, run_id + volume_state_path, service_run_id ) yield f"{bind_path}:{state_path}" @@ -157,7 +163,7 @@ def setup_mounted_fs(app: FastAPI) -> MountedVolumes: settings: ApplicationSettings = app.state.settings app.state.mounted_volumes = MountedVolumes( - run_id=settings.DY_SIDECAR_RUN_ID, + service_run_id=settings.DY_SIDECAR_RUN_ID, node_id=settings.DY_SIDECAR_NODE_ID, inputs_path=settings.DY_SIDECAR_PATH_INPUTS, outputs_path=settings.DY_SIDECAR_PATH_OUTPUTS, diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index 62b93daa25c..5ad10622acd 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -18,7 +18,7 @@ from faker import Faker from models_library.projects import ProjectID from models_library.projects_nodes import NodeID -from models_library.services import RunID +from models_library.services import ServiceRunID from models_library.services_creation import CreateServiceMetricsAdditionalParams from models_library.users import UserID from pydantic import TypeAdapter @@ -124,8 +124,8 @@ def node_id(faker: Faker) -> NodeID: @pytest.fixture -def run_id() -> RunID: - return RunID.create() +def service_run_id() -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_dynamic() @pytest.fixture @@ -173,7 +173,7 @@ def base_mock_envs( state_paths_dirs: list[Path], state_exclude_dirs: list[Path], node_id: NodeID, - run_id: RunID, + service_run_id: ServiceRunID, ensure_shared_store_dir: None, ) -> EnvVarsDict: return { @@ -184,7 +184,7 @@ def base_mock_envs( "DYNAMIC_SIDECAR_SHARED_STORE_DIR": f"{shared_store_dir}", # envs on container "DYNAMIC_SIDECAR_COMPOSE_NAMESPACE": compose_namespace, - "DY_SIDECAR_RUN_ID": f"{run_id}", + "DY_SIDECAR_RUN_ID": service_run_id, "DY_SIDECAR_NODE_ID": f"{node_id}", "DY_SIDECAR_PATH_INPUTS": f"{inputs_dir}", "DY_SIDECAR_PATH_OUTPUTS": f"{outputs_dir}", @@ -216,7 +216,7 @@ def mock_environment( state_paths_dirs: list[Path], state_exclude_dirs: list[Path], node_id: NodeID, - run_id: RunID, + service_run_id: ServiceRunID, inputs_dir: Path, compose_namespace: str, outputs_dir: Path, @@ -242,7 +242,7 @@ def mock_environment( "DY_SIDECAR_PATH_INPUTS": f"{inputs_dir}", "DY_SIDECAR_PATH_OUTPUTS": f"{outputs_dir}", "DY_SIDECAR_PROJECT_ID": f"{project_id}", - "DY_SIDECAR_RUN_ID": run_id, + "DY_SIDECAR_RUN_ID": service_run_id, "DY_SIDECAR_STATE_EXCLUDE": json_dumps(state_exclude_dirs), "DY_SIDECAR_STATE_PATHS": json_dumps(state_paths_dirs), "DY_SIDECAR_USER_ID": f"{user_id}", diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py index a7d2254425c..bf647faa5e4 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py @@ -10,7 +10,7 @@ from aiodocker.containers import DockerContainer from faker import Faker from models_library.generated_models.docker_rest_api import ContainerState -from models_library.services import RunID +from models_library.services import ServiceRunID from pydantic import PositiveInt from simcore_service_dynamic_sidecar.core.docker_utils import ( _get_containers_inspect_from_names, @@ -28,17 +28,19 @@ def volume_name() -> str: @pytest.fixture -def run_id() -> RunID: - return RunID.create() +def service_run_id() -> ServiceRunID: + return ServiceRunID.get_resource_tracking_run_id_for_dynamic() @pytest.fixture -async def volume_with_label(volume_name: str, run_id: RunID) -> AsyncIterable[None]: +async def volume_with_label( + volume_name: str, service_run_id: ServiceRunID +) -> AsyncIterable[None]: async with aiodocker.Docker() as docker_client: volume = await docker_client.volumes.create( { "Name": "test_volume_name_1", - "Labels": {"source": volume_name, "run_id": run_id}, + "Labels": {"source": volume_name, "run_id": service_run_id}, } ) @@ -77,17 +79,17 @@ async def started_services(container_names: list[str]) -> AsyncIterator[None]: async def test_volume_with_label( - volume_with_label: None, volume_name: str, run_id: RunID + volume_with_label: None, volume_name: str, service_run_id: ServiceRunID ) -> None: - assert await get_volume_by_label(volume_name, run_id) + assert await get_volume_by_label(volume_name, service_run_id) -async def test_volume_label_missing(run_id: RunID) -> None: +async def test_volume_label_missing(service_run_id: ServiceRunID) -> None: with pytest.raises(VolumeNotFoundError) as exc_info: - await get_volume_by_label("not_exist", run_id) + await get_volume_by_label("not_exist", service_run_id) error_msg = f"{exc_info.value}" - assert run_id in error_msg + assert service_run_id in error_msg assert "not_exist" in error_msg diff --git a/services/dynamic-sidecar/tests/unit/test_core_errors.py b/services/dynamic-sidecar/tests/unit/test_core_errors.py index 7b112878c9c..6b27efa3fcd 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_errors.py +++ b/services/dynamic-sidecar/tests/unit/test_core_errors.py @@ -29,7 +29,7 @@ def test_legacy_interface_volume_not_found_error(): raise VolumeNotFoundError( # noqa: TRY301 volume_count=len(volumes), source_label="some", - run_id="run_id", + service_run_id="service_run_id", volume_names=volume_names, status_code=status.HTTP_404_NOT_FOUND, ) @@ -37,6 +37,6 @@ def test_legacy_interface_volume_not_found_error(): print(e) assert ( # noqa: PT017 e.message - == "Expected 1 got 2 volumes labels with source_label=some, run_id=run_id: Found UNKNOWN a_volume" + == "Expected 1 got 2 volumes labels with source_label=some, service_run_id=service_run_id: Found UNKNOWN a_volume" ) assert e.status_code == status.HTTP_404_NOT_FOUND # noqa: PT017 diff --git a/services/dynamic-sidecar/tests/unit/test_core_validation.py b/services/dynamic-sidecar/tests/unit/test_core_validation.py index 886f729dd30..f9af97453bd 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_validation.py +++ b/services/dynamic-sidecar/tests/unit/test_core_validation.py @@ -7,7 +7,7 @@ import pytest from fastapi import FastAPI from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from pytest_mock import MockerFixture from servicelib.docker_constants import DEFAULT_USER_SERVICES_NETWORK_NAME from simcore_service_dynamic_sidecar.core.validation import ( @@ -156,7 +156,7 @@ def no_internet_spec(project_tests_dir: Path) -> str: @pytest.fixture def fake_mounted_volumes() -> MountedVolumes: return MountedVolumes( - run_id=RunID.create(), + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_dynamic(), node_id=NodeID("a019b83f-7cce-46bf-90cf-d02f7f0f089a"), inputs_path=Path("/"), outputs_path=Path("/"), diff --git a/services/dynamic-sidecar/tests/unit/test_modules_mounted_fs.py b/services/dynamic-sidecar/tests/unit/test_modules_mounted_fs.py index d177b1aecf5..ac3114fc51b 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_mounted_fs.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_mounted_fs.py @@ -9,7 +9,7 @@ from aiodocker.volumes import DockerVolume from fastapi import FastAPI from models_library.projects_nodes_io import NodeID -from models_library.services import RunID +from models_library.services import ServiceRunID from simcore_service_dynamic_sidecar.core.application import AppState from simcore_service_dynamic_sidecar.models.shared_store import SharedStore from simcore_service_dynamic_sidecar.modules.mounted_fs import ( @@ -56,7 +56,7 @@ async def test_expected_paths_and_volumes( inputs_dir: Path, outputs_dir: Path, state_paths_dirs: list[Path], - run_id: RunID, + service_run_id: ServiceRunID, node_id: NodeID, ): assert ( @@ -65,7 +65,7 @@ async def test_expected_paths_and_volumes( { x async for x in mounted_volumes.iter_state_paths_to_docker_volumes( - run_id + service_run_id ) } ) @@ -89,15 +89,16 @@ async def test_expected_paths_and_volumes( # check volume mount point assert ( mounted_volumes.volume_name_outputs - == f"dyv_{run_id}_{node_id}_{_replace_slashes(outputs_dir)[::-1]}" + == f"dyv_{service_run_id}_{node_id}_{_replace_slashes(outputs_dir)[::-1]}" ) assert ( mounted_volumes.volume_name_inputs - == f"dyv_{run_id}_{node_id}_{_replace_slashes(inputs_dir)[::-1]}" + == f"dyv_{service_run_id}_{node_id}_{_replace_slashes(inputs_dir)[::-1]}" ) assert set(mounted_volumes.volume_name_state_paths()) == { - f"dyv_{run_id}_{node_id}_{_replace_slashes(x)[::-1]}" for x in state_paths_dirs + f"dyv_{service_run_id}_{node_id}_{_replace_slashes(x)[::-1]}" + for x in state_paths_dirs } def _get_container_mount(mount_path: str) -> str: @@ -105,15 +106,21 @@ def _get_container_mount(mount_path: str) -> str: # check docker_volume assert ( - _get_container_mount(await mounted_volumes.get_inputs_docker_volume(run_id)) + _get_container_mount( + await mounted_volumes.get_inputs_docker_volume(service_run_id) + ) == f"{mounted_volumes.inputs_path}" ) assert ( - _get_container_mount(await mounted_volumes.get_outputs_docker_volume(run_id)) + _get_container_mount( + await mounted_volumes.get_outputs_docker_volume(service_run_id) + ) == f"{mounted_volumes.outputs_path}" ) assert { _get_container_mount(x) - async for x in mounted_volumes.iter_state_paths_to_docker_volumes(run_id) + async for x in mounted_volumes.iter_state_paths_to_docker_volumes( + service_run_id + ) } == {f"{state_path}" for state_path in state_paths_dirs} diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index a38658f222b..f4e5c2dd18c 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -14,7 +14,7 @@ from async_asgi_testclient import TestClient from faker import Faker from fastapi import FastAPI -from models_library.services import RunID +from models_library.services import ServiceRunID from pydantic import PositiveFloat from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict @@ -371,7 +371,7 @@ async def test_regression_io_log_redirect_cb( monkeypatch.setenv("RABBIT_SECURE", "false") mounted_volumes = MountedVolumes( - run_id=RunID.create(), + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_dynamic(), node_id=faker.uuid4(cast_to=None), inputs_path=Path("/"), outputs_path=Path("/"), diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py index ffa4dfbef45..eeea009cc32 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py @@ -16,7 +16,7 @@ import pytest from aiofiles import os from faker import Faker -from models_library.services import RunID +from models_library.services import ServiceRunID from pydantic import ( ByteSize, NonNegativeFloat, @@ -63,7 +63,7 @@ @pytest.fixture def mounted_volumes(faker: Faker, tmp_path: Path) -> Iterator[MountedVolumes]: mounted_volumes = MountedVolumes( - run_id=RunID.create(), + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_dynamic(), node_id=faker.uuid4(cast_to=None), inputs_path=tmp_path / "inputs", outputs_path=tmp_path / "outputs", diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py index 06270e171ca..5cac0f59934 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py @@ -16,7 +16,7 @@ MountPathCategory, ) from models_library.projects_nodes_io import NodeID -from models_library.services_types import RunID +from models_library.services_types import ServiceRunID from models_library.users import UserID from psutil._common import sdiskusage from pydantic import ByteSize, TypeAdapter @@ -42,7 +42,7 @@ def _( inputs: Path, outputs: Path, states: list[Path] ) -> dict[MountPathCategory, set[Path]]: mounted_volumes = MountedVolumes( - run_id=RunID.create(), + service_run_id=ServiceRunID.get_resource_tracking_run_id_for_dynamic(), node_id=node_id, inputs_path=dy_volumes / inputs, outputs_path=dy_volumes / outputs, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_licensed_items_checkouts.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_licensed_items_checkouts.py index 19ff9847374..c17ff34655b 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_licensed_items_checkouts.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_licensed_items_checkouts.py @@ -5,11 +5,11 @@ ) from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq import RPCRouter @@ -64,7 +64,7 @@ async def checkout_licensed_item( wallet_id: WalletID, product_name: ProductName, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, user_id: UserID, user_email: str, ) -> LicensedItemCheckoutGet: diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/credit_transactions.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/credit_transactions.py index b9fd942fee0..ac461b37c8c 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/credit_transactions.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/credit_transactions.py @@ -9,11 +9,11 @@ PricingPlanId, PricingUnitCostId, PricingUnitId, - ServiceRunId, ) from models_library.resource_tracker_licensed_items_purchases import ( LicensedItemPurchaseID, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict @@ -31,7 +31,7 @@ class CreditTransactionCreate(BaseModel): osparc_credits: Decimal transaction_status: CreditTransactionStatus transaction_classification: CreditClassification - service_run_id: ServiceRunId | None + service_run_id: ServiceRunID | None payment_transaction_id: str | None created_at: datetime last_heartbeat_at: datetime @@ -39,13 +39,13 @@ class CreditTransactionCreate(BaseModel): class CreditTransactionCreditsUpdate(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID osparc_credits: Decimal last_heartbeat_at: datetime class CreditTransactionCreditsAndStatusUpdate(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID osparc_credits: Decimal transaction_status: CreditTransactionStatus @@ -63,7 +63,7 @@ class CreditTransactionDB(BaseModel): osparc_credits: Decimal transaction_status: CreditTransactionStatus transaction_classification: CreditClassification - service_run_id: ServiceRunId | None + service_run_id: ServiceRunID | None payment_transaction_id: str | None created: datetime last_heartbeat_at: datetime diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/licensed_items_checkouts.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/licensed_items_checkouts.py index 30ec170a9ec..774e4505230 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/licensed_items_checkouts.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/licensed_items_checkouts.py @@ -2,10 +2,10 @@ from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict @@ -18,7 +18,7 @@ class LicensedItemCheckoutDB(BaseModel): user_id: UserID user_email: str product_name: ProductName - service_run_id: ServiceRunId + service_run_id: ServiceRunID started_at: datetime stopped_at: datetime | None num_of_seats: int @@ -33,7 +33,7 @@ class CreateLicensedItemCheckoutDB(BaseModel): user_id: UserID user_email: str product_name: ProductName - service_run_id: ServiceRunId + service_run_id: ServiceRunID started_at: datetime num_of_seats: int diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/service_runs.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/service_runs.py index f78662defef..638a0bcb918 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/service_runs.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/models/service_runs.py @@ -10,10 +10,10 @@ PricingUnitCostId, PricingUnitId, ResourceTrackerServiceType, - ServiceRunId, ServiceRunStatus, ) from models_library.services import ServiceKey, ServiceVersion +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict, NonNegativeInt @@ -21,7 +21,7 @@ class ServiceRunCreate(BaseModel): product_name: ProductName - service_run_id: ServiceRunId + service_run_id: ServiceRunID wallet_id: WalletID | None wallet_name: str | None pricing_plan_id: PricingPlanId | None @@ -51,12 +51,12 @@ class ServiceRunCreate(BaseModel): class ServiceRunLastHeartbeatUpdate(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID last_heartbeat_at: datetime class ServiceRunStoppedAtUpdate(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID stopped_at: datetime service_run_status: ServiceRunStatus service_run_status_msg: str | None @@ -64,7 +64,7 @@ class ServiceRunStoppedAtUpdate(BaseModel): class ServiceRunDB(BaseModel): product_name: ProductName - service_run_id: ServiceRunId + service_run_id: ServiceRunID wallet_id: WalletID | None wallet_name: str | None pricing_plan_id: PricingPlanId | None @@ -113,7 +113,7 @@ class OsparcCreditsAggregatedByServiceKeyDB(BaseModel): class ServiceRunForCheckDB(BaseModel): - service_run_id: ServiceRunId + service_run_id: ServiceRunID last_heartbeat_at: datetime missed_heartbeat_counter: NonNegativeInt modified: datetime diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py index fba9332502e..70abcb8f5a6 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py @@ -6,9 +6,9 @@ from models_library.resource_tracker import ( CreditTransactionStatus, ResourceTrackerServiceType, - ServiceRunId, ServiceRunStatus, ) +from models_library.services_types import ServiceRunID from pydantic import NonNegativeInt, PositiveInt from sqlalchemy.ext.asyncio import AsyncEngine @@ -28,7 +28,7 @@ async def _check_service_heartbeat( base_start_timestamp: datetime, resource_usage_tracker_missed_heartbeat_interval: timedelta, resource_usage_tracker_missed_heartbeat_counter_fail: NonNegativeInt, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, last_heartbeat_at: datetime, missed_heartbeat_counter: NonNegativeInt, modified_at: datetime, @@ -74,7 +74,7 @@ async def _check_service_heartbeat( async def _close_unhealthy_service( db_engine: AsyncEngine, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, base_start_timestamp: datetime, ): # 1. Close the service_run diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/licensed_items_checkouts.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/licensed_items_checkouts.py index 4f2446dfb14..753ea3f638f 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/licensed_items_checkouts.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/licensed_items_checkouts.py @@ -8,11 +8,12 @@ ) from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId, ServiceRunStatus +from models_library.resource_tracker import ServiceRunStatus from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( @@ -104,7 +105,7 @@ async def checkout_licensed_item( wallet_id: WalletID, product_name: ProductName, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, user_id: UserID, user_email: str, ) -> LicensedItemCheckoutGet: diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/service_runs_db.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/service_runs_db.py index e7e1ace3ff8..c1bf23df530 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/service_runs_db.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/service_runs_db.py @@ -9,10 +9,10 @@ from models_library.resource_tracker import ( CreditClassification, CreditTransactionStatus, - ServiceRunId, ServiceRunStatus, ) from models_library.rest_ordering import OrderBy, OrderDirection +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import PositiveInt @@ -46,7 +46,7 @@ async def create_service_run( connection: AsyncConnection | None = None, *, data: ServiceRunCreate, -) -> ServiceRunId: +) -> ServiceRunID: async with transaction_context(engine, connection) as conn: insert_stmt = ( resource_tracker_service_runs.insert() @@ -88,7 +88,7 @@ async def create_service_run( row = result.first() if row is None: raise ServiceRunNotCreatedDBError(data=data) - return cast(ServiceRunId, row[0]) + return cast(ServiceRunID, row[0]) async def update_service_run_last_heartbeat( @@ -160,7 +160,7 @@ async def get_service_run_by_id( engine: AsyncEngine, connection: AsyncConnection | None = None, *, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, ) -> ServiceRunDB | None: async with transaction_context(engine, connection) as conn: stmt = sa.select(resource_tracker_service_runs).where( @@ -589,7 +589,7 @@ async def update_service_missed_heartbeat_counter( engine: AsyncEngine, connection: AsyncConnection | None = None, *, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, last_heartbeat_at: datetime, missed_heartbeat_counter: int, ) -> ServiceRunDB | None: diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py index 6047ac2e904..2556322000e 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/utils.py @@ -14,7 +14,8 @@ WalletCreditsLimitReachedMessage, WalletCreditsMessage, ) -from models_library.resource_tracker import ServiceRunId, ServiceRunStatus +from models_library.resource_tracker import ServiceRunStatus +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from pydantic import PositiveInt @@ -58,7 +59,7 @@ async def sum_credit_transactions_and_publish_to_rabbitmq( async def _publish_to_rabbitmq_wallet_credits_limit_reached( rabbitmq_client: RabbitMQClient, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, user_id: UserID, project_id: ProjectID, node_id: NodeID, diff --git a/services/web/server/src/simcore_service_webserver/licenses/_licensed_checkouts_api.py b/services/web/server/src/simcore_service_webserver/licenses/_licensed_checkouts_api.py index 869953fccd3..ba140d565d5 100644 --- a/services/web/server/src/simcore_service_webserver/licenses/_licensed_checkouts_api.py +++ b/services/web/server/src/simcore_service_webserver/licenses/_licensed_checkouts_api.py @@ -7,10 +7,10 @@ ) from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker import ( @@ -32,7 +32,7 @@ async def checkout_licensed_item_for_wallet( # checkout args licensed_item_id: LicensedItemID, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, ) -> webserver_licensed_items_checkouts.LicensedItemCheckoutGet: # Check whether user has access to the wallet await get_wallet_by_user( diff --git a/services/web/server/src/simcore_service_webserver/licenses/_rpc.py b/services/web/server/src/simcore_service_webserver/licenses/_rpc.py index 0b5d1b65fe9..8be794e4016 100644 --- a/services/web/server/src/simcore_service_webserver/licenses/_rpc.py +++ b/services/web/server/src/simcore_service_webserver/licenses/_rpc.py @@ -7,11 +7,11 @@ from models_library.basic_types import IDStr from models_library.licensed_items import LicensedItemID from models_library.products import ProductName -from models_library.resource_tracker import ServiceRunId from models_library.resource_tracker_licensed_items_checkouts import ( LicensedItemCheckoutID, ) from models_library.rest_ordering import OrderBy +from models_library.services_types import ServiceRunID from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq import RPCRouter @@ -67,7 +67,7 @@ async def checkout_licensed_item_for_wallet( wallet_id: WalletID, licensed_item_id: LicensedItemID, num_of_seats: int, - service_run_id: ServiceRunId, + service_run_id: ServiceRunID, ) -> LicensedItemCheckoutGet: return await _licensed_checkouts_api.checkout_licensed_item_for_wallet( app,