diff --git a/packages/service-library/requirements/_base.txt b/packages/service-library/requirements/_base.txt
index 11c3313e2ad..1afea8b847d 100644
--- a/packages/service-library/requirements/_base.txt
+++ b/packages/service-library/requirements/_base.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile
+# This file is autogenerated by pip-compile with python 3.8
 # To update, run:
 #
 #    pip-compile --output-file=requirements/_base.txt requirements/_base.in
@@ -67,10 +67,10 @@ pydantic==1.8.2
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/_base.in
-pyinstrument-cext==0.2.4
-    # via pyinstrument
 pyinstrument==3.4.2
     # via -r requirements/_base.in
+pyinstrument-cext==0.2.4
+    # via pyinstrument
 pyrsistent==0.17.3
     # via jsonschema
 pyyaml==5.4.1
diff --git a/packages/service-library/requirements/_test.txt b/packages/service-library/requirements/_test.txt
index 7c7b662b9d0..7497b5df289 100644
--- a/packages/service-library/requirements/_test.txt
+++ b/packages/service-library/requirements/_test.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile
+# This file is autogenerated by pip-compile with python 3.8
 # To update, run:
 #
 #    pip-compile --output-file=requirements/_test.txt requirements/_test.in
@@ -49,12 +49,12 @@ cryptography==3.4.7
     #   paramiko
 distro==1.5.0
     # via docker-compose
+docker[ssh]==5.0.0
+    # via docker-compose
 docker-compose==1.29.1
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   pytest-docker
-docker[ssh]==5.0.0
-    # via docker-compose
 dockerpty==0.4.1
     # via docker-compose
 docopt==0.6.2
@@ -68,7 +68,7 @@ idna==2.10
     #   yarl
 iniconfig==1.1.1
     # via pytest
-isort==5.9.0
+isort==5.9.1
     # via pylint
 jsonschema==3.2.0
     # via
@@ -107,6 +107,15 @@ pyrsistent==0.17.3
     # via
     #   -c requirements/_base.txt
     #   jsonschema
+pytest==6.2.4
+    # via
+    #   -r requirements/_test.in
+    #   pytest-aiohttp
+    #   pytest-cov
+    #   pytest-docker
+    #   pytest-instafail
+    #   pytest-mock
+    #   pytest-sugar
 pytest-aiohttp==0.3.0
     # via -r requirements/_test.in
 pytest-cov==2.12.1
@@ -121,15 +130,6 @@ pytest-runner==5.3.1
     # via -r requirements/_test.in
 pytest-sugar==0.9.4
     # via -r requirements/_test.in
-pytest==6.2.4
-    # via
-    #   -r requirements/_test.in
-    #   pytest-aiohttp
-    #   pytest-cov
-    #   pytest-docker
-    #   pytest-instafail
-    #   pytest-mock
-    #   pytest-sugar
 python-dotenv==0.18.0
     # via docker-compose
 pyyaml==5.4.1
diff --git a/packages/service-library/requirements/_tools.txt b/packages/service-library/requirements/_tools.txt
index eb8bd3687d7..310b19d3d60 100644
--- a/packages/service-library/requirements/_tools.txt
+++ b/packages/service-library/requirements/_tools.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile
+# This file is autogenerated by pip-compile with python 3.8
 # To update, run:
 #
 #    pip-compile --output-file=requirements/_tools.txt requirements/_tools.in
@@ -24,7 +24,7 @@ filelock==3.0.12
     # via virtualenv
 identify==2.2.10
     # via pre-commit
-isort==5.9.0
+isort==5.9.1
     # via
     #   -c requirements/_test.txt
     #   -r requirements/../../../requirements/devenv.txt
@@ -36,7 +36,7 @@ pathspec==0.8.1
     # via black
 pep517==0.10.0
     # via pip-tools
-pip-tools==6.1.0
+pip-tools==6.2.0
     # via -r requirements/../../../requirements/devenv.txt
 pre-commit==2.13.0
     # via -r requirements/../../../requirements/devenv.txt
@@ -61,6 +61,9 @@ toml==0.10.2
     #   pre-commit
 virtualenv==20.4.7
     # via pre-commit
+wheel==0.36.2
+    # via pip-tools

 # The following packages are considered to be unsafe in a requirements file:
 # pip
+# setuptools
diff --git a/packages/service-library/src/servicelib/archiving_utils.py b/packages/service-library/src/servicelib/archiving_utils.py
index a8c2329e54d..7fc755e7fd6 100644
--- a/packages/service-library/src/servicelib/archiving_utils.py
+++ b/packages/service-library/src/servicelib/archiving_utils.py
@@ -1,10 +1,11 @@
 import asyncio
 import logging
 import zipfile
-from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path
 from typing import Iterator, List, Set

+from servicelib.pools import non_blocking_process_pool_executor
+
 MAX_UNARCHIVING_WORKER_COUNT = 2

 log = logging.getLogger(__name__)
@@ -81,7 +82,7 @@ async def unarchive_dir(
         all tree leafs, which might include files or empty folders
     """
     with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
-        with ProcessPoolExecutor(max_workers=max_workers) as pool:
+        with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
             loop = asyncio.get_event_loop()

             # running in process poll is not ideal for concurrency issues
@@ -141,7 +142,7 @@ async def archive_dir(
     dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
 ) -> bool:
     """Returns True if successuly archived"""
-    with ProcessPoolExecutor(max_workers=1) as pool:
+    with non_blocking_process_pool_executor(max_workers=1) as pool:
         return await asyncio.get_event_loop().run_in_executor(
             pool,
             _serial_add_to_archive,
diff --git a/packages/service-library/src/servicelib/pools.py b/packages/service-library/src/servicelib/pools.py
new file mode 100644
index 00000000000..11b03178f91
--- /dev/null
+++ b/packages/service-library/src/servicelib/pools.py
@@ -0,0 +1,37 @@
+from concurrent.futures import ProcessPoolExecutor
+from contextlib import contextmanager
+
+# only gets created on use and is guaranteed to be the same
+# for the entire lifetime of the application
+__shared_process_pool_executor = {}
+
+
+def get_shared_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
+    # sometimes a pool requires a specific configuration
+    # the key helps to distinguish between them in the same application
+    key = "".join(sorted(["_".join((k, str(v))) for k, v in kwargs.items()]))
+
+    if key not in __shared_process_pool_executor:
+        # pylint: disable=consider-using-with
+        __shared_process_pool_executor[key] = ProcessPoolExecutor(**kwargs)
+
+    return __shared_process_pool_executor[key]
+
+
+@contextmanager
+def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
+    """
+    Avoids the default context manager behavior, which calls
+    shutdown with wait=True and blocks.
+    """
+    executor = get_shared_process_pool_executor(**kwargs)
+    try:
+        yield executor
+    finally:
+        # due to an issue in cpython (https://bugs.python.org/issue34073)
+        # shutdown is bypassed and a shared pool is reused instead;
+        # replace get_shared_process_pool_executor with a fresh instance
+        # once the issue is fixed
+        # FIXME: uncomment the line below when the issue is fixed
+        # executor.shutdown(wait=False)
+        pass
diff --git a/packages/service-library/src/servicelib/utils.py b/packages/service-library/src/servicelib/utils.py
index 67c9dfa3b62..06599816792 100644
--- a/packages/service-library/src/servicelib/utils.py
+++ b/packages/service-library/src/servicelib/utils.py
@@ -7,6 +7,7 @@
 import asyncio
 import logging
 import os
+
 from pathlib import Path
 from typing import Any, Awaitable, Coroutine, List, Optional, Union

diff --git a/packages/service-library/tests/test_application_setup.py b/packages/service-library/tests/test_application_setup.py
index b6b360f27e1..a003074862d 100644
--- a/packages/service-library/tests/test_application_setup.py
+++ b/packages/service-library/tests/test_application_setup.py
@@ -7,7 +7,6 @@

 import pytest
 from aiohttp import web
-
 from servicelib.application_keys import APP_CONFIG_KEY
 from servicelib.application_setup import (
     APP_SETUP_KEY,
@@ -37,7 +36,12 @@ def setup_zee(app: web.Application, arg1, kargs=55):


 @app_module_setup(
-    "package.needs_foo", ModuleCategory.SYSTEM, depends=["package.foo",], logger=log
+    "package.needs_foo",
+    ModuleCategory.SYSTEM,
+    depends=[
+        "package.foo",
+    ],
+    logger=log,
 )
 def setup_needs_foo(app: web.Application, arg1, kargs=55):
     return True
diff --git a/packages/service-library/tests/test_archiving_utils_extra.py b/packages/service-library/tests/test_archiving_utils_extra.py
index c61abd24615..c29831056b1 100644
--- a/packages/service-library/tests/test_archiving_utils_extra.py
+++ b/packages/service-library/tests/test_archiving_utils_extra.py
@@ -23,7 +23,7 @@ def print_tree(path: Path, level=0):

 @pytest.fixture
 def state_dir(tmp_path) -> Path:
-    """ Folder with some data, representing a given state"""
+    """Folder with some data, representing a given state"""
     base_dir = tmp_path / "original"
     base_dir.mkdir()
     (base_dir / "empty").mkdir()
@@ -55,7 +55,7 @@ def state_dir(tmp_path) -> Path:

 @pytest.fixture
 def new_state_dir(tmp_path) -> Path:
-    """ Folder AFTER updated with new data """
+    """Folder AFTER updated with new data"""
     base_dir = tmp_path / "updated"
     base_dir.mkdir()
     (base_dir / "d1").mkdir()
diff --git a/packages/service-library/tests/test_incidents_monitoring.py b/packages/service-library/tests/test_incidents_monitoring.py
index 3cbc39004f8..f7986cb0972 100644
--- a/packages/service-library/tests/test_incidents_monitoring.py
+++ b/packages/service-library/tests/test_incidents_monitoring.py
@@ -6,7 +6,6 @@
 import time

 import pytest
-
 from servicelib import monitor_slow_callbacks
 from servicelib.aiopg_utils import (
     DatabaseError,
diff --git a/packages/service-library/tests/test_incidents_utils.py b/packages/service-library/tests/test_incidents_utils.py
index ef63d598ae0..e655d9bf435 100644
--- a/packages/service-library/tests/test_incidents_utils.py
+++ b/packages/service-library/tests/test_incidents_utils.py
@@ -5,7 +5,6 @@
 import operator

 import attr
-
 from servicelib.incidents import BaseIncident, LimitedOrderedStack


diff --git a/packages/service-library/tests/test_openapi_validation.py b/packages/service-library/tests/test_openapi_validation.py
index 706df603309..3be32c56c71 100644
--- a/packages/service-library/tests/test_openapi_validation.py
+++ b/packages/service-library/tests/test_openapi_validation.py
@@ -9,7 +9,6 @@

 import pytest
 from aiohttp import web
-
 from servicelib import openapi
 from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
 from servicelib.rest_middlewares import (
@@ -53,7 +52,16 @@ def client(loop, aiohttp_client, specs):


 @pytest.mark.parametrize(
-    "path", ["/health", "/dict", "/envelope", "/list", "/attobj", "/string", "/number",]
+    "path",
+    [
+        "/health",
+        "/dict",
+        "/envelope",
+        "/list",
+        "/attobj",
+        "/string",
+        "/number",
+    ],
 )
 async def test_validate_handlers(path, client, specs):
     base = openapi.get_base_path(specs)
diff --git a/packages/service-library/tests/test_pools.py b/packages/service-library/tests/test_pools.py
new file mode 100644
index 00000000000..f3e80917b94
--- /dev/null
+++ b/packages/service-library/tests/test_pools.py
@@ -0,0 +1,34 @@
+from asyncio import BaseEventLoop
+from concurrent.futures import ProcessPoolExecutor
+
+
+from servicelib.pools import non_blocking_process_pool_executor
+
+
+def return_int_one() -> int:
+    return 1
+
+
+async def test_default_thread_pool_executor(loop: BaseEventLoop) -> None:
+    assert await loop.run_in_executor(None, return_int_one) == 1
+
+
+async def test_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
+    assert await loop.run_in_executor(ProcessPoolExecutor(), return_int_one) == 1
+
+
+async def test_non_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
+    with non_blocking_process_pool_executor() as executor:
+        assert await loop.run_in_executor(executor, return_int_one) == 1
+
+
+async def test_same_pool_instances() -> None:
+    with non_blocking_process_pool_executor() as first, non_blocking_process_pool_executor() as second:
+        assert first == second
+
+
+async def test_different_pool_instances() -> None:
+    with non_blocking_process_pool_executor(
+        max_workers=1
+    ) as first, non_blocking_process_pool_executor() as second:
+        assert first != second
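Note: `return_int_one` is defined at module level because a `ProcessPoolExecutor` pickles the callable and its arguments before shipping them to worker processes; lambdas and closures cannot be pickled. A small editorial sketch of that constraint, not part of the patch:

```python
from concurrent.futures import ProcessPoolExecutor


def module_level() -> int:  # picklable: importable by worker processes
    return 1


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(pool.submit(module_level).result())  # 1
        # a lambda cannot be pickled; the error surfaces on result()
        try:
            pool.submit(lambda: 1).result()
        except Exception as err:  # typically a PicklingError
            print(f"lambda rejected: {err!r}")
```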
diff --git a/packages/service-library/tests/test_rest_middlewares.py b/packages/service-library/tests/test_rest_middlewares.py
index 4e33964e72c..c586e3e3567 100644
--- a/packages/service-library/tests/test_rest_middlewares.py
+++ b/packages/service-library/tests/test_rest_middlewares.py
@@ -4,7 +4,6 @@

 import pytest
 from aiohttp import web
-
 from servicelib import openapi
 from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
 from servicelib.rest_middlewares import (
diff --git a/packages/service-library/tests/test_rest_routing.py b/packages/service-library/tests/test_rest_routing.py
index 3d0b163a1de..4719611fe9b 100644
--- a/packages/service-library/tests/test_rest_routing.py
+++ b/packages/service-library/tests/test_rest_routing.py
@@ -3,7 +3,6 @@
 # pylint:disable=redefined-outer-name

 import pytest
-
 from servicelib import openapi
 from servicelib.rest_routing import (
     create_routes_from_namespace,
diff --git a/packages/service-library/tests/test_sandbox.py b/packages/service-library/tests/test_sandbox.py
index 734bb477e80..a544feae992 100644
--- a/packages/service-library/tests/test_sandbox.py
+++ b/packages/service-library/tests/test_sandbox.py
@@ -3,7 +3,6 @@
 # pylint:disable=redefined-outer-name

 import pytest
-
 from servicelib import openapi


diff --git a/packages/service-library/tests/tutils.py b/packages/service-library/tests/tutils.py
index bce55348625..29a2165e2d4 100644
--- a/packages/service-library/tests/tutils.py
+++ b/packages/service-library/tests/tutils.py
@@ -6,7 +6,6 @@

 import attr
 from aiohttp import web
-
 from servicelib.rest_codecs import DataEncoder

diff --git a/packages/service-library/tests/with_postgres/conftest.py b/packages/service-library/tests/with_postgres/conftest.py
index 86ef3646067..0b485ef9ccb 100644
--- a/packages/service-library/tests/with_postgres/conftest.py
+++ b/packages/service-library/tests/with_postgres/conftest.py
@@ -6,7 +6,6 @@

 import pytest
 import yaml
-
 from servicelib.aiopg_utils import DataSourceName, is_postgres_responsive

 current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent
@@ -37,6 +36,8 @@ def postgres_service(docker_services, docker_ip, docker_compose_file) -> DataSou
     # Wait until service is responsive.
     docker_services.wait_until_responsive(
-        check=lambda: is_postgres_responsive(dsn), timeout=30.0, pause=0.1,
+        check=lambda: is_postgres_responsive(dsn),
+        timeout=30.0,
+        pause=0.1,
     )

     return dsn
diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services.py b/services/catalog/src/simcore_service_catalog/api/routes/services.py
index c5f0c2ae48b..be7f60a0bc3 100644
--- a/services/catalog/src/simcore_service_catalog/api/routes/services.py
+++ b/services/catalog/src/simcore_service_catalog/api/routes/services.py
@@ -3,7 +3,6 @@
 import asyncio
 import logging
 import urllib.parse
-from concurrent.futures.process import ProcessPoolExecutor
 from typing import Any, Dict, List, Optional, Set, Tuple

 from fastapi import APIRouter, Depends, Header, HTTPException, status
@@ -27,6 +26,7 @@
     list_frontend_services,
 )

 from ...utils.requests_decorators import cancellable_request
+from ...utils.pools import non_blocking_process_pool_executor
 from ..dependencies.database import get_repository
 from ..dependencies.director import DirectorApi, get_director_api
@@ -155,7 +155,7 @@ async def list_services(
     # 3. then we compose the final service using as a base the registry service, overriding with the same
     # service from the database, adding also the access rights and the owner as email address instead of gid
     # NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way
-    with ProcessPoolExecutor(max_workers=2) as pool:
+    with non_blocking_process_pool_executor(max_workers=2) as pool:
         services_details = await asyncio.gather(
             *[
                 asyncio.get_event_loop().run_in_executor(
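Note: the fan-out pattern used in `list_services` above combines `asyncio.gather` with `run_in_executor` over the shared pool. The illustrative sketch below (not part of the patch) restates it with a hypothetical `compose_service` merge function standing in for the real composition step:

```python
import asyncio

from servicelib.pools import non_blocking_process_pool_executor  # the catalog keeps its own copy in utils.pools


def compose_service(registry_service: dict, db_service: dict) -> dict:
    # hypothetical CPU-bound merge of registry and database records
    return {**registry_service, **db_service}


async def list_services_sketch(pairs: list) -> list:
    loop = asyncio.get_event_loop()
    with non_blocking_process_pool_executor(max_workers=2) as pool:
        # one executor job per service; gather awaits them concurrently
        return await asyncio.gather(
            *[
                loop.run_in_executor(pool, compose_service, reg, db)
                for reg, db in pairs
            ]
        )
```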
+ """ + executor = get_shared_process_pool_executor(**kwargs) + try: + yield executor + finally: + # due to an issue in cpython https://bugs.python.org/issue34073 + # bypassing shutdown and using a shared pool + # remove call to get_shared_process_pool_executor and replace with + # a new instance when the issue is fixed + # FIXME: uncomment below line when the issue is fixed + # executor.shutdown(wait=False) + pass diff --git a/services/web/server/src/simcore_service_webserver/diagnostics_monitoring.py b/services/web/server/src/simcore_service_webserver/diagnostics_monitoring.py index 9670016b9e3..1c859a27024 100644 --- a/services/web/server/src/simcore_service_webserver/diagnostics_monitoring.py +++ b/services/web/server/src/simcore_service_webserver/diagnostics_monitoring.py @@ -1,7 +1,6 @@ """ Enables monitoring of some quantities needed for diagnostics """ -import concurrent.futures import logging import time from typing import Callable, Coroutine @@ -32,14 +31,13 @@ def get_collector_registry(app: web.Application) -> CollectorRegistry: async def metrics_handler(request: web.Request): registry = get_collector_registry(request.app) - with concurrent.futures.ThreadPoolExecutor() as pool: - # NOTE: Cannot use ProcessPoolExecutor because registry is not pickable - result = await request.loop.run_in_executor( - pool, prometheus_client.generate_latest, registry - ) - response = web.Response(body=result) - response.content_type = CONTENT_TYPE_LATEST - return response + # NOTE: Cannot use ProcessPoolExecutor because registry is not pickable + result = await request.loop.run_in_executor( + None, prometheus_client.generate_latest, registry + ) + response = web.Response(body=result) + response.content_type = CONTENT_TYPE_LATEST + return response def middleware_factory(app_name: str) -> Coroutine: diff --git a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v2.py b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v2.py index 3012fbdd3e2..ef4e51b9416 100644 --- a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v2.py +++ b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v2.py @@ -2,7 +2,6 @@ import logging from pathlib import Path from typing import Optional -from concurrent.futures import ProcessPoolExecutor from collections import deque from aiohttp import web @@ -10,6 +9,7 @@ from aiopg.sa.engine import SAConnection from simcore_postgres_database.models.scicrunch_resources import scicrunch_resources +from servicelib.pools import non_blocking_process_pool_executor from ..exceptions import ExporterException from .formatter_v1 import FormatterV1 @@ -168,7 +168,7 @@ async def _write_sds_content( ) # writing SDS structure with process pool to avoid blocking - with ProcessPoolExecutor(max_workers=1) as pool: + with non_blocking_process_pool_executor(max_workers=1) as pool: return await asyncio.get_event_loop().run_in_executor( pool, write_sds_directory_content, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_db.py b/services/web/server/src/simcore_service_webserver/projects/projects_db.py index e6dab821184..bc92187cdfd 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_db.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_db.py @@ -5,7 +5,6 @@ """ import asyncio -import concurrent.futures import logging import textwrap import uuid as uuidlib @@ -397,10 +396,9 @@ async def __load_projects( 
diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_db.py b/services/web/server/src/simcore_service_webserver/projects/projects_db.py
index e6dab821184..bc92187cdfd 100644
--- a/services/web/server/src/simcore_service_webserver/projects/projects_db.py
+++ b/services/web/server/src/simcore_service_webserver/projects/projects_db.py
@@ -5,7 +5,6 @@
 """
 import asyncio
-import concurrent.futures
 import logging
 import textwrap
 import uuid as uuidlib

@@ -397,10 +396,9 @@ async def __load_projects(

                 continue
             try:
-                with concurrent.futures.ThreadPoolExecutor() as pool:
-                    await asyncio.get_event_loop().run_in_executor(
-                        pool, ProjectAtDB.from_orm, row
-                    )
+                await asyncio.get_event_loop().run_in_executor(
+                    None, ProjectAtDB.from_orm, row
+                )
             except ValidationError as exc:
                 log.warning(
                     "project in db with uuid [%s] failed validation, please check. error: %s",
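Note: an editorial sketch of this last pattern, not part of the patch: pydantic validation is offloaded to the default executor and `ValidationError` is handled on the caller side. `ProjectSketch` is a hypothetical stand-in for `ProjectAtDB`:

```python
import asyncio

from pydantic import BaseModel, ValidationError


class ProjectSketch(BaseModel):  # hypothetical stand-in for ProjectAtDB
    uuid: str
    name: str


async def validate_row(row: dict) -> None:
    try:
        # parse_obj mirrors ProjectAtDB.from_orm here: validation runs in
        # the default thread pool so large models do not block the loop
        await asyncio.get_event_loop().run_in_executor(
            None, ProjectSketch.parse_obj, row
        )
    except ValidationError as exc:
        print(f"row failed validation: {exc}")


asyncio.run(validate_row({"uuid": "123", "name": "demo"}))
```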