diff --git a/packages/models-library/src/models_library/api_schemas_clusters_keeper/__init__.py b/packages/models-library/src/models_library/api_schemas_clusters_keeper/__init__.py new file mode 100644 index 00000000000..b6570d01c89 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_clusters_keeper/__init__.py @@ -0,0 +1,9 @@ +from typing import Final + +from pydantic import parse_obj_as + +from ..rabbitmq_basic_types import RPCNamespace + +CLUSTERS_KEEPER_RPC_NAMESPACE: Final[RPCNamespace] = parse_obj_as( + RPCNamespace, "clusters-keeper" +) diff --git a/packages/models-library/src/models_library/rpc_schemas_clusters_keeper/clusters.py b/packages/models-library/src/models_library/api_schemas_clusters_keeper/clusters.py similarity index 100% rename from packages/models-library/src/models_library/rpc_schemas_clusters_keeper/clusters.py rename to packages/models-library/src/models_library/api_schemas_clusters_keeper/clusters.py diff --git a/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py b/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py new file mode 100644 index 00000000000..373a12af823 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_clusters_keeper/ec2_instances.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + +from pydantic import ByteSize, PositiveInt + + +@dataclass(frozen=True) +class EC2InstanceType: + name: str + cpus: PositiveInt + ram: ByteSize diff --git a/packages/models-library/src/models_library/rpc_schemas_clusters_keeper/__init__.py b/packages/models-library/src/models_library/rpc_schemas_clusters_keeper/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py index ece008e3f21..a465fd3e862 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py @@ -17,6 +17,10 @@ class Ec2InstanceNotFoundError(ClustersKeeperRuntimeError): msg_template: str = "EC2 instance was not found" +class Ec2InstanceTypeInvalidError(ClustersKeeperRuntimeError): + msg_template: str = "EC2 instance type is invalid" + + class Ec2TooManyInstancesError(ClustersKeeperRuntimeError): msg_template: str = ( "The maximum amount of instances {num_instances} is already reached!" diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py index d5b45afdcf7..6217940b44d 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py @@ -2,17 +2,8 @@ from dataclasses import dataclass from typing import TypeAlias -from pydantic import ByteSize, PositiveInt from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType - -@dataclass(frozen=True) -class EC2InstanceType: - name: str - cpus: PositiveInt - ram: ByteSize - - InstancePrivateDNSName = str EC2Tags: TypeAlias = dict[str, str] diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py index 9d6da40740c..11d2b712719 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py @@ -7,7 +7,8 @@ import botocore.exceptions from aiobotocore.session import ClientCreatorContext from fastapi import FastAPI -from pydantic import ByteSize, parse_obj_as +from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceType +from pydantic import ByteSize from servicelib.logging_utils import log_context from tenacity._asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log @@ -18,8 +19,10 @@ from types_aiobotocore_ec2.type_defs import FilterTypeDef from ..core.errors import ( + ClustersKeeperRuntimeError, ConfigurationError, Ec2InstanceNotFoundError, + Ec2InstanceTypeInvalidError, Ec2NotConnectedError, Ec2TooManyInstancesError, ) @@ -28,7 +31,7 @@ PrimaryEC2InstancesSettings, get_application_settings, ) -from ..models import EC2InstanceData, EC2InstanceType, EC2Tags +from ..models import EC2InstanceData, EC2Tags from ..utils.ec2 import compose_user_data logger = logging.getLogger(__name__) @@ -72,22 +75,27 @@ async def get_ec2_instance_capabilities( instance_type_names: set[InstanceTypeType], ) -> list[EC2InstanceType]: """instance_type_names must be a set of unique values""" - instance_types = await self.client.describe_instance_types( - InstanceTypes=list(instance_type_names) - ) - list_instances: list[EC2InstanceType] = [] - for instance in instance_types.get("InstanceTypes", []): - with contextlib.suppress(KeyError): - list_instances.append( - EC2InstanceType( - name=instance["InstanceType"], - cpus=instance["VCpuInfo"]["DefaultVCpus"], - ram=parse_obj_as( - ByteSize, f"{instance['MemoryInfo']['SizeInMiB']}MiB" - ), + try: + instance_types = await self.client.describe_instance_types( + InstanceTypes=list(instance_type_names) + ) + list_instances: list[EC2InstanceType] = [] + for instance in instance_types.get("InstanceTypes", []): + with contextlib.suppress(KeyError): + list_instances.append( + EC2InstanceType( + name=instance["InstanceType"], + cpus=instance["VCpuInfo"]["DefaultVCpus"], + ram=ByteSize( + int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024 + ), + ) ) - ) - return list_instances + return list_instances + except botocore.exceptions.ClientError as exc: + if exc.response.get("Error", {}).get("Code", "") == "InvalidInstanceType": + raise Ec2InstanceTypeInvalidError from exc + raise ClustersKeeperRuntimeError from exc # pragma: no cover async def start_aws_instance( self, diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py index e1e4feb9b07..c0ff928fe71 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py @@ -1,15 +1,13 @@ import contextlib import logging -from typing import Final, cast +from typing import cast from fastapi import FastAPI from models_library.rabbitmq_messages import RabbitMessageBase -from pydantic import parse_obj_as from servicelib.logging_utils import log_catch from servicelib.rabbitmq import ( RabbitMQClient, RabbitMQRPCClient, - RPCNamespace, wait_till_rabbitmq_responsive, ) from settings_library.rabbit import RabbitSettings @@ -19,10 +17,6 @@ logger = logging.getLogger(__name__) -CLUSTERS_KEEPER_RPC_NAMESPACE: Final[RPCNamespace] = parse_obj_as( - RPCNamespace, "clusters-keeper" -) - def setup(app: FastAPI) -> None: async def on_startup() -> None: diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py index c7c5c2d2a0d..babb123e210 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py @@ -1,5 +1,5 @@ from fastapi import FastAPI -from models_library.rpc_schemas_clusters_keeper.clusters import OnDemandCluster +from models_library.api_schemas_clusters_keeper.clusters import OnDemandCluster from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq import RPCRouter diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py new file mode 100644 index 00000000000..758564cf9dc --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py @@ -0,0 +1,14 @@ +from fastapi import FastAPI +from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceType +from servicelib.rabbitmq import RPCRouter + +from ..modules.ec2 import get_ec2_client + +router = RPCRouter() + + +@router.expose() +async def get_instance_type_details( + app: FastAPI, *, instance_type_names: set[str] +) -> list[EC2InstanceType]: + return await get_ec2_client(app).get_ec2_instance_capabilities(instance_type_names) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py index 2b20838ba0e..6bce8825d80 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py @@ -1,22 +1,21 @@ from collections.abc import Awaitable, Callable from fastapi import FastAPI +from models_library.api_schemas_clusters_keeper import CLUSTERS_KEEPER_RPC_NAMESPACE -from ..modules.rabbitmq import ( - CLUSTERS_KEEPER_RPC_NAMESPACE, - get_rabbitmq_rpc_client, - is_rabbitmq_enabled, -) +from ..modules.rabbitmq import get_rabbitmq_rpc_client, is_rabbitmq_enabled from .clusters import router as clusters_router +from .ec2_instances import router as ec2_instances_router def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _start() -> None: if is_rabbitmq_enabled(app): rpc_client = get_rabbitmq_rpc_client(app) - await rpc_client.register_router( - clusters_router, CLUSTERS_KEEPER_RPC_NAMESPACE, app - ) + for router in [clusters_router, ec2_instances_router]: + await rpc_client.register_router( + router, CLUSTERS_KEEPER_RPC_NAMESPACE, app + ) return _start diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 7718f84f199..f139a36dc19 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -3,11 +3,11 @@ import functools from typing import Any, Final -from models_library.clusters import NoAuthentication -from models_library.rpc_schemas_clusters_keeper.clusters import ( +from models_library.api_schemas_clusters_keeper.clusters import ( ClusterState, OnDemandCluster, ) +from models_library.clusters import NoAuthentication from models_library.users import UserID from models_library.wallets import WalletID from types_aiobotocore_ec2.literals import InstanceStateNameType diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index b61d27e7e96..eb240ee7848 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -5,7 +5,7 @@ import importlib.resources import json import random -from collections.abc import AsyncIterator, Callable, Iterator +from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from datetime import timezone from pathlib import Path from typing import Any @@ -26,6 +26,7 @@ from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.utils_docker import get_localhost_ip from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from servicelib.rabbitmq import RabbitMQRPCClient from settings_library.rabbit import RabbitSettings from simcore_service_clusters_keeper.core.application import create_app from simcore_service_clusters_keeper.core.settings import ( @@ -397,3 +398,12 @@ def clusters_keeper_docker_compose() -> dict[str, Any]: ) assert data return yaml.safe_load(data) + + +@pytest.fixture +async def clusters_keeper_rabbitmq_rpc_client( + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]] +) -> RabbitMQRPCClient: + rpc_client = await rabbitmq_rpc_client("pytest_clusters_keeper_rpc_client") + assert rpc_client + return rpc_client diff --git a/services/clusters-keeper/tests/unit/test_modules_ec2.py b/services/clusters-keeper/tests/unit/test_modules_ec2.py index fb2b9d46867..9962de4a5f6 100644 --- a/services/clusters-keeper/tests/unit/test_modules_ec2.py +++ b/services/clusters-keeper/tests/unit/test_modules_ec2.py @@ -15,6 +15,7 @@ from simcore_service_clusters_keeper.core.errors import ( ConfigurationError, Ec2InstanceNotFoundError, + Ec2InstanceTypeInvalidError, Ec2TooManyInstancesError, ) from simcore_service_clusters_keeper.core.settings import ( @@ -148,6 +149,25 @@ async def test_get_ec2_instance_capabilities( assert any(i.name == instance_type_name for i in instance_types) +async def test_get_ec2_instance_capabilities_with_empty_set_returns_all_options( + mocked_aws_server_envs: None, + clusters_keeper_ec2: ClustersKeeperEC2, +): + instance_types = await clusters_keeper_ec2.get_ec2_instance_capabilities(set()) + assert instance_types + # NOTE: this might need adaptation when moto is updated + assert 700 < len(instance_types) < 800 + + +async def test_get_ec2_instance_capabilities_with_invalid_names( + mocked_aws_server_envs: None, clusters_keeper_ec2: ClustersKeeperEC2, faker: Faker +): + with pytest.raises(Ec2InstanceTypeInvalidError): + await clusters_keeper_ec2.get_ec2_instance_capabilities( + faker.pyset(allowed_types=(str,)) + ) + + async def test_start_aws_instance( mocked_aws_server_envs: None, aws_vpc_id: str, diff --git a/services/clusters-keeper/tests/unit/test_rpc_clusters.py b/services/clusters-keeper/tests/unit/test_rpc_clusters.py index 9f55912c5ec..e0804bd0b6d 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_clusters.py +++ b/services/clusters-keeper/tests/unit/test_rpc_clusters.py @@ -4,21 +4,19 @@ import datetime -from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import Final from unittest.mock import MagicMock import arrow import pytest from faker import Faker from fastapi import FastAPI -from models_library.rpc_schemas_clusters_keeper.clusters import OnDemandCluster +from models_library.api_schemas_clusters_keeper import CLUSTERS_KEEPER_RPC_NAMESPACE +from models_library.api_schemas_clusters_keeper.clusters import OnDemandCluster from models_library.users import UserID from models_library.wallets import WalletID -from pydantic import parse_obj_as from pytest_mock.plugin import MockerFixture -from servicelib.rabbitmq import RabbitMQRPCClient, RPCMethodName, RPCNamespace +from servicelib.rabbitmq import RabbitMQRPCClient, RPCMethodName from simcore_service_clusters_keeper.utils.ec2 import HEARTBEAT_TAG_KEY from types_aiobotocore_ec2 import EC2Client @@ -29,20 +27,6 @@ pytest_simcore_ops_services_selection = [] -@pytest.fixture -async def clusters_keeper_rabbitmq_rpc_client( - rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]] -) -> RabbitMQRPCClient: - rpc_client = await rabbitmq_rpc_client("pytest_clusters_keeper_rpc_client") - assert rpc_client - return rpc_client - - -CLUSTERS_KEEPER_NAMESPACE: Final[RPCNamespace] = parse_obj_as( - RPCNamespace, "clusters-keeper" -) - - @pytest.fixture def user_id(faker: Faker) -> UserID: return faker.pyint(min_value=1) @@ -122,7 +106,7 @@ async def test_get_or_create_cluster( ): # send rabbitmq rpc to create_cluster rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( - CLUSTERS_KEEPER_NAMESPACE, + CLUSTERS_KEEPER_RPC_NAMESPACE, RPCMethodName("get_or_create_cluster"), user_id=user_id, wallet_id=wallet_id if use_wallet_id else None, @@ -138,7 +122,7 @@ async def test_get_or_create_cluster( # calling it again returns the existing cluster rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( - CLUSTERS_KEEPER_NAMESPACE, + CLUSTERS_KEEPER_RPC_NAMESPACE, RPCMethodName("get_or_create_cluster"), user_id=user_id, wallet_id=wallet_id if use_wallet_id else None, diff --git a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py new file mode 100644 index 00000000000..adbb160f86d --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py @@ -0,0 +1,61 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +import pytest +from fastapi import FastAPI +from models_library.api_schemas_clusters_keeper import CLUSTERS_KEEPER_RPC_NAMESPACE +from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceType +from servicelib.rabbitmq import RabbitMQRPCClient, RPCMethodName + +pytest_simcore_core_services_selection = [ + "rabbit", +] + +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +def _base_configuration( + docker_swarm: None, + enabled_rabbitmq: None, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + aws_allowed_ec2_instance_type_names: list[str], + mocked_redis_server: None, + initialized_app: FastAPI, +) -> None: + ... + + +async def test_get_instance_type_details_all_options( + _base_configuration: None, + clusters_keeper_rabbitmq_rpc_client: RabbitMQRPCClient, +): + # an empty set returns all options + rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( + CLUSTERS_KEEPER_RPC_NAMESPACE, + RPCMethodName("get_instance_type_details"), + instance_type_names=set(), + ) + assert rpc_response + assert isinstance(rpc_response, list) + assert isinstance(rpc_response[0], EC2InstanceType) + + +async def test_get_instance_type_details_specific_type_names( + _base_configuration: None, + clusters_keeper_rabbitmq_rpc_client: RabbitMQRPCClient, +): + # an empty set returns all options + rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( + CLUSTERS_KEEPER_RPC_NAMESPACE, + RPCMethodName("get_instance_type_details"), + instance_type_names={"t2.micro", "g4dn.xlarge"}, + ) + assert rpc_response + assert isinstance(rpc_response, list) + assert len(rpc_response) == 2 + assert rpc_response[0].name == "g4dn.xlarge" + assert rpc_response[1].name == "t2.micro" diff --git a/services/clusters-keeper/tests/unit/test_utils_clusters.py b/services/clusters-keeper/tests/unit/test_utils_clusters.py index 2126e850815..495cdbb3d5f 100644 --- a/services/clusters-keeper/tests/unit/test_utils_clusters.py +++ b/services/clusters-keeper/tests/unit/test_utils_clusters.py @@ -8,7 +8,7 @@ import pytest from faker import Faker -from models_library.rpc_schemas_clusters_keeper.clusters import ClusterState +from models_library.api_schemas_clusters_keeper.clusters import ClusterState from pytest_simcore.helpers.utils_envs import EnvVarsDict from simcore_service_clusters_keeper.core.settings import ApplicationSettings from simcore_service_clusters_keeper.models import EC2InstanceData diff --git a/services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py b/services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py index 32a76b70e94..81d28c14bc0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/clusters_keeper.py @@ -1,10 +1,10 @@ import logging -from models_library.clusters import BaseCluster, ClusterTypeInModel -from models_library.rpc_schemas_clusters_keeper.clusters import ( +from models_library.api_schemas_clusters_keeper.clusters import ( ClusterState, OnDemandCluster, ) +from models_library.clusters import BaseCluster, ClusterTypeInModel from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq import (