Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛addresses blocking Thread/Process pool executors when shutting down #2397

Merged
merged 32 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/service-library/requirements/_base.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_base.txt requirements/_base.in
Expand Down Expand Up @@ -67,10 +67,10 @@ pydantic==1.8.2
# via
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_base.in
pyinstrument-cext==0.2.4
# via pyinstrument
pyinstrument==3.4.2
# via -r requirements/_base.in
pyinstrument-cext==0.2.4
# via pyinstrument
pyrsistent==0.17.3
# via jsonschema
pyyaml==5.4.1
Expand Down
26 changes: 13 additions & 13 deletions packages/service-library/requirements/_test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_test.txt requirements/_test.in
Expand Down Expand Up @@ -49,12 +49,12 @@ cryptography==3.4.7
# paramiko
distro==1.5.0
# via docker-compose
docker[ssh]==5.0.0
# via docker-compose
docker-compose==1.29.1
# via
# -c requirements/../../../requirements/constraints.txt
# pytest-docker
docker[ssh]==5.0.0
# via docker-compose
dockerpty==0.4.1
# via docker-compose
docopt==0.6.2
Expand All @@ -68,7 +68,7 @@ idna==2.10
# yarl
iniconfig==1.1.1
# via pytest
isort==5.9.0
isort==5.9.1
# via pylint
jsonschema==3.2.0
# via
Expand Down Expand Up @@ -107,6 +107,15 @@ pyrsistent==0.17.3
# via
# -c requirements/_base.txt
# jsonschema
pytest==6.2.4
# via
# -r requirements/_test.in
# pytest-aiohttp
# pytest-cov
# pytest-docker
# pytest-instafail
# pytest-mock
# pytest-sugar
pytest-aiohttp==0.3.0
# via -r requirements/_test.in
pytest-cov==2.12.1
Expand All @@ -121,15 +130,6 @@ pytest-runner==5.3.1
# via -r requirements/_test.in
pytest-sugar==0.9.4
# via -r requirements/_test.in
pytest==6.2.4
# via
# -r requirements/_test.in
# pytest-aiohttp
# pytest-cov
# pytest-docker
# pytest-instafail
# pytest-mock
# pytest-sugar
python-dotenv==0.18.0
# via docker-compose
pyyaml==5.4.1
Expand Down
9 changes: 6 additions & 3 deletions packages/service-library/requirements/_tools.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile --output-file=requirements/_tools.txt requirements/_tools.in
Expand All @@ -24,7 +24,7 @@ filelock==3.0.12
# via virtualenv
identify==2.2.10
# via pre-commit
isort==5.9.0
isort==5.9.1
# via
# -c requirements/_test.txt
# -r requirements/../../../requirements/devenv.txt
Expand All @@ -36,7 +36,7 @@ pathspec==0.8.1
# via black
pep517==0.10.0
# via pip-tools
pip-tools==6.1.0
pip-tools==6.2.0
# via -r requirements/../../../requirements/devenv.txt
pre-commit==2.13.0
# via -r requirements/../../../requirements/devenv.txt
Expand All @@ -61,6 +61,9 @@ toml==0.10.2
# pre-commit
virtualenv==20.4.7
# via pre-commit
wheel==0.36.2
# via pip-tools

# The following packages are considered to be unsafe in a requirements file:
# pip
# setuptools
7 changes: 4 additions & 3 deletions packages/service-library/src/servicelib/archiving_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import logging
import zipfile
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Iterator, List, Set

from servicelib.pools import non_blocking_process_pool_executor

MAX_UNARCHIVING_WORKER_COUNT = 2

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,7 +82,7 @@ async def unarchive_dir(
all tree leafs, which might include files or empty folders
"""
with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
with ProcessPoolExecutor(max_workers=max_workers) as pool:
with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
loop = asyncio.get_event_loop()

# running in process poll is not ideal for concurrency issues
Expand Down Expand Up @@ -141,7 +142,7 @@ async def archive_dir(
dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
) -> bool:
"""Returns True if successuly archived"""
with ProcessPoolExecutor(max_workers=1) as pool:
with non_blocking_process_pool_executor(max_workers=1) as pool:
return await asyncio.get_event_loop().run_in_executor(
pool,
_serial_add_to_archive,
Expand Down
36 changes: 36 additions & 0 deletions packages/service-library/src/servicelib/pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager

# only gets created on use and is guaranteed to be the s
# ame for the entire lifetime of the application
__shared_process_pool_executor = {}


def get_shared_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
# sometimes a pool requires a specific configuration
# the key helps to distinguish between them in the same application
key = "".join(sorted(["_".join((k, str(v))) for k, v in kwargs.items()]))

if key not in __shared_process_pool_executor:
# pylint: disable=consider-using-with
__shared_process_pool_executor[key] = ProcessPoolExecutor(**kwargs)

return __shared_process_pool_executor[key]


@contextmanager
def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
"""
Avoids default context manger behavior which calls
shutdown with wait=True an blocks.
"""
executor = get_shared_process_pool_executor(**kwargs)
try:
yield executor
finally:
# due to an issue in cpython https://bugs.python.org/issue34073
# bypassing shutdown and using a shared pool
# remove call to get_shared_process_pool_executor and replace with
# a new instance when the issue is fixed
# executor.shutdown(wait=False)
pass
pcrespov marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions packages/service-library/src/servicelib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import asyncio
import logging
import os

from pathlib import Path
from typing import Any, Awaitable, Coroutine, List, Optional, Union

Expand Down
8 changes: 6 additions & 2 deletions packages/service-library/tests/test_application_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import pytest
from aiohttp import web

from servicelib.application_keys import APP_CONFIG_KEY
from servicelib.application_setup import (
APP_SETUP_KEY,
Expand Down Expand Up @@ -37,7 +36,12 @@ def setup_zee(app: web.Application, arg1, kargs=55):


@app_module_setup(
"package.needs_foo", ModuleCategory.SYSTEM, depends=["package.foo",], logger=log
"package.needs_foo",
ModuleCategory.SYSTEM,
depends=[
"package.foo",
],
logger=log,
)
def setup_needs_foo(app: web.Application, arg1, kargs=55):
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def print_tree(path: Path, level=0):

@pytest.fixture
def state_dir(tmp_path) -> Path:
""" Folder with some data, representing a given state"""
"""Folder with some data, representing a given state"""
base_dir = tmp_path / "original"
base_dir.mkdir()
(base_dir / "empty").mkdir()
Expand Down Expand Up @@ -55,7 +55,7 @@ def state_dir(tmp_path) -> Path:

@pytest.fixture
def new_state_dir(tmp_path) -> Path:
""" Folder AFTER updated with new data """
"""Folder AFTER updated with new data"""
base_dir = tmp_path / "updated"
base_dir.mkdir()
(base_dir / "d1").mkdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import time

import pytest

from servicelib import monitor_slow_callbacks
from servicelib.aiopg_utils import (
DatabaseError,
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_incidents_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import operator

import attr

from servicelib.incidents import BaseIncident, LimitedOrderedStack


Expand Down
12 changes: 10 additions & 2 deletions packages/service-library/tests/test_openapi_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import pytest
from aiohttp import web

from servicelib import openapi
from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
from servicelib.rest_middlewares import (
Expand Down Expand Up @@ -53,7 +52,16 @@ def client(loop, aiohttp_client, specs):


@pytest.mark.parametrize(
"path", ["/health", "/dict", "/envelope", "/list", "/attobj", "/string", "/number",]
"path",
[
"/health",
"/dict",
"/envelope",
"/list",
"/attobj",
"/string",
"/number",
],
)
async def test_validate_handlers(path, client, specs):
base = openapi.get_base_path(specs)
Expand Down
34 changes: 34 additions & 0 deletions packages/service-library/tests/test_pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from asyncio import BaseEventLoop
from concurrent.futures import ProcessPoolExecutor


from servicelib.pools import non_blocking_process_pool_executor

GitHK marked this conversation as resolved.
Show resolved Hide resolved

def return_int_one() -> int:
return 1


async def test_default_thread_pool_executor(loop: BaseEventLoop) -> None:
assert await loop.run_in_executor(None, return_int_one) == 1


async def test_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
assert await loop.run_in_executor(ProcessPoolExecutor(), return_int_one) == 1


async def test_non_blocking_process_pool_executor(loop: BaseEventLoop) -> None:
with non_blocking_process_pool_executor() as executor:
assert await loop.run_in_executor(executor, return_int_one) == 1


async def test_same_pool_instances() -> None:
with non_blocking_process_pool_executor() as first, non_blocking_process_pool_executor() as second:
assert first == second


async def test_different_pool_instances() -> None:
with non_blocking_process_pool_executor(
max_workers=1
) as first, non_blocking_process_pool_executor() as second:
assert first != second
1 change: 0 additions & 1 deletion packages/service-library/tests/test_rest_middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest
from aiohttp import web

from servicelib import openapi
from servicelib.application_keys import APP_OPENAPI_SPECS_KEY
from servicelib.rest_middlewares import (
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_rest_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint:disable=redefined-outer-name

import pytest

from servicelib import openapi
from servicelib.rest_routing import (
create_routes_from_namespace,
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/test_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint:disable=redefined-outer-name

import pytest

from servicelib import openapi


Expand Down
1 change: 0 additions & 1 deletion packages/service-library/tests/tutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import attr
from aiohttp import web

from servicelib.rest_codecs import DataEncoder


Expand Down
5 changes: 3 additions & 2 deletions packages/service-library/tests/with_postgres/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import pytest
import yaml

from servicelib.aiopg_utils import DataSourceName, is_postgres_responsive

current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent
Expand Down Expand Up @@ -37,6 +36,8 @@ def postgres_service(docker_services, docker_ip, docker_compose_file) -> DataSou

# Wait until service is responsive.
docker_services.wait_until_responsive(
check=lambda: is_postgres_responsive(dsn), timeout=30.0, pause=0.1,
check=lambda: is_postgres_responsive(dsn),
timeout=30.0,
pause=0.1,
)
return dsn
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import logging
import urllib.parse
from concurrent.futures.process import ProcessPoolExecutor
from typing import Any, Dict, List, Optional, Set, Tuple

from fastapi import APIRouter, Depends, Header, HTTPException, status
Expand All @@ -27,6 +26,7 @@
list_frontend_services,
)
from ...utils.requests_decorators import cancellable_request
from ...utils.pools import non_blocking_process_pool_executor
from ..dependencies.database import get_repository
from ..dependencies.director import DirectorApi, get_director_api

Expand Down Expand Up @@ -155,7 +155,7 @@ async def list_services(
# 3. then we compose the final service using as a base the registry service, overriding with the same
# service from the database, adding also the access rights and the owner as email address instead of gid
# NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way
with ProcessPoolExecutor(max_workers=2) as pool:
with non_blocking_process_pool_executor(max_workers=2) as pool:
pcrespov marked this conversation as resolved.
Show resolved Hide resolved
services_details = await asyncio.gather(
*[
asyncio.get_event_loop().run_in_executor(
Expand Down
Loading