diff --git a/python_boilerplate/__init__.py b/python_boilerplate/__init__.py index c2a3ba2..d753055 100644 --- a/python_boilerplate/__init__.py +++ b/python_boilerplate/__init__.py @@ -1,4 +1,5 @@ import atexit +import sys from loguru import logger @@ -17,7 +18,12 @@ ) from python_boilerplate.message.email import __init__ from python_boilerplate.message.email import cleanup as email_cleanup -from python_boilerplate.repository.startup_log_repository import save +from python_boilerplate.repository.model.startup_log import StartupLog +from python_boilerplate.repository.startup_log_repository import ( + retain_startup_log, + save, +) +from python_boilerplate.repository.trace_log_repository import retain_trace_log # Configuration application_configure() @@ -27,7 +33,12 @@ # Initialization __init__() logger.info(f"Application [{get_module_name()}] started") -save() + +# Saving startup log +# Cannot save startup log in parallel, because the ThreadPoolExecutor won't be able to start another future +# once the MainThread will end very soon. +# executor.submit(save, StartupLog(command_line=" ".join(sys.argv))).add_done_callback(done_callback) +save(StartupLog(command_line=" ".join(sys.argv))) @atexit.register @@ -36,5 +47,9 @@ def finalize() -> None: Register `finalize()` function to be executed upon normal program termination. """ logger.warning("Cleaning up…") + # Retain logs, in case the size of the SQLite database will be increasing like crazy. + retain_startup_log() + retain_trace_log() + # Shutdown tread pool and other connections thread_pool_cleanup() email_cleanup() diff --git a/python_boilerplate/common/common_function.py b/python_boilerplate/common/common_function.py index 8a57b1e..3fbf774 100644 --- a/python_boilerplate/common/common_function.py +++ b/python_boilerplate/common/common_function.py @@ -1,5 +1,7 @@ import os +from datetime import date, datetime from pathlib import Path +from typing import Any from loguru import logger @@ -67,3 +69,17 @@ def get_login_user() -> str: f"Failed to get current login user, falling back to `default_user`. {ex}" ) return "default_user" + + +def json_serial(obj: Any) -> str | dict: + """ + JSON serializer for objects not serializable by default json code + https://stackoverflow.com/questions/11875770/how-to-overcome-datetime-datetime-not-json-serializable/36142844#36142844 + :param obj: on object needs to be serialized + :return: string or dictionary + """ + if isinstance(obj, (datetime, date)): + return obj.isoformat() + if isinstance(obj, set): + return str(obj) + return obj.__dict__ diff --git a/python_boilerplate/common/trace.py b/python_boilerplate/common/trace.py index b5125f5..dcb402c 100644 --- a/python_boilerplate/common/trace.py +++ b/python_boilerplate/common/trace.py @@ -2,21 +2,26 @@ import json from typing import Callable -from python_boilerplate.configuration.thread_pool_configuration import executor +from python_boilerplate.common.common_function import json_serial +from python_boilerplate.configuration.thread_pool_configuration import ( + done_callback, + executor, +) from python_boilerplate.repository.model.trace_log import TraceLog +from python_boilerplate.repository.trace_log_repository import save def trace(func: Callable): def wrapped(*arg, **kwarg): function_arguments = {"arg": arg, "kwarg": kwarg} - trace_log = TraceLog( - called_by=inspect.stack()[1][3], - function_qualified_name=func.__qualname__, - function_arguments=json.dumps( - function_arguments, default=lambda x: x.__dict__ + executor.submit( + save, + TraceLog( + called_by=inspect.stack()[1][3], + function_qualified_name=func.__qualname__, + function_arguments=json.dumps(function_arguments, default=json_serial), ), - ) - executor.submit(lambda x: x.save(), trace_log) + ).add_done_callback(done_callback) return func(*arg, **kwarg) return wrapped diff --git a/python_boilerplate/configuration/thread_pool_configuration.py b/python_boilerplate/configuration/thread_pool_configuration.py index 9585f2a..e72c890 100644 --- a/python_boilerplate/configuration/thread_pool_configuration.py +++ b/python_boilerplate/configuration/thread_pool_configuration.py @@ -1,16 +1,34 @@ -import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from loguru import logger from python_boilerplate.common.common_function import get_cpu_count +# Thread Concurrency Visualization https://www.jetbrains.com/help/pycharm/thread-concurrency-visualization.html + + max_workers = 2 * get_cpu_count() executor: ThreadPoolExecutor = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix="my_thread_pool" ) +def done_callback(future: Future): + """ + The default callback for Future once it's done. This function must be called after submitting a Future, to prevent + the ThreadPoolExecutor swallows exception in other threads. + + https://stackoverflow.com/questions/15359295/python-thread-pool-that-handles-exceptions + https://stackoverflow.com/a/66993893 + + :param future: an asynchronous computation + """ + logger.debug(f"The worker has done its job. Done: {future.done()}") + exception = future.exception() + if exception: + logger.exception(f"The worker has raised an exception. {exception}") + + def configure() -> None: """ Configure thread pool. @@ -32,19 +50,3 @@ def cleanup() -> None: logger.warning( f"Thread pool executor has been shutdown: {executor}, pending: {executor._work_queue.qsize()} jobs, threads: {len(executor._threads)}" ) - - -def simulate_get_html(times): - time.sleep(times) - logger.info(f"get page {times} finished") - return times - - -def simulate_multi_thread() -> None: - task1 = executor.submit(simulate_get_html, 3) - task2 = executor.submit(simulate_get_html, 2) - logger.info(task1.done()) - logger.info(task2.cancel()) - time.sleep(4) - logger.info(task1.done()) - logger.info(task1.result()) diff --git a/python_boilerplate/repository/startup_log_repository.py b/python_boilerplate/repository/startup_log_repository.py index 9886b19..ae5d7f7 100644 --- a/python_boilerplate/repository/startup_log_repository.py +++ b/python_boilerplate/repository/startup_log_repository.py @@ -1,21 +1,19 @@ -import sys - import arrow from loguru import logger +from python_boilerplate import get_module_name from python_boilerplate.common.trace import trace from python_boilerplate.repository.model.startup_log import StartupLog @trace -def save() -> StartupLog: +def save(startup_log: StartupLog) -> StartupLog: """ Save a new startup log. + :param: a StartupLog needs to save :return: a StartupLog object """ - startup_log: StartupLog = StartupLog(command_line=" ".join(sys.argv)) startup_log.save() - retain_startup_log() return startup_log @@ -26,7 +24,7 @@ def retain_startup_log() -> int: ) # the affected_rows is always 1 no matter how many rows were deleted logger.debug( - f"The program retains recent 7 days of startup log. " + f"The app [{get_module_name()}] retains recent 7 days of startup log. " f"Deleted {affected_rows} records that are before {a_week_ago}" ) return affected_rows diff --git a/python_boilerplate/repository/trace_log_repository.py b/python_boilerplate/repository/trace_log_repository.py new file mode 100644 index 0000000..39dea5d --- /dev/null +++ b/python_boilerplate/repository/trace_log_repository.py @@ -0,0 +1,23 @@ +import arrow +from loguru import logger + +from python_boilerplate import get_module_name +from python_boilerplate.repository.model.trace_log import TraceLog + + +def save(trace_log: TraceLog) -> TraceLog: + trace_log.save() + return trace_log + + +def retain_trace_log() -> int: + a_week_ago = arrow.now().shift(days=-7).format("YYYY-MM-DD") + affected_rows: int = ( + TraceLog.delete().where(TraceLog.start_time < a_week_ago).execute() + ) + # the affected_rows is always 1 no matter how many rows were deleted + logger.debug( + f"The app [{get_module_name()}] retains recent 7 days of trace log. " + f"Deleted {affected_rows} records that are before {a_week_ago}" + ) + return affected_rows diff --git a/tests/repository/test_startup_log_repository.py b/tests/repository/test_startup_log_repository.py index bca1af3..b035aee 100644 --- a/tests/repository/test_startup_log_repository.py +++ b/tests/repository/test_startup_log_repository.py @@ -1,3 +1,5 @@ +import sys + from loguru import logger from python_boilerplate.repository.model.startup_log import StartupLog @@ -5,9 +7,8 @@ def test_save() -> None: - saved_startup_log: StartupLog try: - saved_startup_log = save() + saved_startup_log = save(StartupLog(command_line=" ".join(sys.argv))) except Exception as ex: assert False, f"{save} raised an exception {ex}" assert saved_startup_log is not None