Skip to content

Commit

Permalink
feat($asyncio): create demo for asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed May 13, 2023
1 parent ba1d64e commit b6a426e
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 49 deletions.
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pytest-cov = "==4.0.0"
pytest-html = "==3.2.0"
# pytest xdist plugin for distributed testing and loop-on-failing modes. https://github.com/pytest-dev/pytest-xdist/
pytest-xdist = "==3.2.1"
# Pytest support for asyncio. https://github.com/pytest-dev/pytest-asyncio
pytest-asyncio = "==0.21.0"
# Call stack profiler for Python. Shows you why your code is slow! https://github.com/joerick/pyinstrument
pyinstrument = "==4.4.0"
# Pytest plugin for analyzing resource usage during test sessions. https://github.com/CFMTech/pytest-monitor
Expand Down
44 changes: 26 additions & 18 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 55 additions & 6 deletions python_boilerplate/common/asynchronization.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
import asyncio
import functools
import inspect
from asyncio import Task
from concurrent.futures import Future
from typing import Any, Callable, TypeVar

from loguru import logger

from python_boilerplate.configuration.thread_pool_configuration import (
done_callback,
executor,
)
from python_boilerplate.configuration.thread_pool_configuration import executor


def done_callback(future: Future[Any] | Task[Any]) -> None:
"""
The default callback for Future once it's done. This function must be called after submitting a Future, to prevent
the ThreadPoolExecutor swallows exception in other threads.
https://stackoverflow.com/questions/15359295/python-thread-pool-that-handles-exceptions
https://stackoverflow.com/a/66993893
:param future: an asynchronous computation
"""
logger.debug(
f"The worker has done its Future task. Done: {future.done()}, future task: {future}"
)
exception = future.exception()
if exception is not None:
logger.exception(
f"The worker has raised an exception while executing Future task: {future}, exception: {exception}"
)


R = TypeVar("R")


def async_function(func: Callable[..., R]) -> Callable[..., Future[R]]:
"""
An easy way to implement multi-tread feature with thread pool. The decorator to run function in thread pool.
An easy way to implement multi-tread feature with thread pool. The decorator to run sync function in thread pool.
The return value of decorated function will be `concurrent.futures._base.Future`.
Usage: decorate the function with `@async_function`. For example,
Expand All @@ -32,7 +52,7 @@ def async_function(func: Callable[..., R]) -> Callable[..., Future[R]]:
https://stackoverflow.com/questions/37203950/decorator-for-extra-thread
:param func: function to run in thread pool
:param func: a sync function to run in thread pool
"""

@functools.wraps(func)
Expand Down Expand Up @@ -66,3 +86,32 @@ def wrapped(*arg: Any, **kwarg: Any) -> Future[R]:
return submitted_future

return wrapped


def async_function_wrapper(func: Callable[..., Any]) -> Callable[..., Task[Any]]:
"""
The decorator to add `add_done_callback` for async function.
The return value of decorated function will be `concurrent.futures._base.Future`.
Usage: decorate the function with `@async_function`. For example,
* a function that accepts one integer argument:
>>> @async_function_wrapper
>>> async def an_async_function(a_int: int):
>>> pass
* a function without argument:
>>> @async_function_wrapper
>>> async def an_async_function():
>>> pass
:param func: a sync function to run in thread pool
"""

@functools.wraps(func)
def wrapped(*arg: Any, **kwarg: Any) -> Task[Any]:
future = asyncio.ensure_future(func(*arg, **kwarg))
future.add_done_callback(done_callback)
return future

return wrapped
44 changes: 43 additions & 1 deletion python_boilerplate/common/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import time
from datetime import timedelta
from typing import Any, Callable, TypeVar
from typing import Any, Callable, Coroutine, TypeVar

import psutil
from loguru import logger
Expand Down Expand Up @@ -48,6 +48,48 @@ def wrapped(*arg: Any, **kwarg: Any) -> Any:
return decorator


def async_elapsed_time(
level: str = "INFO",
) -> Callable[..., Callable[..., Coroutine[Any, Any, R]]]:
"""
The decorator to monitor the elapsed time of an async function.
Usage:
* decorate the function with `@async_elapsed_time()` to profile the function with INFO log
>>> @async_elapsed_time()
>>> async def some_function():
>>> pass
* decorate the function with `@async_elapsed_time("DEBUG")` to profile the function with DEBUG log
>>> @async_elapsed_time("DEBUG")
>>> async def some_function():
>>> pass
https://stackoverflow.com/questions/12295974/python-decorators-just-syntactic-sugar
:param level: logging level, default is "INFO". Available values: ["TRACE", "DEBUG", "INFO", "WARNING", "ERROR"]
"""

def decorator(
func: Callable[..., Coroutine[Any, Any, R]]
) -> Callable[..., Coroutine[Any, Any, R]]:
@functools.wraps(func)
async def wrapped(*arg: Any, **kwarg: Any) -> Any:
start_time = time.perf_counter()
return_value = await func(*arg, **kwarg)
elapsed = time.perf_counter() - start_time
logger.log(
level,
f"{func.__qualname__}() -> elapsed time: {timedelta(seconds=elapsed)}",
)
return return_value

return wrapped

return decorator


def get_memory_usage() -> int:
"""
Gets the usage of memory
Expand Down
6 changes: 2 additions & 4 deletions python_boilerplate/common/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

from loguru import logger

from python_boilerplate.common.asynchronization import done_callback
from python_boilerplate.common.common_function import json_serial
from python_boilerplate.configuration.thread_pool_configuration import (
done_callback,
executor,
)
from python_boilerplate.configuration.thread_pool_configuration import executor
from python_boilerplate.repository.model.trace_log import TraceLog
from python_boilerplate.repository.trace_log_repository import save

Expand Down
19 changes: 1 addition & 18 deletions python_boilerplate/configuration/thread_pool_configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any
from concurrent.futures.thread import ThreadPoolExecutor

from loguru import logger

Expand All @@ -15,22 +14,6 @@
)


def done_callback(future: Future[Any]) -> None:
"""
The default callback for Future once it's done. This function must be called after submitting a Future, to prevent
the ThreadPoolExecutor swallows exception in other threads.
https://stackoverflow.com/questions/15359295/python-thread-pool-that-handles-exceptions
https://stackoverflow.com/a/66993893
:param future: an asynchronous computation
"""
logger.debug(f"The worker has done its job. Done: {future.done()}")
exception = future.exception()
if exception:
logger.exception(f"The worker has raised an exception. {exception}")


def configure() -> None:
"""
Configure thread pool.
Expand Down
49 changes: 49 additions & 0 deletions python_boilerplate/demo/async_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import asyncio
from typing import Any

from loguru import logger

from python_boilerplate.__main__ import startup
from python_boilerplate.common.asynchronization import async_function_wrapper
from python_boilerplate.common.profiling import async_elapsed_time


@async_elapsed_time()
@async_function_wrapper
async def coroutine1() -> int:
logger.info("Coroutine 1 starting...")
await asyncio.sleep(1)
logger.info("Coroutine 1 finished!")
return 42


@async_elapsed_time()
@async_function_wrapper
async def coroutine2() -> str:
logger.info("Coroutine 2 starting...")
await asyncio.sleep(2)
logger.info("Coroutine 2 finished!")
return "Hello, world!"


@async_elapsed_time()
@async_function_wrapper
async def coroutine3() -> None:
logger.info("Coroutine 3 starting...")
await asyncio.sleep(1)
raise ValueError("Something went wrong")


async def main() -> None:
# Run both coroutines concurrently using asyncio.gather()
results: list[Any] = await asyncio.gather(
*[coroutine1(), coroutine2(), coroutine3()], return_exceptions=True
)
logger.info(f"Results: {results}")


if __name__ == "__main__":
startup()
# Run the event loop
asyncio.run(main())
logger.info(type(coroutine3))
2 changes: 1 addition & 1 deletion python_boilerplate/message/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

_smtp: smtplib.SMTP

if _email_muted:
if _email_muted or _email_muted is None:
logger.warning(_muted_message)
else:
# Login to the email server
Expand Down
2 changes: 1 addition & 1 deletion tests/configuration/test_thread_pool_configuration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pytest_mock import MockerFixture

from python_boilerplate.common.asynchronization import done_callback
from python_boilerplate.configuration.thread_pool_configuration import (
cleanup,
configure,
done_callback,
executor,
)

Expand Down
Loading

0 comments on commit b6a426e

Please sign in to comment.