Skip to content

Commit

Permalink
Cancel services request (#2302)
Browse files Browse the repository at this point in the history
* raise 503 if catalog timesout
* added requests decorator
* listing services is cancellable now
  • Loading branch information
sanderegg authored Apr 30, 2021
1 parent ce98441 commit 62c7003
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 23 deletions.
1 change: 0 additions & 1 deletion services/catalog/src/simcore_service_catalog/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import uvicorn
from fastapi import FastAPI

from simcore_service_catalog.core.application import init_app
from simcore_service_catalog.core.settings import AppSettings, BootModeEnum

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from pydantic import ValidationError, constr
from pydantic.types import PositiveInt
from starlette.requests import Request

from ...db.repositories.groups import GroupsRepository
from ...db.repositories.services import ServicesRepository
Expand All @@ -23,6 +24,7 @@
is_frontend_service,
list_frontend_services,
)
from ...utils.requests_decorators import cancellable_request
from ..dependencies.database import get_repository
from ..dependencies.director import DirectorApi, get_director_api

Expand All @@ -43,7 +45,9 @@


@router.get("", response_model=List[ServiceOut], **RESPONSE_MODEL_POLICY)
@cancellable_request
async def list_services(
request: Request, # pylint:disable=unused-argument
user_id: PositiveInt,
details: Optional[bool] = True,
director_client: DirectorApi = Depends(get_director_api),
Expand All @@ -52,7 +56,6 @@ async def list_services(
x_simcore_products_name: str = Header(...),
):
# FIXME: too many DB calls

# Access layer
user_groups = await groups_repository.list_user_groups(user_id)
if not user_groups:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import logging
from asyncio import CancelledError
from contextlib import suppress
from functools import wraps
from typing import Any, Callable, Coroutine

from fastapi import Request, Response

logger = logging.getLogger(__name__)

_DEFAULT_CHECK_INTERVAL_S: float = 0.5


async def _cancel_task_if_client_disconnected(
request: Request, task: asyncio.Task, interval: float = _DEFAULT_CHECK_INTERVAL_S
) -> bool:
with suppress(CancelledError):
while True:
if await request.is_disconnected():
logger.warning("client %s disconnected!", request.client)
task.cancel()
break
await asyncio.sleep(interval)


def cancellable_request(handler: Callable[[Any], Coroutine[Any, Any, Response]]):
"""this decorator periodically checks if the client disconnected and then will cancel the request and return a 499 code (a la nginx)."""

@wraps(handler)
async def decorator(request: Request, *args, **kwargs) -> Response:
handler_task = asyncio.get_event_loop().create_task(
handler(request, *args, **kwargs)
)
auto_cancel_task = asyncio.get_event_loop().create_task(
_cancel_task_if_client_disconnected(request, handler_task)
)
try:
return await handler_task
except CancelledError:
logger.warning(
"request %s was cancelled by client %s!", request.url, request.client
)
return Response("Oh No!", status_code=499)
finally:
auto_cancel_task.cancel()

return decorator
3 changes: 2 additions & 1 deletion services/catalog/tests/integration/test_none.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

# added as minimal integration tests


def test_mock():
assert True
3 changes: 1 addition & 2 deletions services/catalog/tests/unit/with_dbs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import pytest
import sqlalchemy as sa
from fastapi import FastAPI
from starlette.testclient import TestClient

from simcore_service_catalog.api.dependencies.director import get_director_api
from simcore_service_catalog.core.application import init_app
from starlette.testclient import TestClient

current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint:disable=redefined-outer-name
# pylint:disable=protected-access
# pylint:disable=not-an-iterable
# pylint:disable=no-value-for-parameter

import asyncio
from datetime import datetime
Expand Down
62 changes: 44 additions & 18 deletions services/web/server/src/simcore_service_webserver/catalog_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Requests to catalog service API
"""
import asyncio
import logging
import urllib.parse
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -72,7 +73,9 @@ async def make_request_and_envelope_response(

try:

async with session.request(method, url, headers=headers, data=data) as resp:
async with session.request(
method, url, headers=headers, data=data, timeout=ClientTimeout(total=0.01)
) as resp:
payload = await resp.json()

try:
Expand All @@ -86,7 +89,7 @@ async def make_request_and_envelope_response(

return web.json_response(resp_data, status=resp.status)

except (TimeoutError, ClientConnectionError, ClientResponseError) as err:
except (asyncio.TimeoutError, ClientConnectionError, ClientResponseError) as err:
logger.warning(
"Catalog service errors upon request %s %s: %s", method, url.relative(), err
)
Expand All @@ -107,14 +110,23 @@ async def get_services_for_user_in_product(
.with_query({"user_id": user_id, "details": f"{not only_key_versions}"})
)
session = get_client_session(app)
async with session.get(url, headers={X_PRODUCT_NAME_HEADER: product_name}) as resp:
if resp.status >= 400:
logger.warning(
"Error while retrieving services for user %s. Returning an empty list",
user_id,
)
return []
return await resp.json()
try:
async with session.get(
url,
headers={X_PRODUCT_NAME_HEADER: product_name},
) as resp:
if resp.status >= 400:
logger.warning(
"Error while retrieving services for user %s. Returning an empty list",
user_id,
)
return []
return await resp.json()
except asyncio.TimeoutError as err:
logger.warning("Catalog service connection timeout error")
raise web.HTTPServiceUnavailable(
reason="catalog is currently unavailable"
) from err


async def get_service(
Expand All @@ -137,9 +149,17 @@ async def get_service(
)
)
session = get_client_session(app)
async with session.get(url, headers={X_PRODUCT_NAME_HEADER: product_name}) as resp:
resp.raise_for_status() # FIXME: error handling for session and response exceptions
return await resp.json()
try:
async with session.get(
url, headers={X_PRODUCT_NAME_HEADER: product_name}
) as resp:
resp.raise_for_status() # FIXME: error handling for session and response exceptions
return await resp.json()
except asyncio.TimeoutError as err:
logger.warning("Catalog service connection timeout error")
raise web.HTTPServiceUnavailable(
reason="catalog is currently unavailable"
) from err


async def update_service(
Expand All @@ -163,8 +183,14 @@ async def update_service(
)
)
session = get_client_session(app)
async with session.patch(
url, headers={X_PRODUCT_NAME_HEADER: product_name}, json=update_data
) as resp:
resp.raise_for_status() # FIXME: error handling for session and response exceptions
return await resp.json()
try:
async with session.patch(
url, headers={X_PRODUCT_NAME_HEADER: product_name}, json=update_data
) as resp:
resp.raise_for_status() # FIXME: error handling for session and response exceptions
return await resp.json()
except asyncio.TimeoutError as err:
logger.warning("Catalog service connection timeout error")
raise web.HTTPServiceUnavailable(
reason="catalog is currently unavailable"
) from err

0 comments on commit 62c7003

Please sign in to comment.