From 57ac3fce968a6c5e82bd8bb28a19896789b0ba0d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 12 Dec 2024 17:01:53 +0100 Subject: [PATCH] Refactor async notifiers implementation (#717) The PR doesn't affect existing functionality but simplifies the code. --- janus/__init__.py | 90 +++++++++++++--------------------------------- setup.cfg | 4 --- tests/test_sync.py | 4 +-- 3 files changed, 27 insertions(+), 71 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 1f7db59..cd63ef5 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -25,14 +25,7 @@ ) -if sys.version_info >= (3, 10): - from typing import ParamSpec -else: - from typing_extensions import ParamSpec - - T = TypeVar("T") -P = ParamSpec("P") OptFloat = Optional[float] @@ -114,18 +107,6 @@ def __init__(self, maxsize: int = 0) -> None: self._sync_queue = _SyncQueueProxy(self) self._async_queue = _AsyncQueueProxy(self) - def _call_soon_threadsafe( - self, callback: Callable[P, None], *args: P.args, **kwargs: P.kwargs - ) -> None: - if self._loop is None: - # async API didn't accessed yet, nothing to notify - return - try: - self._loop.call_soon_threadsafe(callback, *args) - except RuntimeError: - # swallowing agreed in #2 - pass - def _get_loop(self) -> asyncio.AbstractEventLoop: # Warning! # The function should be called when self._sync_mutex is locked, @@ -144,10 +125,8 @@ def close(self) -> None: for fut in self._pending: fut.cancel() if self._async_tasks_done_waiting: - if self._loop is not None: - self._call_soon_threadsafe( # unblocks all async_q.join() - self._make_async_tasks_done_notifier, self._loop - ) + # unblocks all async_q.join() + self._notify_async(self._async_tasks_done.notify_all) if self._sync_tasks_done_waiting: self._sync_tasks_done.notify_all() # unblocks all sync_q.join() @@ -208,32 +187,26 @@ def _put_internal(self, item: T) -> None: self._put(item) self._unfinished_tasks += 1 - async def _async_not_empty_notifier(self) -> None: - async with self._async_mutex: - self._async_not_empty.notify() - - def _make_async_not_empty_notifier(self, loop: asyncio.AbstractEventLoop) -> None: - task = loop.create_task(self._async_not_empty_notifier()) - task.add_done_callback(self._pending.remove) - self._pending.append(task) - - async def _async_not_full_notifier(self) -> None: + async def _do_async_notifier(self, method: Callable[[], None]) -> None: async with self._async_mutex: - self._async_not_full.notify() + method() - def _make_async_not_full_notifier(self, loop: asyncio.AbstractEventLoop) -> None: - task = loop.create_task(self._async_not_full_notifier()) + def _setup_async_notifier( + self, loop: asyncio.AbstractEventLoop, method: Callable[[], None] + ) -> None: + task = loop.create_task(self._do_async_notifier(method)) task.add_done_callback(self._pending.remove) self._pending.append(task) - async def _async_tasks_done_notifier(self) -> None: - async with self._async_mutex: - self._async_tasks_done.notify_all() - - def _make_async_tasks_done_notifier(self, loop: asyncio.AbstractEventLoop) -> None: - task = loop.create_task(self._async_tasks_done_notifier()) - task.add_done_callback(self._pending.remove) - self._pending.append(task) + def _notify_async(self, method: Callable[[], None]) -> None: + # Warning! + # The function should be called when self._sync_mutex is locked, + # otherwise the code is not thread-safe + loop = self._loop + if loop is None or loop.is_closed(): + # async API is not available, nothing to notify + return + loop.call_soon_threadsafe(self._setup_async_notifier, loop, method) def _check_closing(self) -> None: if self._closing: @@ -281,10 +254,7 @@ def task_done(self) -> None: if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() if parent._async_tasks_done_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + parent._notify_async(parent._async_tasks_done.notify_all) parent._unfinished_tasks = unfinished def join(self) -> None: @@ -382,10 +352,7 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() if parent._async_not_empty_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_not_empty_notifier, parent._loop - ) + parent._notify_async(parent._async_not_empty.notify) def get(self, block: bool = True, timeout: OptFloat = None) -> T: """Remove and return an item from the queue. @@ -428,10 +395,7 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: if parent._sync_not_full_waiting: parent._sync_not_full.notify() if parent._async_not_full_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_not_full_notifier, parent._loop - ) + parent._notify_async(parent._async_not_full.notify) return item def put_nowait(self, item: T) -> None: @@ -534,13 +498,13 @@ def put_nowait(self, item: T) -> None: parent = self._parent parent._check_closing() with parent._sync_mutex: - loop = parent._get_loop() + parent._get_loop() if 0 < parent._maxsize <= parent._qsize(): raise AsyncQueueFull parent._put_internal(item) if parent._async_not_empty_waiting: - parent._make_async_not_empty_notifier(loop) + parent._notify_async(parent._async_not_empty.notify) if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() @@ -583,11 +547,10 @@ def get_nowait(self) -> T: if not parent._qsize(): raise AsyncQueueEmpty - loop = parent._get_loop() - + parent._get_loop() item = parent._get() if parent._async_not_full_waiting: - parent._make_async_not_full_notifier(loop) + parent._notify_async(parent._async_not_full.notify) if parent._sync_not_full_waiting: parent._sync_not_full.notify() return item @@ -614,10 +577,7 @@ def task_done(self) -> None: parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: if parent._async_tasks_done_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + parent._notify_async(parent._async_tasks_done.notify_all) if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() diff --git a/setup.cfg b/setup.cfg index 514a241..ceb42b6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -47,10 +47,6 @@ packages = find: zip_safe = True include_package_data = True -install_requires = - typing-extensions >= 4.0.0; python_version < "3.10" - - [flake8] exclude = .git,.env,__pycache__,.eggs max-line-length = 88 diff --git a/tests/test_sync.py b/tests/test_sync.py index f427c91..330e9f2 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -411,8 +411,8 @@ async def test_closed_loop_non_failing(self): _q = janus.Queue(QUEUE_SIZE) q = _q.sync_q # we are pacthing loop to follow setUp/tearDown agreement - with patch.object(loop, "call_soon_threadsafe") as func: - func.side_effect = RuntimeError() + with patch.object(loop, "is_closed") as func: + func.return_value = True task = loop.create_task(_q.async_q.get()) await asyncio.sleep(0) try: