Skip to content

Commit

Permalink
🐛addresses blocking Thread/Process pool executors when shutting down (#…
Browse files Browse the repository at this point in the history
…2397)

* refactor, no destruction is necessary

* avoids destroying the pool and blocking

* using non blocking process pool

* replaces blocking processpool

* replaces blocking processpoolexecutor

* refactored

* using try finally to ensure cleanup

* refactored

* pylint

* fixing error

* fixing pool creation

* extracted pools module

* marked as duplicate

* refactored to use new shared code

* transformed into a generator

* fix error

* it can be waited here

* using system default threadpool

* removing unused

* removing unused

* added testing dependency

* cpython issue was causing a hang

* added regression tests for pools

* updated comments

* fix pylint

* cleanup comments

* downgraded

* fixed tests

* update codestyle

* updated non_blocking_process_pool_executor in catalog

* adds tests and more comments

* added fixme notes

Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Jun 24, 2021
1 parent 3ab3bf6 commit cee7479
Show file tree
Hide file tree
Showing 22 changed files with 172 additions and 54 deletions.
6 changes: 3 additions & 3 deletions packages/service-library/requirements/_base.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_base.txt requirements/_base.in
Expand Down Expand Up @@ -67,10 +67,10 @@ pydantic==1.8.2
# via
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_base.in
pyinstrument-cext==0.2.4
# via pyinstrument
pyinstrument==3.4.2
# via -r requirements/_base.in
pyinstrument-cext==0.2.4
# via pyinstrument
pyrsistent==0.17.3
# via jsonschema
pyyaml==5.4.1
Expand Down
26 changes: 13 additions & 13 deletions packages/service-library/requirements/_test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_test.txt requirements/_test.in
Expand Down Expand Up @@ -49,12 +49,12 @@ cryptography==3.4.7
# paramiko
distro==1.5.0
# via docker-compose
docker[ssh]==5.0.0
# via docker-compose
docker-compose==1.29.1
# via
# -c requirements/../../../requirements/constraints.txt
# pytest-docker
docker[ssh]==5.0.0
# via docker-compose
dockerpty==0.4.1
# via docker-compose
docopt==0.6.2
Expand All @@ -68,7 +68,7 @@ idna==2.10
# yarl
iniconfig==1.1.1
# via pytest
isort==5.9.0
isort==5.9.1
# via pylint
jsonschema==3.2.0
# via
Expand Down Expand Up @@ -107,6 +107,15 @@ pyrsistent==0.17.3
# via
# -c requirements/_base.txt
# jsonschema
pytest==6.2.4
# via
# -r requirements/_test.in
# pytest-aiohttp
# pytest-cov
# pytest-docker
# pytest-instafail
# pytest-mock
# pytest-sugar
pytest-aiohttp==0.3.0
# via -r requirements/_test.in
pytest-cov==2.12.1
Expand All @@ -121,15 +130,6 @@ pytest-runner==5.3.1
# via -r requirements/_test.in
pytest-sugar==0.9.4
# via -r requirements/_test.in
pytest==6.2.4
# via
# -r requirements/_test.in
# pytest-aiohttp
# pytest-cov
# pytest-docker
# pytest-instafail
# pytest-mock
# pytest-sugar
python-dotenv==0.18.0
# via docker-compose
pyyaml==5.4.1
Expand Down
9 changes: 6 additions & 3 deletions packages/service-library/requirements/_tools.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_tools.txt requirements/_tools.in
Expand All @@ -24,7 +24,7 @@ filelock==3.0.12
# via virtualenv
identify==2.2.10
# via pre-commit
isort==5.9.0
isort==5.9.1
# via
# -c requirements/_test.txt
# -r requirements/../../../requirements/devenv.txt
Expand All @@ -36,7 +36,7 @@ pathspec==0.8.1
# via black
pep517==0.10.0
# via pip-tools
pip-tools==6.1.0
pip-tools==6.2.0
# via -r requirements/../../../requirements/devenv.txt
pre-commit==2.13.0
# via -r requirements/../../../requirements/devenv.txt
Expand All @@ -61,6 +61,9 @@ toml==0.10.2
# pre-commit
virtualenv==20.4.7
# via pre-commit
wheel==0.36.2
# via pip-tools

# The following packages are considered to be unsafe in a requirements file:
# pip
# setuptools
7 changes: 4 additions & 3 deletions packages/service-library/src/servicelib/archiving_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import logging
import zipfile
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Iterator, List, Set

from servicelib.pools import non_blocking_process_pool_executor

MAX_UNARCHIVING_WORKER_COUNT = 2

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,7 +82,7 @@ async def unarchive_dir(
all tree leafs, which might include files or empty folders
"""
with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
with ProcessPoolExecutor(max_workers=max_workers) as pool:
with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
loop = asyncio.get_event_loop()

# running in process poll is not ideal for concurrency issues
Expand Down Expand Up @@ -141,7 +142,7 @@ async def archive_dir(
dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
) -> bool:
"""Returns True if successuly archived"""
with ProcessPoolExecutor(max_workers=1) as pool:
with non_blocking_process_pool_executor(max_workers=1) as pool:
return await asyncio.get_event_loop().run_in_executor(
pool,
_serial_add_to_archive,
Expand Down
37 changes: 37 additions & 0 deletions packages/service-library/src/servicelib/pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager

# Pools are only created on first use and are guaranteed to be the
# same for the entire lifetime of the application.
__shared_process_pool_executor = {}


def get_shared_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
    """Return an application-wide shared ProcessPoolExecutor.

    Sometimes a pool requires a specific configuration; the cache key
    helps to distinguish between differently-configured pools inside
    the same application. Pools are created lazily and never shut down
    (see ``non_blocking_process_pool_executor`` for the rationale).

    :param kwargs: forwarded verbatim to ``ProcessPoolExecutor``
        (e.g. ``max_workers``); values must be hashable
    """
    # NOTE: a tuple of the sorted kwargs items is a collision-free key,
    # unlike joining "k_v" fragments into one string where e.g.
    # {"a": "xb_y"} and {"a": "x", "b": "y"} would collide on "a_xb_y"
    key = tuple(sorted(kwargs.items()))

    if key not in __shared_process_pool_executor:
        # pylint: disable=consider-using-with
        __shared_process_pool_executor[key] = ProcessPoolExecutor(**kwargs)

    return __shared_process_pool_executor[key]


@contextmanager
def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
    """Yield a shared process pool without blocking on exit.

    Avoids the default ``ProcessPoolExecutor`` context-manager behavior,
    which calls ``shutdown(wait=True)`` and blocks.
    """
    # due to an issue in cpython https://bugs.python.org/issue34073
    # shutdown is bypassed entirely and a shared pool is used instead;
    # FIXME: when the issue is fixed, replace the call to
    # get_shared_process_pool_executor with a fresh instance and
    # re-enable `executor.shutdown(wait=False)` on exit
    yield get_shared_process_pool_executor(**kwargs)
1 change: 1 addition & 0 deletions packages/service-library/src/servicelib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import asyncio
import logging
import os

from pathlib import Path
from typing import Any, Awaitable, Coroutine, List, Optional, Union

Expand Down
8 changes: 6 additions & 2 deletions packages/service-library/tests/test_application_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import pytest
from aiohttp import web

from servicelib.application_keys import APP_CONFIG_KEY
from servicelib.application_setup import (
APP_SETUP_KEY,
Expand Down Expand Up @@ -37,7 +36,12 @@ def setup_zee(app: web.Application, arg1, kargs=55):


@app_module_setup(
"package.needs_foo", ModuleCategory.SYSTEM, depends=["package.foo",], logger=log
"package.needs_foo",
ModuleCategory.SYSTEM,
depends=[
"package.foo",
],
logger=log,
)
def setup_needs_foo(app: web.Application, arg1, kargs=55):
return True
Expand Down
4 changes: 2 additions & 2 deletions packages/service-library/tests/test_archiving_utils_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def print_tree(path: Path, level=0):

@pytest.fixture
def state_dir(tmp_path) -> Path:
""" Folder with some data, representing a given state"""
"""Folder with some data, representing a given state"""
base_dir = tmp_path / "original"
base_dir.mkdir()
(base_dir / "empty").mkdir()
Expand Down Expand Up @@ -55,7 +55,7 @@ def state_dir(tmp_path) -> Path:

@pytest.fixture
def new_state_dir(tmp_path) -> Path:
""" Folder AFTER updated with new data """
"""Folder AFTER updated with new data"""
base_dir = tmp_path / "updated"
base_dir.mkdir()
(base_dir / "d1").mkdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import time

import pytest

from servicelib import monitor_slow_callbacks
from servicelib.aiopg_utils import (
DatabaseError,
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_incidents_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import operator

import attr

from servicelib.incidents import BaseIncident, LimitedOrderedStack


Expand Down
12 changes: 10 additions & 2 deletions packages/service-library/tests/test_openapi_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import pytest
from aiohttp import web

from servicelib import openapi
from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
from servicelib.rest_middlewares import (
Expand Down Expand Up @@ -53,7 +52,16 @@ def client(loop, aiohttp_client, specs):


@pytest.mark.parametrize(
"path", ["/health", "/dict", "/envelope", "/list", "/attobj", "/string", "/number",]
"path",
[
"/health",
"/dict",
"/envelope",
"/list",
"/attobj",
"/string",
"/number",
],
)
async def test_validate_handlers(path, client, specs):
base = openapi.get_base_path(specs)
Expand Down
34 changes: 34 additions & 0 deletions packages/service-library/tests/test_pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from asyncio import BaseEventLoop
from concurrent.futures import ProcessPoolExecutor


from servicelib.pools import non_blocking_process_pool_executor


def return_int_one() -> int:
    """Trivial picklable payload submitted to the executors under test."""
    return 1


async def test_default_thread_pool_executor(loop: BaseEventLoop) -> None:
    # executor=None makes run_in_executor use the loop's default
    # ThreadPoolExecutor; baseline sanity check
    assert await loop.run_in_executor(None, return_int_one) == 1


async def test_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
    # baseline: a plain (blocking-on-shutdown) ProcessPoolExecutor
    # can still run the job; no context manager, so no shutdown here
    assert await loop.run_in_executor(ProcessPoolExecutor(), return_int_one) == 1


async def test_non_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
    # the non-blocking wrapper must run jobs just like a plain pool,
    # and exiting the context must not hang (regression for cpython
    # https://bugs.python.org/issue34073)
    with non_blocking_process_pool_executor() as executor:
        assert await loop.run_in_executor(executor, return_int_one) == 1


async def test_same_pool_instances() -> None:
    # identical kwargs -> the same shared pool instance is reused
    with non_blocking_process_pool_executor() as first, non_blocking_process_pool_executor() as second:
        assert first == second


async def test_different_pool_instances() -> None:
    # different kwargs -> distinct shared pools, one per configuration
    with non_blocking_process_pool_executor(
        max_workers=1
    ) as first, non_blocking_process_pool_executor() as second:
        assert first != second
1 change: 0 additions & 1 deletion packages/service-library/tests/test_rest_middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest
from aiohttp import web

from servicelib import openapi
from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
from servicelib.rest_middlewares import (
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_rest_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint:disable=redefined-outer-name

import pytest

from servicelib import openapi
from servicelib.rest_routing import (
create_routes_from_namespace,
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint:disable=redefined-outer-name

import pytest

from servicelib import openapi


Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/tutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import attr
from aiohttp import web

from servicelib.rest_codecs import DataEncoder


Expand Down
5 changes: 3 additions & 2 deletions packages/service-library/tests/with_postgres/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import pytest
import yaml

from servicelib.aiopg_utils import DataSourceName, is_postgres_responsive

current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent
Expand Down Expand Up @@ -37,6 +36,8 @@ def postgres_service(docker_services, docker_ip, docker_compose_file) -> DataSou

# Wait until service is responsive.
docker_services.wait_until_responsive(
check=lambda: is_postgres_responsive(dsn), timeout=30.0, pause=0.1,
check=lambda: is_postgres_responsive(dsn),
timeout=30.0,
pause=0.1,
)
return dsn
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import logging
import urllib.parse
from concurrent.futures.process import ProcessPoolExecutor
from typing import Any, Dict, List, Optional, Set, Tuple

from fastapi import APIRouter, Depends, Header, HTTPException, status
Expand All @@ -27,6 +26,7 @@
list_frontend_services,
)
from ...utils.requests_decorators import cancellable_request
from ...utils.pools import non_blocking_process_pool_executor
from ..dependencies.database import get_repository
from ..dependencies.director import DirectorApi, get_director_api

Expand Down Expand Up @@ -155,7 +155,7 @@ async def list_services(
# 3. then we compose the final service using as a base the registry service, overriding with the same
# service from the database, adding also the access rights and the owner as email address instead of gid
# NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way
with ProcessPoolExecutor(max_workers=2) as pool:
with non_blocking_process_pool_executor(max_workers=2) as pool:
services_details = await asyncio.gather(
*[
asyncio.get_event_loop().run_in_executor(
Expand Down
Loading

0 comments on commit cee7479

Please sign in to comment.