🐛 Addresses blocking Thread/Process pool executors when shutting down #2397

Merged · 32 commits · Jun 24, 2021
Changes shown from 20 of 32 commits
7 changes: 4 additions & 3 deletions packages/service-library/src/servicelib/archiving_utils.py
@@ -1,10 +1,11 @@
 import asyncio
 import logging
 import zipfile
-from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path
 from typing import Iterator, List, Set

+from servicelib.pools import non_blocking_process_pool_executor
+
 MAX_UNARCHIVING_WORKER_COUNT = 2

 log = logging.getLogger(__name__)
@@ -81,7 +82,7 @@ async def unarchive_dir(
     all tree leaves, which might include files or empty folders
     """
     with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
-        with ProcessPoolExecutor(max_workers=max_workers) as pool:
+        with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
             loop = asyncio.get_event_loop()

             # running in a process pool is not ideal for concurrency issues
@@ -141,7 +142,7 @@ async def archive_dir(
     dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
 ) -> bool:
     """Returns True if successfully archived"""
-    with ProcessPoolExecutor(max_workers=1) as pool:
+    with non_blocking_process_pool_executor(max_workers=1) as pool:
         return await asyncio.get_event_loop().run_in_executor(
             pool,
             _serial_add_to_archive,
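For orientation, here is a rough, self-contained sketch of the call pattern `unarchive_dir` and `archive_dir` now follow; the worker function and numbers are invented, only `non_blocking_process_pool_executor` comes from this PR, and the snippet assumes the `servicelib` package is importable:

```python
import asyncio

from servicelib.pools import non_blocking_process_pool_executor


# Functions sent to a process pool must be picklable, i.e. defined
# at module level (this one is a hypothetical stand-in).
def _cpu_bound_work(n: int) -> int:
    return sum(i * i for i in range(n))


async def offload() -> int:
    loop = asyncio.get_event_loop()
    with non_blocking_process_pool_executor(max_workers=2) as pool:
        # The event loop keeps running while the child process computes.
        return await loop.run_in_executor(pool, _cpu_bound_work, 1_000_000)


if __name__ == "__main__":
    print(asyncio.run(offload()))
```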
15 changes: 15 additions & 0 deletions packages/service-library/src/servicelib/pools.py
@@ -0,0 +1,15 @@
+from concurrent.futures import ProcessPoolExecutor
+from contextlib import contextmanager
+
+
+@contextmanager
+def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
+    """
+    Avoids the default context manager behavior, which calls
+    shutdown with wait=True and blocks.
+    """
+    pool = ProcessPoolExecutor(**kwargs)  # pylint: disable=consider-using-with
+    try:
+        yield pool
+    finally:
+        pool.shutdown(wait=False)
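For background, a standalone snippet showing the behavior this helper avoids (the timing is illustrative; nothing here is PR code): `ProcessPoolExecutor.__exit__` calls `shutdown(wait=True)`, so leaving the `with` block stalls until every queued task has finished.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def _slow(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    start = time.monotonic()
    with ProcessPoolExecutor(max_workers=1) as pool:
        pool.submit(_slow, 2.0)  # queued, never awaited or cancelled
    # __exit__ ran shutdown(wait=True): we only get here after ~2 s.
    print(f"exited after {time.monotonic() - start:.1f}s")
```

With the wrapper above, the same exit path calls `shutdown(wait=False)`, so a coroutine cancelled during application shutdown returns immediately instead of hanging the event loop while workers drain.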
1 change: 1 addition & 0 deletions packages/service-library/src/servicelib/utils.py
@@ -7,6 +7,7 @@
 import asyncio
 import logging
 import os
+
 from pathlib import Path
 from typing import Any, Awaitable, Coroutine, List, Optional, Union

@@ -3,7 +3,6 @@
 import asyncio
 import logging
 import urllib.parse
-from concurrent.futures.process import ProcessPoolExecutor
 from typing import Any, Dict, List, Optional, Set, Tuple

 from fastapi import APIRouter, Depends, Header, HTTPException, status
@@ -27,6 +26,7 @@
     list_frontend_services,
 )
 from ...utils.requests_decorators import cancellable_request
+from ...utils.pools import non_blocking_process_pool_executor
 from ..dependencies.database import get_repository
 from ..dependencies.director import DirectorApi, get_director_api

@@ -155,7 +155,7 @@ async def list_services(
     # 3. then we compose the final service using as a base the registry service, overriding with the same
     #    service from the database, adding also the access rights and the owner as email address instead of gid
     # NOTE: this final step runs in a process pool so that it runs asynchronously and does not block in any way
-    with ProcessPoolExecutor(max_workers=2) as pool:
+    with non_blocking_process_pool_executor(max_workers=2) as pool:
         services_details = await asyncio.gather(
             *[
                 asyncio.get_event_loop().run_in_executor(
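The fan-out in `list_services` combines the pool with `asyncio.gather`. A minimal sketch of that combination (the item type and worker function are hypothetical; the catalog service actually uses its local `utils.pools` duplicate of this helper):

```python
import asyncio
from typing import List

from servicelib.pools import non_blocking_process_pool_executor


def _compose(item: int) -> int:
    # hypothetical stand-in for the per-service composition step
    return item * 2


async def fan_out(items: List[int]) -> List[int]:
    loop = asyncio.get_event_loop()
    with non_blocking_process_pool_executor(max_workers=2) as pool:
        # one executor job per item, awaited together
        return await asyncio.gather(
            *[loop.run_in_executor(pool, _compose, item) for item in items]
        )


if __name__ == "__main__":
    print(asyncio.run(fan_out([1, 2, 3])))
```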
17 changes: 17 additions & 0 deletions services/catalog/src/simcore_service_catalog/utils/pools.py
@@ -0,0 +1,17 @@
+from contextlib import contextmanager
+from concurrent.futures import ProcessPoolExecutor
+
+
+# because there is no shared fastapi library, this is a
+# duplicate of servicelib.pools.non_blocking_process_pool_executor
+@contextmanager
+def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
+    """
+    Avoids the default context manager behavior, which calls
+    shutdown with wait=True and blocks.
+    """
+    pool = ProcessPoolExecutor(**kwargs)  # pylint: disable=consider-using-with
+    try:
+        yield pool
+    finally:
+        pool.shutdown(wait=False)
@@ -1,7 +1,6 @@
""" Enables monitoring of some quantities needed for diagnostics

"""
import concurrent.futures
import logging
import time
from typing import Callable, Coroutine
@@ -32,14 +31,13 @@ def get_collector_registry(app: web.Application) -> CollectorRegistry:
 async def metrics_handler(request: web.Request):
     registry = get_collector_registry(request.app)

-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        # NOTE: Cannot use ProcessPoolExecutor because registry is not picklable
-        result = await request.loop.run_in_executor(
-            pool, prometheus_client.generate_latest, registry
-        )
-        response = web.Response(body=result)
-        response.content_type = CONTENT_TYPE_LATEST
-        return response
+    # NOTE: Cannot use ProcessPoolExecutor because registry is not picklable
+    result = await request.loop.run_in_executor(
+        None, prometheus_client.generate_latest, registry
+    )
+    response = web.Response(body=result)
+    response.content_type = CONTENT_TYPE_LATEST
+    return response


 def middleware_factory(app_name: str) -> Coroutine:
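A note on the replacement pattern used here: passing `None` as the executor makes the loop fall back to its shared default `ThreadPoolExecutor`, so no pool is created and torn down on every request. A minimal sketch, where the handler and payload are invented stand-ins for `prometheus_client.generate_latest`:

```python
import asyncio


def _render_metrics() -> bytes:
    # stand-in for prometheus_client.generate_latest(registry)
    return b"# HELP ...\n"


async def handler() -> bytes:
    loop = asyncio.get_event_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, _render_metrics)


if __name__ == "__main__":
    print(asyncio.run(handler()))
```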
@@ -2,14 +2,14 @@
 import logging
 from pathlib import Path
 from typing import Optional
-from concurrent.futures import ProcessPoolExecutor
 from collections import deque

 from aiohttp import web
 from aiopg.sa.result import ResultProxy, RowProxy
 from aiopg.sa.engine import SAConnection

 from simcore_postgres_database.models.scicrunch_resources import scicrunch_resources
+from servicelib.pools import non_blocking_process_pool_executor

 from ..exceptions import ExporterException
 from .formatter_v1 import FormatterV1
@@ -168,7 +168,7 @@ async def _write_sds_content(
     )

     # writing SDS structure with process pool to avoid blocking
-    with ProcessPoolExecutor(max_workers=1) as pool:
+    with non_blocking_process_pool_executor(max_workers=1) as pool:
         return await asyncio.get_event_loop().run_in_executor(
             pool,
             write_sds_directory_content,
@@ -5,7 +5,6 @@

"""
import asyncio
import concurrent.futures
import logging
import textwrap
import uuid as uuidlib
@@ -397,10 +396,9 @@ async def __load_projects(
                 continue
             try:

-                with concurrent.futures.ThreadPoolExecutor() as pool:
-                    await asyncio.get_event_loop().run_in_executor(
-                        pool, ProjectAtDB.from_orm, row
-                    )
+                await asyncio.get_event_loop().run_in_executor(
+                    None, ProjectAtDB.from_orm, row
+                )
             except ValidationError as exc:
                 log.warning(
                     "project in db with uuid [%s] failed validation, please check. error: %s",