From a7d1e3a6fbca7456382217ad3f1d9922a5af550c Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Mon, 16 Dec 2024 08:05:43 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9BAutoscaling:=20Warm=20buffers=20do?= =?UTF-8?q?=20not=20replace=20hot=20buffers=20(#6962)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .codecov.yml | 10 +- .github/workflows/ci-testing-deploy.yml | 24 +- .../src/pytest_simcore/helpers/aws_ec2.py | 5 +- .../modules/auto_scaling_core.py | 36 ++- .../modules/dask.py | 2 +- .../utils/utils_docker.py | 8 +- services/autoscaling/tests/unit/conftest.py | 202 +++++++++++- ...test_modules_auto_scaling_computational.py | 116 ++++++- .../unit/test_modules_auto_scaling_dynamic.py | 305 +++++++++++++++++- .../unit/test_modules_buffer_machine_core.py | 172 +--------- .../unit/test_utils_auto_scaling_core.py | 10 +- 11 files changed, 664 insertions(+), 226 deletions(-) diff --git a/.codecov.yml b/.codecov.yml index 02666df0a13..eb2e6697348 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -10,10 +10,10 @@ flag_management: statuses: - type: project target: auto - threshold: 1% + threshold: 2% - type: patch target: auto - threshold: 1% + threshold: 2% component_management: @@ -22,7 +22,7 @@ component_management: statuses: - type: project target: auto - threshold: 1% + threshold: 2% branches: - "!master" individual_components: @@ -116,12 +116,12 @@ coverage: project: default: informational: true - threshold: 1% + threshold: 2% patch: default: informational: true - threshold: 1% + threshold: 2% comment: layout: "header,diff,flags,components,footer" diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index aa1efbee7a9..789c552cc81 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -772,7 +772,7 @@ jobs: if: ${{ !cancelled() }} run: ./ci/github/unit-testing/catalog.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -879,7 +879,7 @@ jobs: if: ${{ !cancelled() }} run: ./ci/github/unit-testing/datcore-adapter.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -930,7 +930,7 @@ jobs: if: ${{ !cancelled() }} run: ./ci/github/unit-testing/director.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -981,7 +981,7 @@ jobs: if: ${{ !cancelled() }} run: ./ci/github/unit-testing/director-v2.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -1910,7 +1910,7 @@ jobs: - name: test run: ./ci/github/integration-testing/webserver.bash test 01 - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -1974,7 +1974,7 @@ jobs: - name: test run: ./ci/github/integration-testing/webserver.bash test 02 - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2038,7 +2038,7 @@ jobs: - name: test run: ./ci/github/integration-testing/director-v2.bash test 01 - name: upload 
failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2111,7 +2111,7 @@ jobs: - name: test run: ./ci/github/integration-testing/director-v2.bash test 02 - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2177,7 +2177,7 @@ jobs: - name: test run: ./ci/github/integration-testing/dynamic-sidecar.bash test 01 - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2241,7 +2241,7 @@ jobs: - name: test run: ./ci/github/integration-testing/simcore-sdk.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2330,7 +2330,7 @@ jobs: - name: test run: ./ci/github/system-testing/public-api.bash test - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs @@ -2395,7 +2395,7 @@ jobs: name: ${{ github.job }}_services_settings_schemas path: ./services/**/settings-schema.json - name: upload failed tests logs - if: ${{ !cancelled() }} + if: ${{ failure() }} uses: actions/upload-artifact@v4 with: name: ${{ github.job }}_docker_logs diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py index 1e992f4ee45..7bb826149fe 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py @@ -42,7 +42,10 @@ async def assert_autoscaled_dynamic_ec2_instances( expected_instance_state: InstanceStateNameType, expected_additional_tag_keys: list[str], instance_filters: Sequence[FilterTypeDef] | None, + expected_user_data: list[str] | None = None, ) -> list[InstanceTypeDef]: + if expected_user_data is None: + expected_user_data = ["docker swarm join"] return await assert_ec2_instances( ec2_client, expected_num_reservations=expected_num_reservations, @@ -54,7 +57,7 @@ async def assert_autoscaled_dynamic_ec2_instances( "io.simcore.autoscaling.monitored_services_labels", *expected_additional_tag_keys, ], - expected_user_data=["docker swarm join"], + expected_user_data=expected_user_data, instance_filters=instance_filters, ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index e2212195aed..9c45de0524b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -418,15 +418,43 @@ async def _activate_drained_nodes( ) -async def _start_buffer_instances( +async def _start_warm_buffer_instances( app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling ) -> Cluster: + """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed""" + + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + instances_to_start = [ i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks ] + + if ( + len(cluster.buffer_drained_nodes) + < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + ): + # check if we can 
migrate warm buffers to hot buffers + hot_buffer_instance_type = cast( + InstanceTypeType, + next( + iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) + ), + ) + free_startable_warm_buffers_to_replace_hot_buffers = [ + warm_buffer.ec2_instance + for warm_buffer in cluster.buffer_ec2s + if (warm_buffer.ec2_instance.type == hot_buffer_instance_type) + and not warm_buffer.assigned_tasks + ] + instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[ + : app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + - len(cluster.buffer_drained_nodes) + ] + if not instances_to_start: return cluster - # change the buffer machine to an active one + with log_context( _logger, logging.INFO, f"start {len(instances_to_start)} buffer machines" ): @@ -1187,8 +1215,8 @@ async def _autoscale_cluster( # 2. activate available drained nodes to cover some of the tasks cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode) - # 3. start buffer instances to cover the remaining tasks - cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode) + # 3. start warm buffer instances to cover the remaining tasks + cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode) # 4. scale down unused instances cluster = await _scale_down_unused_cluster_instances( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 4c5ee00f86c..d57508babf8 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -273,7 +273,7 @@ def _list_processing_tasks_on_worker( async with _scheduler_client(scheduler_url, authentication) as client: worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) - _logger.debug("looking for processing tasksfor %s", f"{worker_url=}") + _logger.debug("looking for processing tasks for %s", f"{worker_url=}") # now get the used resources worker_processing_tasks: list[ diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 65caa0f40b1..4c5b5e6f79f 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -521,8 +521,14 @@ async def tag_node( tags: dict[DockerLabelKey, str], available: bool, ) -> Node: + assert node.spec # nosec + if (node.spec.labels == tags) and ( + (node.spec.availability is Availability.active) == available + ): + # nothing to do + return node with log_context( - logger, logging.DEBUG, msg=f"tagging {node.id=} with {tags=} and {available=}" + logger, logging.DEBUG, msg=f"tag {node.id=} with {tags=} and {available=}" ): assert node.id # nosec diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 4a48f2776b6..9b7489268e6 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -28,11 +28,16 @@ EC2InstanceType, Resources, ) +from common_library.json_serialization import json_dumps from deepdiff import DeepDiff from faker import Faker from fakeredis.aioredis import FakeRedis from fastapi import FastAPI -from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels +from models_library.docker import ( + DockerGenericTag, + DockerLabelKey, + 
StandardSimcoreDockerLabels, +) from models_library.generated_models.docker_rest_api import Availability from models_library.generated_models.docker_rest_api import Node as DockerNode from models_library.generated_models.docker_rest_api import ( @@ -45,7 +50,7 @@ Service, TaskSpec, ) -from pydantic import ByteSize, PositiveInt, TypeAdapter +from pydantic import ByteSize, NonNegativeInt, PositiveInt, TypeAdapter from pytest_mock import MockType from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.host import get_localhost_ip @@ -57,6 +62,7 @@ ) from settings_library.rabbit import RabbitSettings from settings_library.ssm import SSMSettings +from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY from simcore_service_autoscaling.core.application import create_app from simcore_service_autoscaling.core.settings import ( AUTOSCALING_ENV_PREFIX, @@ -71,8 +77,14 @@ DaskTaskResources, ) from simcore_service_autoscaling.modules import auto_scaling_core +from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import ( + DynamicAutoscaling, +) from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API +from simcore_service_autoscaling.utils.buffer_machines_pool_core import ( + get_deactivated_buffer_ec2_tags, +) from simcore_service_autoscaling.utils.utils_docker import ( _OSPARC_SERVICE_READY_LABEL_KEY, _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, @@ -81,7 +93,9 @@ from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -from types_aiobotocore_ec2.literals import InstanceTypeType +from types_aiobotocore_ec2 import EC2Client +from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType +from types_aiobotocore_ec2.type_defs import TagTypeDef pytest_plugins = [ "pytest_simcore.aws_server", @@ -991,10 +1005,22 @@ def _creator( @pytest.fixture -def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int: - num_machines_in_buffer = 5 - monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}") - return num_machines_in_buffer +def num_hot_buffer() -> NonNegativeInt: + return 5 + + +@pytest.fixture +def with_instances_machines_hot_buffer( + num_hot_buffer: int, + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + { + "EC2_INSTANCES_MACHINES_BUFFER": f"{num_hot_buffer}", + }, + ) @pytest.fixture @@ -1042,3 +1068,165 @@ async def _( autospec=True, side_effect=_, ) + + +@pytest.fixture +def fake_pre_pull_images() -> list[DockerGenericTag]: + return TypeAdapter(list[DockerGenericTag]).validate_python( + [ + "nginx:latest", + "itisfoundation/my-very-nice-service:latest", + "simcore/services/dynamic/another-nice-one:2.4.5", + "asd", + ] + ) + + +@pytest.fixture +def ec2_instances_allowed_types_with_only_1_buffered( + faker: Faker, + fake_pre_pull_images: list[DockerGenericTag], + external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific], +) -> dict[InstanceTypeType, EC2InstanceBootSpecific]: + if not external_ec2_instances_allowed_types: + return { + "t2.micro": EC2InstanceBootSpecific( + ami_id=faker.pystr(), + pre_pull_images=fake_pre_pull_images, + buffer_count=faker.pyint(min_value=1, max_value=10), + ) + } + + allowed_ec2_types = external_ec2_instances_allowed_types + allowed_ec2_types_with_buffer_defined = dict( + filter( + lambda 
instance_type_and_settings: instance_type_and_settings[ + 1 + ].buffer_count + > 0, + allowed_ec2_types.items(), + ) + ) + assert ( + allowed_ec2_types_with_buffer_defined + ), "one type with buffer is needed for the tests!" + assert ( + len(allowed_ec2_types_with_buffer_defined) == 1 + ), "more than one type with buffer is disallowed in this test!" + return { + TypeAdapter(InstanceTypeType).validate_python(k): v + for k, v in allowed_ec2_types_with_buffer_defined.items() + } + + +@pytest.fixture +def buffer_count( + ec2_instances_allowed_types_with_only_1_buffered: dict[ + InstanceTypeType, EC2InstanceBootSpecific + ], +) -> int: + def _by_buffer_count( + instance_type_and_settings: tuple[InstanceTypeType, EC2InstanceBootSpecific] + ) -> bool: + _, boot_specific = instance_type_and_settings + return boot_specific.buffer_count > 0 + + allowed_ec2_types = ec2_instances_allowed_types_with_only_1_buffered + allowed_ec2_types_with_buffer_defined = dict( + filter(_by_buffer_count, allowed_ec2_types.items()) + ) + assert allowed_ec2_types_with_buffer_defined, "you need one type with buffer" + assert ( + len(allowed_ec2_types_with_buffer_defined) == 1 + ), "more than one type with buffer is disallowed in this test!" + return next(iter(allowed_ec2_types_with_buffer_defined.values())).buffer_count + + +@pytest.fixture +async def create_buffer_machines( + ec2_client: EC2Client, + aws_ami_id: str, + app_settings: ApplicationSettings, + initialized_app: FastAPI, +) -> Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None], + Awaitable[list[str]], +]: + async def _do( + num: int, + instance_type: InstanceTypeType, + instance_state_name: InstanceStateNameType, + pre_pull_images: list[DockerGenericTag] | None, + ) -> list[str]: + assert app_settings.AUTOSCALING_EC2_INSTANCES + + assert instance_state_name in [ + "running", + "stopped", + ], "only 'running' and 'stopped' are supported for testing" + + resource_tags: list[TagTypeDef] = [ + {"Key": tag_key, "Value": tag_value} + for tag_key, tag_value in get_deactivated_buffer_ec2_tags( + initialized_app, DynamicAutoscaling() + ).items() + ] + if pre_pull_images is not None and instance_state_name == "stopped": + resource_tags.append( + { + "Key": PRE_PULLED_IMAGES_EC2_TAG_KEY, + "Value": f"{json_dumps(pre_pull_images)}", + } + ) + with log_context( + logging.INFO, f"creating {num} buffer machines of {instance_type}" + ): + instances = await ec2_client.run_instances( + ImageId=aws_ami_id, + MaxCount=num, + MinCount=num, + InstanceType=instance_type, + KeyName=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME, + SecurityGroupIds=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS, + SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID, + IamInstanceProfile={ + "Arn": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE + }, + TagSpecifications=[ + {"ResourceType": "instance", "Tags": resource_tags}, + {"ResourceType": "volume", "Tags": resource_tags}, + {"ResourceType": "network-interface", "Tags": resource_tags}, + ], + UserData="echo 'I am pytest'", + ) + instance_ids = [ + i["InstanceId"] for i in instances["Instances"] if "InstanceId" in i + ] + + waiter = ec2_client.get_waiter("instance_exists") + await waiter.wait(InstanceIds=instance_ids) + instances = await ec2_client.describe_instances(InstanceIds=instance_ids) + assert "Reservations" in instances + assert instances["Reservations"] + assert "Instances" in instances["Reservations"][0] + assert 
len(instances["Reservations"][0]["Instances"]) == num + for instance in instances["Reservations"][0]["Instances"]: + assert "State" in instance + assert "Name" in instance["State"] + assert instance["State"]["Name"] == "running" + + if instance_state_name == "stopped": + await ec2_client.stop_instances(InstanceIds=instance_ids) + instances = await ec2_client.describe_instances(InstanceIds=instance_ids) + assert "Reservations" in instances + assert instances["Reservations"] + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == num + for instance in instances["Reservations"][0]["Instances"]: + assert "State" in instance + assert "Name" in instance["State"] + assert instance["State"]["Name"] == "stopped" + + return instance_ids + + return _do diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index 6e7a0d7c828..bad4215a65e 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -305,6 +305,18 @@ async def _(scale_up_params: _ScaleUpParams) -> list[distributed.Future]: return _ +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_scaling_with_no_tasks_does_nothing( minimal_configuration: None, app_settings: ApplicationSettings, @@ -330,6 +342,18 @@ async def test_cluster_scaling_with_no_tasks_does_nothing( @pytest.mark.acceptance_test( "Ensure this does not happen https://github.com/ITISFoundation/osparc-simcore/issues/6227" ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_scaling_with_disabled_ssm_does_not_block_autoscaling( minimal_configuration: None, disabled_ssm: None, @@ -353,6 +377,18 @@ async def test_cluster_scaling_with_disabled_ssm_does_not_block_autoscaling( ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( minimal_configuration: None, app_settings: ApplicationSettings, @@ -800,6 +836,18 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_compute_node_used_resources.assert_not_called() +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + 
["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed( minimal_configuration: None, app_settings: ApplicationSettings, @@ -839,6 +887,18 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed( assert "Unexpected error:" in error_messages[0] +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_resources( minimal_configuration: None, app_settings: ApplicationSettings, @@ -878,6 +938,18 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso assert "Unexpected error:" in error_messages[0] +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -948,6 +1020,18 @@ async def test_cluster_scaling_up_starts_multiple_instances( mock_rabbitmq_post_message.reset_mock() +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -1044,6 +1128,18 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_starts_max_instances_and_not_more( patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, @@ -1141,6 +1237,18 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + 
["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -1305,11 +1413,15 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( @pytest.mark.parametrize( - "with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, ) @pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options "with_drain_nodes_labelled", - ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + ["without_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], indirect=True, ) @pytest.mark.parametrize( diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index ccdb2461c04..afd3c01e4a3 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -24,6 +24,7 @@ from fastapi import FastAPI from models_library.docker import ( DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, + DockerGenericTag, DockerLabelKey, StandardSimcoreDockerLabels, ) @@ -43,9 +44,13 @@ assert_cluster_state, create_fake_association, ) -from pytest_simcore.helpers.aws_ec2 import assert_autoscaled_dynamic_ec2_instances +from pytest_simcore.helpers.aws_ec2 import ( + assert_autoscaled_dynamic_ec2_instances, + assert_autoscaled_dynamic_warm_pools_ec2_instances, +) from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict +from simcore_service_autoscaling.constants import BUFFER_MACHINE_TAG_KEY from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.models import AssociatedInstance, Cluster from simcore_service_autoscaling.modules.auto_scaling_core import ( @@ -68,7 +73,7 @@ _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, ) from types_aiobotocore_ec2.client import EC2Client -from types_aiobotocore_ec2.literals import InstanceTypeType +from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef @@ -286,6 +291,18 @@ async def _(scale_up_params: _ScaleUpParams) -> list[Service]: return _ +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test_cluster_scaling_with_no_services_does_nothing( minimal_configuration: None, app_settings: ApplicationSettings, @@ -304,10 +321,22 @@ async def test_cluster_scaling_with_no_services_does_nothing( ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def 
test_cluster_scaling_with_no_services_and_machine_buffer_starts_expected_machines( patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, - mock_machines_buffer: int, + with_instances_machines_hot_buffer: EnvVarsDict, app_settings: ApplicationSettings, initialized_app: FastAPI, aws_allowed_ec2_instance_type_names_env: list[str], @@ -321,17 +350,13 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect instance_type_filters: Sequence[FilterTypeDef], ): assert app_settings.AUTOSCALING_EC2_INSTANCES - assert ( - mock_machines_buffer - == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - ) await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() ) await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=mock_machines_buffer, + expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, expected_instance_type=cast( InstanceTypeType, next( @@ -346,7 +371,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect mock_rabbitmq_post_message, app_settings, initialized_app, - instances_pending=mock_machines_buffer, + instances_pending=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, ) mock_rabbitmq_post_message.reset_mock() # calling again should attach the new nodes to the reserve, but nothing should start @@ -356,7 +381,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=mock_machines_buffer, + expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, expected_instance_type=cast( InstanceTypeType, next( @@ -375,14 +400,15 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect mock_rabbitmq_post_message, app_settings, initialized_app, - nodes_total=mock_machines_buffer, - nodes_drained=mock_machines_buffer, - instances_running=mock_machines_buffer, + nodes_total=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, + nodes_drained=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, + instances_running=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, cluster_total_resources={ - "cpus": mock_machines_buffer + "cpus": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER * fake_node.description.resources.nano_cp_us / 1e9, - "ram": mock_machines_buffer * fake_node.description.resources.memory_bytes, + "ram": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + * fake_node.description.resources.memory_bytes, }, ) @@ -394,7 +420,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=mock_machines_buffer, + expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, expected_instance_type=cast( InstanceTypeType, next( @@ -407,6 +433,18 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test 
test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -990,6 +1028,18 @@ async def test_cluster_scaling_up_and_down( ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -1066,6 +1116,18 @@ async def test_cluster_scaling_up_and_down_against_aws( ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -1148,9 +1210,13 @@ async def test_cluster_scaling_up_starts_multiple_instances( @pytest.mark.parametrize( - "with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, ) @pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options "with_drain_nodes_labelled", ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], indirect=True, @@ -1445,6 +1511,18 @@ async def test_cluster_adapts_machines_on_the_fly( # noqa: PLR0915 assert instance["InstanceType"] == scale_up_params2.expected_instance_type +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) @pytest.mark.parametrize( "scale_up_params", [ @@ -1606,6 +1684,18 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( ) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test__find_terminateable_nodes_with_no_hosts( minimal_configuration: None, initialized_app: FastAPI, @@ -1626,6 +1716,18 @@ async def test__find_terminateable_nodes_with_no_hosts( assert await _find_terminateable_instances(initialized_app, active_cluster) == [] +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only 
the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test__try_scale_down_cluster_with_no_nodes( minimal_configuration: None, with_valid_time_before_termination: datetime.timedelta, @@ -1650,6 +1752,18 @@ async def test__try_scale_down_cluster_with_no_nodes( mock_remove_nodes.assert_not_called() +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test__activate_drained_nodes_with_no_tasks( minimal_configuration: None, with_valid_time_before_termination: datetime.timedelta, @@ -1683,6 +1797,18 @@ async def test__activate_drained_nodes_with_no_tasks( mock_docker_tag_node.assert_not_called() +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test__activate_drained_nodes_with_no_drained_nodes( minimal_configuration: None, with_valid_time_before_termination: datetime.timedelta, @@ -1724,6 +1850,18 @@ async def test__activate_drained_nodes_with_no_drained_nodes( mock_docker_tag_node.assert_not_called() +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) async def test__activate_drained_nodes_with_drained_node( minimal_configuration: None, with_valid_time_before_termination: datetime.timedelta, @@ -1790,3 +1928,136 @@ async def test__activate_drained_nodes_with_drained_node( }, available=True, ) + + +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +async def test_warm_buffers_are_started_to_replace_missing_hot_buffers( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + with_instances_machines_hot_buffer: EnvVarsDict, + ec2_client: EC2Client, + initialized_app: FastAPI, + app_settings: ApplicationSettings, + ec2_instance_custom_tags: dict[str, str], + buffer_count: int, + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None], + Awaitable[list[str]], + ], + spied_cluster_analysis: MockType, + instance_type_filters: Sequence[FilterTypeDef], + mock_find_node_with_name_returns_fake_node: 
mock.Mock, + mock_compute_node_used_resources: mock.Mock, + mock_docker_tag_node: mock.Mock, +): + # pre-requisites + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER > 0 + + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # have a few warm buffers ready with the same type as the hot buffer machines + buffer_machines = await create_buffer_machines( + buffer_count, + cast( + InstanceTypeType, + next( + iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) + ), + ), + "stopped", + None, + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=buffer_count, + expected_instance_type=cast( + InstanceTypeType, + next( + iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) + ), + ), + expected_instance_state="stopped", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, + instance_filters=None, + ) + + # let's autoscale, this should move the warm buffers to hot buffers + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + mock_docker_tag_node.assert_not_called() + # at analysis time, we had no machines running + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=0, + ) + assert not analyzed_cluster.active_nodes + assert analyzed_cluster.buffer_ec2s + assert len(analyzed_cluster.buffer_ec2s) == len(buffer_machines) + + # now we should have a warm buffer moved to the hot buffer + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, + expected_instance_type=cast( + InstanceTypeType, + next( + iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES) + ), + ), + expected_instance_state="running", + expected_additional_tag_keys=[ + *list(ec2_instance_custom_tags), + BUFFER_MACHINE_TAG_KEY, + ], + instance_filters=instance_type_filters, + expected_user_data=[], + ) + + # let's autoscale again, to check the cluster analysis and tag the nodes + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + mock_docker_tag_node.assert_called() + assert ( + mock_docker_tag_node.call_count + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + ) + # at analysis time, we had no machines running + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, + ) + assert not analyzed_cluster.active_nodes + assert len(analyzed_cluster.buffer_ec2s) == max( + 0, + buffer_count + - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER, + ), ( + "the warm buffers were not used as expected there should be" + f" {buffer_count - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER} remaining, " + f"found {len(analyzed_cluster.buffer_ec2s)}" + ) + assert ( + len(analyzed_cluster.pending_ec2s) + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + ) diff --git a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py index 24a552f342b..26375418417 100644 --- 
a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py +++ b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py @@ -16,9 +16,7 @@ import pytest import tenacity -from aws_library.ec2 import AWSTagKey, EC2InstanceBootSpecific -from common_library.json_serialization import json_dumps -from faker import Faker +from aws_library.ec2 import AWSTagKey from fastapi import FastAPI from fastapi.encoders import jsonable_encoder from models_library.docker import DockerGenericTag @@ -30,68 +28,15 @@ from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY -from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import ( DynamicAutoscaling, ) from simcore_service_autoscaling.modules.buffer_machines_pool_core import ( monitor_buffer_machines, ) -from simcore_service_autoscaling.utils.buffer_machines_pool_core import ( - get_deactivated_buffer_ec2_tags, -) from types_aiobotocore_ec2 import EC2Client from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -from types_aiobotocore_ec2.type_defs import FilterTypeDef, TagTypeDef - - -@pytest.fixture -def fake_pre_pull_images() -> list[DockerGenericTag]: - return TypeAdapter(list[DockerGenericTag]).validate_python( - [ - "nginx:latest", - "itisfoundation/my-very-nice-service:latest", - "simcore/services/dynamic/another-nice-one:2.4.5", - "asd", - ] - ) - - -@pytest.fixture -def ec2_instances_allowed_types_with_only_1_buffered( - faker: Faker, - fake_pre_pull_images: list[DockerGenericTag], - external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific], -) -> dict[InstanceTypeType, EC2InstanceBootSpecific]: - if not external_ec2_instances_allowed_types: - return { - "t2.micro": EC2InstanceBootSpecific( - ami_id=faker.pystr(), - pre_pull_images=fake_pre_pull_images, - buffer_count=faker.pyint(min_value=1, max_value=10), - ) - } - - allowed_ec2_types = external_ec2_instances_allowed_types - allowed_ec2_types_with_buffer_defined = dict( - filter( - lambda instance_type_and_settings: instance_type_and_settings[ - 1 - ].buffer_count - > 0, - allowed_ec2_types.items(), - ) - ) - assert ( - allowed_ec2_types_with_buffer_defined - ), "one type with buffer is needed for the tests!" - assert ( - len(allowed_ec2_types_with_buffer_defined) == 1 - ), "more than one type with buffer is disallowed in this test!" 
- return { - TypeAdapter(InstanceTypeType).validate_python(k): v - for k, v in allowed_ec2_types_with_buffer_defined.items() - } +from types_aiobotocore_ec2.type_defs import FilterTypeDef @pytest.fixture @@ -345,96 +290,6 @@ async def test_monitor_buffer_machines( ) -@pytest.fixture -async def create_buffer_machines( - ec2_client: EC2Client, - aws_ami_id: str, - app_settings: ApplicationSettings, - initialized_app: FastAPI, -) -> Callable[ - [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag]], - Awaitable[list[str]], -]: - async def _do( - num: int, - instance_type: InstanceTypeType, - instance_state_name: InstanceStateNameType, - pre_pull_images: list[DockerGenericTag], - ) -> list[str]: - assert app_settings.AUTOSCALING_EC2_INSTANCES - - assert instance_state_name in [ - "running", - "stopped", - ], "only 'running' and 'stopped' are supported for testing" - - resource_tags: list[TagTypeDef] = [ - {"Key": tag_key, "Value": tag_value} - for tag_key, tag_value in get_deactivated_buffer_ec2_tags( - initialized_app, DynamicAutoscaling() - ).items() - ] - if pre_pull_images is not None and instance_state_name == "stopped": - resource_tags.append( - { - "Key": PRE_PULLED_IMAGES_EC2_TAG_KEY, - "Value": f"{json_dumps(pre_pull_images)}", - } - ) - with log_context( - logging.INFO, f"creating {num} buffer machines of {instance_type}" - ): - instances = await ec2_client.run_instances( - ImageId=aws_ami_id, - MaxCount=num, - MinCount=num, - InstanceType=instance_type, - KeyName=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME, - SecurityGroupIds=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS, - SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID, - IamInstanceProfile={ - "Arn": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE - }, - TagSpecifications=[ - {"ResourceType": "instance", "Tags": resource_tags}, - {"ResourceType": "volume", "Tags": resource_tags}, - {"ResourceType": "network-interface", "Tags": resource_tags}, - ], - UserData="echo 'I am pytest'", - ) - instance_ids = [ - i["InstanceId"] for i in instances["Instances"] if "InstanceId" in i - ] - - waiter = ec2_client.get_waiter("instance_exists") - await waiter.wait(InstanceIds=instance_ids) - instances = await ec2_client.describe_instances(InstanceIds=instance_ids) - assert "Reservations" in instances - assert instances["Reservations"] - assert "Instances" in instances["Reservations"][0] - assert len(instances["Reservations"][0]["Instances"]) == num - for instance in instances["Reservations"][0]["Instances"]: - assert "State" in instance - assert "Name" in instance["State"] - assert instance["State"]["Name"] == "running" - - if instance_state_name == "stopped": - await ec2_client.stop_instances(InstanceIds=instance_ids) - instances = await ec2_client.describe_instances(InstanceIds=instance_ids) - assert "Reservations" in instances - assert instances["Reservations"] - assert "Instances" in instances["Reservations"][0] - assert len(instances["Reservations"][0]["Instances"]) == num - for instance in instances["Reservations"][0]["Instances"]: - assert "State" in instance - assert "Name" in instance["State"] - assert instance["State"]["Name"] == "stopped" - - return instance_ids - - return _do - - @dataclass class _BufferMachineParams: instance_state_name: InstanceStateNameType @@ -652,29 +507,6 @@ async def test_monitor_buffer_machines_terminates_unneeded_pool( ) -@pytest.fixture -def buffer_count( - 
ec2_instances_allowed_types_with_only_1_buffered: dict[ - InstanceTypeType, EC2InstanceBootSpecific - ], -) -> int: - def _by_buffer_count( - instance_type_and_settings: tuple[InstanceTypeType, EC2InstanceBootSpecific] - ) -> bool: - _, boot_specific = instance_type_and_settings - return boot_specific.buffer_count > 0 - - allowed_ec2_types = ec2_instances_allowed_types_with_only_1_buffered - allowed_ec2_types_with_buffer_defined = dict( - filter(_by_buffer_count, allowed_ec2_types.items()) - ) - assert allowed_ec2_types_with_buffer_defined, "you need one type with buffer" - assert ( - len(allowed_ec2_types_with_buffer_defined) == 1 - ), "more than one type with buffer is disallowed in this test!" - return next(iter(allowed_ec2_types_with_buffer_defined.values())).buffer_count - - @pytest.fixture def pre_pull_images( ec2_instances_allowed_types_with_only_1_buffered: dict[InstanceTypeType, Any] diff --git a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py b/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py index f576292ec6b..5a5a3240057 100644 --- a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py +++ b/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py @@ -323,7 +323,7 @@ def test_sort_empty_drained_nodes( def test_sort_drained_nodes( - mock_machines_buffer: int, + with_instances_machines_hot_buffer: EnvVarsDict, minimal_configuration: None, app_settings: ApplicationSettings, random_fake_available_instances: list[EC2InstanceType], @@ -332,7 +332,9 @@ def test_sort_drained_nodes( ): machine_buffer_type = get_machine_buffer_type(random_fake_available_instances) _NUM_DRAINED_NODES = 20 - _NUM_NODE_WITH_TYPE_BUFFER = 3 * mock_machines_buffer + _NUM_NODE_WITH_TYPE_BUFFER = ( + 3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + ) _NUM_NODES_TERMINATING = 13 fake_drained_nodes = [] for _ in range(_NUM_DRAINED_NODES): @@ -388,10 +390,6 @@ def test_sort_drained_nodes( app_settings, fake_drained_nodes, random_fake_available_instances ) assert app_settings.AUTOSCALING_EC2_INSTANCES - assert ( - mock_machines_buffer - == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - ) assert len(sorted_drained_nodes) == ( _NUM_DRAINED_NODES + _NUM_NODE_WITH_TYPE_BUFFER
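
Note on the core change in _start_warm_buffer_instances (services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py): besides starting warm buffers that already have tasks assigned, the function now tops up the hot-buffer pool whenever fewer than EC2_INSTANCES_MACHINES_BUFFER drained reserve nodes exist, using idle warm buffers of the hot-buffer instance type. Below is a minimal, self-contained sketch of that selection rule; WarmBuffer and the plain string instance types are illustrative stand-ins, not the service's Cluster/EC2 models.

from dataclasses import dataclass, field


@dataclass
class WarmBuffer:
    # illustrative stand-in for the service's buffer EC2 model
    instance_type: str
    assigned_tasks: list = field(default_factory=list)


def pick_warm_buffers_to_start(
    warm_buffers: list[WarmBuffer],
    hot_buffer_type: str,
    hot_buffer_target: int,  # stands in for EC2_INSTANCES_MACHINES_BUFFER
    current_hot_buffers: int,  # stands in for len(cluster.buffer_drained_nodes)
) -> list[WarmBuffer]:
    # 1. warm buffers with assigned tasks are always started (previous behaviour)
    to_start = [b for b in warm_buffers if b.assigned_tasks]
    # 2. new: if the hot-buffer pool is short, promote idle warm buffers of the
    #    hot-buffer type, capped at the number of missing hot buffers
    missing = hot_buffer_target - current_hot_buffers
    if missing > 0:
        idle_same_type = [
            b
            for b in warm_buffers
            if b.instance_type == hot_buffer_type and not b.assigned_tasks
        ]
        to_start += idle_same_type[:missing]
    return to_start


# 5 hot buffers wanted, 3 drained reserve nodes present -> promote at most 2 idle t2.micro
pool = [WarmBuffer("t2.micro"), WarmBuffer("t2.micro"), WarmBuffer("g4dn.xlarge")]
assert len(pick_warm_buffers_to_start(pool, "t2.micro", 5, 3)) == 2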
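
The new test test_warm_buffers_are_started_to_replace_missing_hot_buffers verifies the resulting pool bookkeeping: warm buffers of the hot-buffer type are promoted up to the hot-buffer target, and only the surplus stays stopped (the test asserts len(analyzed_cluster.buffer_ec2s) == max(0, buffer_count - EC2_INSTANCES_MACHINES_BUFFER)). A back-of-the-envelope helper for those expected counts; the function below is only an illustration, not part of the test code.

def expected_pool_state(buffer_count: int, hot_buffer_target: int) -> dict[str, int]:
    return {
        # stopped warm buffers promoted towards the hot-buffer pool
        "promoted_warm_buffers": min(buffer_count, hot_buffer_target),
        # stopped warm buffers left over after the promotion
        "remaining_warm_buffers": max(0, buffer_count - hot_buffer_target),
    }


assert expected_pool_state(buffer_count=8, hot_buffer_target=5) == {
    "promoted_warm_buffers": 5,
    "remaining_warm_buffers": 3,
}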
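
The patch also adds a no-op guard to tag_node in utils_docker.py: when the node already carries the requested labels and availability, the docker node update is skipped and the node is returned unchanged. A condensed sketch of that guard, using an illustrative NodeView stand-in rather than the docker-rest-api Node model.

from dataclasses import dataclass


@dataclass
class NodeView:
    # illustrative stand-in: labels plus an "active" flag modelling availability
    labels: dict[str, str]
    active: bool


def needs_retag(node: NodeView, desired_labels: dict[str, str], *, available: bool) -> bool:
    # False when the node is already in the desired state, i.e. nothing to do
    return not (node.labels == desired_labels and node.active == available)


node = NodeView(labels={"ready": "true"}, active=True)
assert needs_retag(node, {"ready": "true"}, available=True) is False
assert needs_retag(node, {"ready": "false"}, available=True) is True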