From 06abb3c24b5b613bc60a7b6b886ee4e0057bc2cd Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 16:12:05 +0400 Subject: [PATCH 1/8] Fix hang in AsyncQueue.join() --- janus/__init__.py | 94 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 23c4076..e4499f3 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -94,7 +94,8 @@ def __init__(self, maxsize: int = 0) -> None: self._sync_not_empty_waiting = 0 self._sync_not_full = threading.Condition(self._sync_mutex) self._sync_not_full_waiting = 0 - self._all_tasks_done = threading.Condition(self._sync_mutex) + self._sync_tasks_done = threading.Condition(self._sync_mutex) + self._sync_tasks_done_waiting = 0 self._async_mutex = asyncio.Lock() if sys.version_info[:3] == (3, 10, 0): @@ -104,8 +105,8 @@ def __init__(self, maxsize: int = 0) -> None: self._async_not_empty_waiting = 0 self._async_not_full = asyncio.Condition(self._async_mutex) self._async_not_full_waiting = 0 - self._finished = asyncio.Event() - self._finished.set() + self._async_tasks_done = asyncio.Condition(self._async_mutex) + self._async_tasks_done_waiting = 0 self._closing = False self._pending: deque[asyncio.Future[Any]] = deque() @@ -142,8 +143,13 @@ def close(self) -> None: self._closing = True for fut in self._pending: fut.cancel() - self._finished.set() # unblocks all async_q.join() - self._all_tasks_done.notify_all() # unblocks all sync_q.join() + if self._async_tasks_done_waiting: + if self._loop is not None: + self._call_soon_threadsafe( # unblocks all async_q.join() + self._make_async_tasks_done_notifier, self._loop + ) + if self._sync_tasks_done_waiting: + self._sync_tasks_done.notify_all() # unblocks all sync_q.join() async def wait_closed(self) -> None: # should be called from loop after close(). @@ -201,7 +207,6 @@ def _get(self) -> T: def _put_internal(self, item: T) -> None: self._put(item) self._unfinished_tasks += 1 - self._finished.clear() async def _async_not_empty_notifier(self) -> None: async with self._async_mutex: @@ -221,6 +226,15 @@ def _make_async_not_full_notifier(self, loop: asyncio.AbstractEventLoop) -> None task.add_done_callback(self._pending.remove) self._pending.append(task) + async def _async_tasks_done_notifier(self) -> None: + async with self._async_mutex: + self._async_tasks_done.notify_all() + + def _make_async_tasks_done_notifier(self, loop: asyncio.AbstractEventLoop) -> None: + task = loop.create_task(self._async_tasks_done_notifier()) + task.add_done_callback(self._pending.remove) + self._pending.append(task) + def _check_closing(self) -> None: if self._closing: raise RuntimeError("Operation on the closed queue is forbidden") @@ -259,13 +273,18 @@ def task_done(self) -> None: """ parent = self._parent parent._check_closing() - with parent._all_tasks_done: + with parent._sync_tasks_done: unfinished = parent._unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError("task_done() called too many times") - parent._all_tasks_done.notify_all() - parent._call_soon_threadsafe(parent._finished.set) + if parent._sync_tasks_done_waiting: + parent._sync_tasks_done.notify_all() + if parent._async_tasks_done_waiting: + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) parent._unfinished_tasks = unfinished def join(self) -> None: @@ -279,9 +298,13 @@ def join(self) -> None: """ parent = self._parent parent._check_closing() - with parent._all_tasks_done: + with parent._sync_tasks_done: while parent._unfinished_tasks: - parent._all_tasks_done.wait() + parent._sync_tasks_done_waiting += 1 + try: + parent._sync_tasks_done.wait() + finally: + parent._sync_tasks_done_waiting -= 1 parent._check_closing() def qsize(self) -> int: @@ -495,15 +518,15 @@ async def put(self, item: T) -> None: while do_wait: do_wait = parent._qsize() >= parent._maxsize if do_wait: + parent._async_not_full_waiting += 1 locked = False parent._sync_mutex.release() - parent._async_not_full_waiting += 1 try: await parent._async_not_full.wait() finally: + parent._sync_mutex.acquire() + locked = True parent._async_not_full_waiting -= 1 - parent._sync_mutex.acquire() - locked = True parent._put_internal(item) if parent._async_not_empty_waiting: @@ -552,15 +575,15 @@ async def get(self) -> T: do_wait = parent._qsize() == 0 if do_wait: + parent._async_not_empty_waiting += 1 locked = False parent._sync_mutex.release() - parent._async_not_empty_waiting += 1 try: await parent._async_not_empty.wait() finally: + parent._sync_mutex.acquire() + locked = True parent._async_not_empty_waiting -= 1 - parent._sync_mutex.acquire() - locked = True item = parent._get() if parent._async_not_full_waiting: @@ -608,13 +631,18 @@ def task_done(self) -> None: """ parent = self._parent parent._check_closing() - with parent._all_tasks_done: + with parent._sync_tasks_done: if parent._unfinished_tasks <= 0: raise ValueError("task_done() called too many times") parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: - parent._finished.set() - parent._all_tasks_done.notify_all() + if parent._async_tasks_done_waiting: + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) + if parent._sync_tasks_done_waiting: + parent._sync_tasks_done.notify_all() async def join(self) -> None: """Block until all items in the queue have been gotten and processed. @@ -625,12 +653,26 @@ async def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - while True: - with parent._sync_mutex: - parent._check_closing() - if parent._unfinished_tasks == 0: - break - await parent._finished.wait() + parent._check_closing() + async with parent._async_tasks_done: + parent._sync_mutex.acquire() + locked = True + parent._get_loop() # check the event loop + try: + while parent._unfinished_tasks: + parent._async_tasks_done_waiting += 1 + locked = False + parent._sync_mutex.release() + try: + await parent._async_tasks_done.wait() + finally: + parent._sync_mutex.acquire() + locked = True + parent._async_tasks_done_waiting -= 1 + parent._check_closing() + finally: + if locked: + parent._sync_mutex.release() class PriorityQueue(Queue[T]): From 6d1b8a74f74036f8ba057d4b5d084987f69f4f10 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 16:34:09 +0400 Subject: [PATCH 2/8] Simplify implementation --- janus/__init__.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index e4499f3..09e1c83 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -144,10 +144,9 @@ def close(self) -> None: for fut in self._pending: fut.cancel() if self._async_tasks_done_waiting: - if self._loop is not None: - self._call_soon_threadsafe( # unblocks all async_q.join() - self._make_async_tasks_done_notifier, self._loop - ) + self._call_soon_threadsafe( # unblocks all async_q.join() + self._make_async_tasks_done_notifier, self._loop + ) if self._sync_tasks_done_waiting: self._sync_tasks_done.notify_all() # unblocks all sync_q.join() @@ -281,10 +280,9 @@ def task_done(self) -> None: if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() if parent._async_tasks_done_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) parent._unfinished_tasks = unfinished def join(self) -> None: @@ -637,10 +635,9 @@ def task_done(self) -> None: parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: if parent._async_tasks_done_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() @@ -656,23 +653,19 @@ async def join(self) -> None: parent._check_closing() async with parent._async_tasks_done: parent._sync_mutex.acquire() - locked = True parent._get_loop() # check the event loop try: while parent._unfinished_tasks: parent._async_tasks_done_waiting += 1 - locked = False parent._sync_mutex.release() try: await parent._async_tasks_done.wait() finally: parent._sync_mutex.acquire() - locked = True parent._async_tasks_done_waiting -= 1 parent._check_closing() finally: - if locked: - parent._sync_mutex.release() + parent._sync_mutex.release() class PriorityQueue(Queue[T]): From 4c0c9645085759e29d268f6fccf7aafe7f25074e Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 16:48:40 +0400 Subject: [PATCH 3/8] Add assertions --- janus/__init__.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 09e1c83..d38140f 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -144,6 +144,7 @@ def close(self) -> None: for fut in self._pending: fut.cancel() if self._async_tasks_done_waiting: + assert self._loop is not None self._call_soon_threadsafe( # unblocks all async_q.join() self._make_async_tasks_done_notifier, self._loop ) @@ -280,6 +281,7 @@ def task_done(self) -> None: if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() if parent._async_tasks_done_waiting: + assert parent._loop is not None parent._call_soon_threadsafe( parent._make_async_tasks_done_notifier, parent._loop ) @@ -380,10 +382,10 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() if parent._async_not_empty_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_not_empty_notifier, parent._loop - ) + assert parent._loop is not None + parent._call_soon_threadsafe( + parent._make_async_not_empty_notifier, parent._loop + ) def get(self, block: bool = True, timeout: OptFloat = None) -> T: """Remove and return an item from the queue. @@ -426,10 +428,10 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: if parent._sync_not_full_waiting: parent._sync_not_full.notify() if parent._async_not_full_waiting: - if parent._loop is not None: - parent._call_soon_threadsafe( - parent._make_async_not_full_notifier, parent._loop - ) + assert parent._loop is not None + parent._call_soon_threadsafe( + parent._make_async_not_full_notifier, parent._loop + ) return item def put_nowait(self, item: T) -> None: @@ -635,6 +637,7 @@ def task_done(self) -> None: parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: if parent._async_tasks_done_waiting: + assert parent._loop is not None parent._call_soon_threadsafe( parent._make_async_tasks_done_notifier, parent._loop ) From 640957fd4a322cb60b392761892ee8ce074e355a Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 16:55:11 +0400 Subject: [PATCH 4/8] Revert checks --- janus/__init__.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index d38140f..3126bde 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -144,10 +144,10 @@ def close(self) -> None: for fut in self._pending: fut.cancel() if self._async_tasks_done_waiting: - assert self._loop is not None - self._call_soon_threadsafe( # unblocks all async_q.join() - self._make_async_tasks_done_notifier, self._loop - ) + if self._loop is not None: + self._call_soon_threadsafe( # unblocks all async_q.join() + self._make_async_tasks_done_notifier, self._loop + ) if self._sync_tasks_done_waiting: self._sync_tasks_done.notify_all() # unblocks all sync_q.join() @@ -281,10 +281,10 @@ def task_done(self) -> None: if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() if parent._async_tasks_done_waiting: - assert parent._loop is not None - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) parent._unfinished_tasks = unfinished def join(self) -> None: @@ -382,10 +382,10 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() if parent._async_not_empty_waiting: - assert parent._loop is not None - parent._call_soon_threadsafe( - parent._make_async_not_empty_notifier, parent._loop - ) + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_not_empty_notifier, parent._loop + ) def get(self, block: bool = True, timeout: OptFloat = None) -> T: """Remove and return an item from the queue. @@ -428,10 +428,10 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: if parent._sync_not_full_waiting: parent._sync_not_full.notify() if parent._async_not_full_waiting: - assert parent._loop is not None - parent._call_soon_threadsafe( - parent._make_async_not_full_notifier, parent._loop - ) + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_not_full_notifier, parent._loop + ) return item def put_nowait(self, item: T) -> None: @@ -637,10 +637,10 @@ def task_done(self) -> None: parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: if parent._async_tasks_done_waiting: - assert parent._loop is not None - parent._call_soon_threadsafe( - parent._make_async_tasks_done_notifier, parent._loop - ) + if parent._loop is not None: + parent._call_soon_threadsafe( + parent._make_async_tasks_done_notifier, parent._loop + ) if parent._sync_tasks_done_waiting: parent._sync_tasks_done.notify_all() From f245ffb1b0eb043c949e997ff0579be23e203e0b Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 17:03:57 +0400 Subject: [PATCH 5/8] Simplify implementation again --- janus/__init__.py | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 3126bde..ad3d204 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -509,23 +509,19 @@ async def put(self, item: T) -> None: parent = self._parent parent._check_closing() async with parent._async_not_full: - parent._sync_mutex.acquire() - locked = True - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop if parent._maxsize > 0: do_wait = True while do_wait: do_wait = parent._qsize() >= parent._maxsize if do_wait: parent._async_not_full_waiting += 1 - locked = False parent._sync_mutex.release() try: await parent._async_not_full.wait() finally: parent._sync_mutex.acquire() - locked = True parent._async_not_full_waiting -= 1 parent._put_internal(item) @@ -533,9 +529,6 @@ async def put(self, item: T) -> None: parent._async_not_empty.notify() if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() - finally: - if locked: - parent._sync_mutex.release() def put_nowait(self, item: T) -> None: """Put an item into the queue without blocking. @@ -566,23 +559,19 @@ async def get(self) -> T: parent = self._parent parent._check_closing() async with parent._async_not_empty: - parent._sync_mutex.acquire() - locked = True - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop do_wait = True while do_wait: do_wait = parent._qsize() == 0 if do_wait: parent._async_not_empty_waiting += 1 - locked = False parent._sync_mutex.release() try: await parent._async_not_empty.wait() finally: parent._sync_mutex.acquire() - locked = True parent._async_not_empty_waiting -= 1 item = parent._get() @@ -591,9 +580,6 @@ async def get(self) -> T: if parent._sync_not_full_waiting: parent._sync_not_full.notify() return item - finally: - if locked: - parent._sync_mutex.release() def get_nowait(self) -> T: """Remove and return an item from the queue. @@ -655,9 +641,8 @@ async def join(self) -> None: parent = self._parent parent._check_closing() async with parent._async_tasks_done: - parent._sync_mutex.acquire() - parent._get_loop() # check the event loop - try: + with parent._sync_mutex: + parent._get_loop() # check the event loop while parent._unfinished_tasks: parent._async_tasks_done_waiting += 1 parent._sync_mutex.release() @@ -667,8 +652,6 @@ async def join(self) -> None: parent._sync_mutex.acquire() parent._async_tasks_done_waiting -= 1 parent._check_closing() - finally: - parent._sync_mutex.release() class PriorityQueue(Queue[T]): From aca56088d6818aa1bea5a708703c1f5efa57b1a7 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 17:11:09 +0400 Subject: [PATCH 6/8] Update changes --- CHANGES.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 7f700ab..8b64649 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -16,6 +16,8 @@ Changes - Remove sync notifiers for a major speedup #714 +- Fix hang in AsyncQueue.join() #716 + 1.1.0 (2024-10-30) ------------------ From e8ae472a0b29bdd7559f2a340de73449701570e9 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 17:14:00 +0400 Subject: [PATCH 7/8] Reformat changes --- CHANGES.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 8b64649..7df434a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,7 +6,7 @@ Changes - Optimize internal implementation for a little speedup #699 -- Make not-full and not-empty notificatios faster #703 +- Make not-full and not-empty notifications faster #703 - Add ``.aclose()`` async method #709 @@ -16,7 +16,7 @@ Changes - Remove sync notifiers for a major speedup #714 -- Fix hang in AsyncQueue.join() #716 +- Fix hang in ``AsyncQueue.join()`` #716 1.1.0 (2024-10-30) ------------------ From e3a8eba2ce09ba86610318beb677ef4b13995cf3 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 11 Dec 2024 18:12:30 +0400 Subject: [PATCH 8/8] Refactor cycles --- janus/__init__.py | 47 +++++++++++++++++++---------------------------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index ad3d204..1f7db59 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -511,18 +511,14 @@ async def put(self, item: T) -> None: async with parent._async_not_full: with parent._sync_mutex: parent._get_loop() # check the event loop - if parent._maxsize > 0: - do_wait = True - while do_wait: - do_wait = parent._qsize() >= parent._maxsize - if do_wait: - parent._async_not_full_waiting += 1 - parent._sync_mutex.release() - try: - await parent._async_not_full.wait() - finally: - parent._sync_mutex.acquire() - parent._async_not_full_waiting -= 1 + while 0 < parent._maxsize <= parent._qsize(): + parent._async_not_full_waiting += 1 + parent._sync_mutex.release() + try: + await parent._async_not_full.wait() + finally: + parent._sync_mutex.acquire() + parent._async_not_full_waiting -= 1 parent._put_internal(item) if parent._async_not_empty_waiting: @@ -539,9 +535,8 @@ def put_nowait(self, item: T) -> None: parent._check_closing() with parent._sync_mutex: loop = parent._get_loop() - if parent._maxsize > 0: - if parent._qsize() >= parent._maxsize: - raise AsyncQueueFull + if 0 < parent._maxsize <= parent._qsize(): + raise AsyncQueueFull parent._put_internal(item) if parent._async_not_empty_waiting: @@ -561,18 +556,14 @@ async def get(self) -> T: async with parent._async_not_empty: with parent._sync_mutex: parent._get_loop() # check the event loop - do_wait = True - while do_wait: - do_wait = parent._qsize() == 0 - - if do_wait: - parent._async_not_empty_waiting += 1 - parent._sync_mutex.release() - try: - await parent._async_not_empty.wait() - finally: - parent._sync_mutex.acquire() - parent._async_not_empty_waiting -= 1 + while not parent._qsize(): + parent._async_not_empty_waiting += 1 + parent._sync_mutex.release() + try: + await parent._async_not_empty.wait() + finally: + parent._sync_mutex.acquire() + parent._async_not_empty_waiting -= 1 item = parent._get() if parent._async_not_full_waiting: @@ -589,7 +580,7 @@ def get_nowait(self) -> T: parent = self._parent parent._check_closing() with parent._sync_mutex: - if parent._qsize() == 0: + if not parent._qsize(): raise AsyncQueueEmpty loop = parent._get_loop()