Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/github_actions/actions/setup-no…
Browse files Browse the repository at this point in the history
…de-3.4.1
  • Loading branch information
pcrespov authored Jul 18, 2022
2 parents c64b3e2 + 09f85c1 commit 102a3e9
Show file tree
Hide file tree
Showing 29 changed files with 1,887 additions and 25 deletions.
1 change: 1 addition & 0 deletions packages/service-library/requirements/_fastapi.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
async-asgi-testclient # replacement for fastapi.testclient.TestClient [see b) below]
fastapi
fastapi_contrib[jaegertracing]
httpx
uvicorn

# NOTE: What test client to use for fastapi-based apps?
Expand Down
29 changes: 24 additions & 5 deletions packages/service-library/requirements/_fastapi.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
# pip-compile --output-file=requirements/_fastapi.txt --strip-extras requirements/_fastapi.in
#
anyio==3.6.1
# via starlette
# via
# httpcore
# starlette
async-asgi-testclient==1.4.11
# via -r requirements/_fastapi.in
certifi==2022.6.15
# via requests
# via
# httpcore
# httpx
# requests
charset-normalizer==2.0.12
# via requests
click==8.1.3
Expand All @@ -20,12 +25,21 @@ fastapi==0.76.0
# fastapi-contrib
fastapi-contrib==0.2.11
# via -r requirements/_fastapi.in
h11==0.13.0
# via uvicorn
h11==0.12.0
# via
# httpcore
# uvicorn
httpcore==0.15.0
# via httpx
httpx==0.23.0
# via
# -c requirements/./../../../requirements/constraints.txt
# -r requirements/_fastapi.in
idna==3.3
# via
# anyio
# requests
# rfc3986
jaeger-client==4.8.0
# via fastapi-contrib
multidict==6.0.2
Expand All @@ -41,10 +55,15 @@ pydantic==1.9.0
# fastapi
requests==2.28.0
# via async-asgi-testclient
rfc3986==1.5.0
# via httpx
six==1.16.0
# via thrift
sniffio==1.2.0
# via anyio
# via
# anyio
# httpcore
# httpx
starlette==0.18.0
# via fastapi
threadloop==1.0.2
Expand Down
1 change: 1 addition & 0 deletions packages/service-library/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
--constraint _fastapi.txt

# testing
asgi_lifespan
coverage
coveralls
faker
Expand Down
6 changes: 6 additions & 0 deletions packages/service-library/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ aiosignal==1.2.0
# via
# -c requirements/_aiohttp.txt
# aiohttp
asgi-lifespan==1.0.1
# via -r requirements/_test.in
astroid==2.11.5
# via pylint
async-timeout==4.0.2
Expand Down Expand Up @@ -203,6 +205,10 @@ six==1.16.0
# paramiko
# python-dateutil
# websocket-client
sniffio==1.2.0
# via
# -c requirements/_fastapi.txt
# asgi-lifespan
termcolor==1.1.0
# via pytest-sugar
texttable==1.6.4
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

from aiohttp.web import Application, middleware, Request, HTTPError
from servicelib.aiohttp.typing_extension import Handler, Middleware
import logging
import traceback

from aiohttp.web import Application, HTTPError, Request, middleware
from servicelib.aiohttp.typing_extension import Handler, Middleware

logger = logging.getLogger(__name__)

_SEP = "|||"
Expand Down
1 change: 0 additions & 1 deletion packages/service-library/src/servicelib/docker_compose.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import yaml


# Notes on below env var names:
# - SIMCORE_REGISTRY will be replaced by the url of the simcore docker registry
# deployed inside the platform
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from . import client, server

__all__: tuple[str, ...] = (
"client",
"server",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import Any, Optional

from fastapi import FastAPI, status
from httpx import AsyncClient
from pydantic import AnyHttpUrl, BaseModel, PositiveFloat, parse_obj_as

from ._errors import GenericClientError, TaskClientResultError
from ._models import TaskId, TaskResult, TaskStatus


class ClientConfiguration(BaseModel):
router_prefix: str
default_timeout: PositiveFloat


class Client:
"""
This is a client that aims to simplify the requests to get the
status, result and/or cancel of a long running task.
"""

def __init__(self, app: FastAPI, async_client: AsyncClient, base_url: AnyHttpUrl):
"""
`app`: used byt the `Client` to recover the `ClientConfiguration`
`async_client`: an AsyncClient instance used by `Client`
`base_url`: base endpoint where the server is listening on
"""
self.app = app
self._async_client = async_client
self._base_url = base_url

@property
def _client_configuration(self) -> ClientConfiguration:
return self.app.state.long_running_client_configuration

def _get_url(self, path: str) -> AnyHttpUrl:
return parse_obj_as(
AnyHttpUrl,
f"{self._base_url}{self._client_configuration.router_prefix}{path}",
)

async def get_task_status(
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
) -> TaskStatus:
timeout = timeout or self._client_configuration.default_timeout
result = await self._async_client.get(
self._get_url(f"/task/{task_id}"),
timeout=timeout,
)
if result.status_code != status.HTTP_200_OK:
raise GenericClientError(
action="getting_status",
task_id=task_id,
status=result.status_code,
body=result.text,
)

return TaskStatus.parse_obj(result.json())

async def get_task_result(
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
) -> Optional[Any]:
timeout = timeout or self._client_configuration.default_timeout
result = await self._async_client.get(
self._get_url(f"/task/{task_id}/result"),
timeout=timeout,
)
if result.status_code != status.HTTP_200_OK:
raise GenericClientError(
action="getting_result",
task_id=task_id,
status=result.status_code,
body=result.text,
)

task_result = TaskResult.parse_obj(result.json())
if task_result.error is not None:
raise TaskClientResultError(message=task_result.error)
return task_result.result

async def cancel_and_delete_task(
self, task_id: TaskId, *, timeout: Optional[PositiveFloat] = None
) -> bool:
timeout = timeout or self._client_configuration.default_timeout
result = await self._async_client.delete(
self._get_url(f"/task/{task_id}"),
timeout=timeout,
)
if result.status_code != status.HTTP_200_OK:
raise GenericClientError(
action="cancelling_and_removing_task",
task_id=task_id,
status=result.status_code,
body=result.text,
)
return result.json()


def setup(
app: FastAPI,
*,
router_prefix: str = "",
http_requests_timeout: PositiveFloat = 15,
):
"""
- `router_prefix` by default it is assumed the server mounts the APIs on
`/task/...` this will assume the APIs are as following
`{router_prefix}/task/...`
- `http_requests_timeout` short requests are used to interact with the
server API, a low timeout is sufficient
"""

async def on_startup() -> None:
app.state.long_running_client_configuration = ClientConfiguration(
router_prefix=router_prefix, default_timeout=http_requests_timeout
)

app.add_event_handler("startup", on_startup)
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import asyncio
from asyncio.log import logger
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable, Optional

from pydantic import PositiveFloat

from ._client import Client
from ._errors import TaskClientTimeoutError
from ._models import TaskId, TaskStatus


class _ProgressManager:
"""
Avoids sending duplicate progress updates.
When polling the status, the same progress messages can arrive in a row.
This allows the client to filter out the flood of messages when it subscribes
for progress updates.
"""

def __init__(
self, update_callback: Optional[Callable[[str, float], Awaitable[None]]]
) -> None:
self._callback = update_callback
self._last_message: Optional[str] = None
self._last_percent: Optional[float] = None

async def update(
self, *, message: Optional[str] = None, percent: Optional[float] = None
) -> None:
if self._callback is None:
return

has_changes = False

if message is not None and self._last_message != message:
self._last_message = message
has_changes = True
if percent is not None and self._last_percent != percent:
self._last_percent = percent
has_changes = True

if has_changes:
await self._callback(self._last_message, self._last_percent)


@asynccontextmanager
async def periodic_task_result(
client: Client,
task_id: TaskId,
*,
task_timeout: PositiveFloat,
progress_callback: Optional[Callable[[str, float], Awaitable[None]]] = None,
status_poll_interval: PositiveFloat = 5,
) -> AsyncIterator[Optional[Any]]:
"""
A convenient wrapper around the Client. Polls for results and returns them
once available.
Parameters:
- `client`: an instance of `long_running_tasks.client.Client`
- `task_timeout`: when this expires the task will be cancelled and
removed form the server
- `progress` optional: user defined awaitable with two positional arguments:
* first argument `message`, type `str`
* second argument `percent`, type `float` between [0.0, 1.0]
- `status_poll_interval` optional: when waiting for a task to finish,
how frequent should the server be queried
raises: `TaskClientResultError` if the task finished with an error instead of
the expected result
raises: `asyncio.TimeoutError` NOTE: the remote task will also be removed
"""
progress_manager = _ProgressManager(progress_callback)

async def _status_update() -> TaskStatus:
task_status = await client.get_task_status(task_id)
logger.info("Task status %s", task_status.json())
await progress_manager.update(
message=task_status.task_progress.message,
percent=task_status.task_progress.percent,
)
return task_status

async def _wait_task_completion() -> None:
task_status = await _status_update()
while not task_status.done:
await asyncio.sleep(status_poll_interval)
task_status = await _status_update()

try:
await asyncio.wait_for(_wait_task_completion(), timeout=task_timeout)

result: Optional[Any] = await client.get_task_result(task_id)
yield result
except asyncio.TimeoutError as e:
task_removed = await client.cancel_and_delete_task(task_id)
raise TaskClientTimeoutError(
task_id=task_id,
timeout=task_timeout,
exception=e,
task_removed=task_removed,
) from e
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from fastapi import Request

from ._task import TaskManager


def get_task_manager(request: Request) -> TaskManager:
return request.app.state.long_running_task_manager
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from fastapi import status
from fastapi.encoders import jsonable_encoder
from starlette.requests import Request
from starlette.responses import JSONResponse

from ._errors import BaseLongRunningError, TaskNotFoundError


async def base_long_running_error_handler(
_: Request, exception: BaseLongRunningError
) -> JSONResponse:
error_fields = dict(code=exception.code, message=f"{exception}")
status_code = (
status.HTTP_404_NOT_FOUND
if isinstance(exception, TaskNotFoundError)
else status.HTTP_400_BAD_REQUEST
)
return JSONResponse(content=jsonable_encoder(error_fields), status_code=status_code)
Loading

0 comments on commit 102a3e9

Please sign in to comment.