Skip to content

Commit

Permalink
Refactor async notifiers implementation (#717)
Browse files Browse the repository at this point in the history
The PR doesn't affect existing functionality but simplifies the code.
  • Loading branch information
asvetlov authored Dec 12, 2024
1 parent a85cc40 commit 57ac3fc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 71 deletions.
90 changes: 25 additions & 65 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@
)


if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec


T = TypeVar("T")
P = ParamSpec("P")
OptFloat = Optional[float]


Expand Down Expand Up @@ -114,18 +107,6 @@ def __init__(self, maxsize: int = 0) -> None:
self._sync_queue = _SyncQueueProxy(self)
self._async_queue = _AsyncQueueProxy(self)

def _call_soon_threadsafe(
self, callback: Callable[P, None], *args: P.args, **kwargs: P.kwargs
) -> None:
if self._loop is None:
# async API didn't accessed yet, nothing to notify
return
try:
self._loop.call_soon_threadsafe(callback, *args)
except RuntimeError:
# swallowing agreed in #2
pass

def _get_loop(self) -> asyncio.AbstractEventLoop:
# Warning!
# The function should be called when self._sync_mutex is locked,
Expand All @@ -144,10 +125,8 @@ def close(self) -> None:
for fut in self._pending:
fut.cancel()
if self._async_tasks_done_waiting:
if self._loop is not None:
self._call_soon_threadsafe( # unblocks all async_q.join()
self._make_async_tasks_done_notifier, self._loop
)
# unblocks all async_q.join()
self._notify_async(self._async_tasks_done.notify_all)
if self._sync_tasks_done_waiting:
self._sync_tasks_done.notify_all() # unblocks all sync_q.join()

Expand Down Expand Up @@ -208,32 +187,26 @@ def _put_internal(self, item: T) -> None:
self._put(item)
self._unfinished_tasks += 1

async def _async_not_empty_notifier(self) -> None:
async with self._async_mutex:
self._async_not_empty.notify()

def _make_async_not_empty_notifier(self, loop: asyncio.AbstractEventLoop) -> None:
task = loop.create_task(self._async_not_empty_notifier())
task.add_done_callback(self._pending.remove)
self._pending.append(task)

async def _async_not_full_notifier(self) -> None:
async def _do_async_notifier(self, method: Callable[[], None]) -> None:
async with self._async_mutex:
self._async_not_full.notify()
method()

def _make_async_not_full_notifier(self, loop: asyncio.AbstractEventLoop) -> None:
task = loop.create_task(self._async_not_full_notifier())
def _setup_async_notifier(
self, loop: asyncio.AbstractEventLoop, method: Callable[[], None]
) -> None:
task = loop.create_task(self._do_async_notifier(method))
task.add_done_callback(self._pending.remove)
self._pending.append(task)

async def _async_tasks_done_notifier(self) -> None:
async with self._async_mutex:
self._async_tasks_done.notify_all()

def _make_async_tasks_done_notifier(self, loop: asyncio.AbstractEventLoop) -> None:
task = loop.create_task(self._async_tasks_done_notifier())
task.add_done_callback(self._pending.remove)
self._pending.append(task)
def _notify_async(self, method: Callable[[], None]) -> None:
# Warning!
# The function should be called when self._sync_mutex is locked,
# otherwise the code is not thread-safe
loop = self._loop
if loop is None or loop.is_closed():
# async API is not available, nothing to notify
return
loop.call_soon_threadsafe(self._setup_async_notifier, loop, method)

def _check_closing(self) -> None:
if self._closing:
Expand Down Expand Up @@ -281,10 +254,7 @@ def task_done(self) -> None:
if parent._sync_tasks_done_waiting:
parent._sync_tasks_done.notify_all()
if parent._async_tasks_done_waiting:
if parent._loop is not None:
parent._call_soon_threadsafe(
parent._make_async_tasks_done_notifier, parent._loop
)
parent._notify_async(parent._async_tasks_done.notify_all)
parent._unfinished_tasks = unfinished

def join(self) -> None:
Expand Down Expand Up @@ -382,10 +352,7 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
if parent._sync_not_empty_waiting:
parent._sync_not_empty.notify()
if parent._async_not_empty_waiting:
if parent._loop is not None:
parent._call_soon_threadsafe(
parent._make_async_not_empty_notifier, parent._loop
)
parent._notify_async(parent._async_not_empty.notify)

def get(self, block: bool = True, timeout: OptFloat = None) -> T:
"""Remove and return an item from the queue.
Expand Down Expand Up @@ -428,10 +395,7 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T:
if parent._sync_not_full_waiting:
parent._sync_not_full.notify()
if parent._async_not_full_waiting:
if parent._loop is not None:
parent._call_soon_threadsafe(
parent._make_async_not_full_notifier, parent._loop
)
parent._notify_async(parent._async_not_full.notify)
return item

def put_nowait(self, item: T) -> None:
Expand Down Expand Up @@ -534,13 +498,13 @@ def put_nowait(self, item: T) -> None:
parent = self._parent
parent._check_closing()
with parent._sync_mutex:
loop = parent._get_loop()
parent._get_loop()
if 0 < parent._maxsize <= parent._qsize():
raise AsyncQueueFull

parent._put_internal(item)
if parent._async_not_empty_waiting:
parent._make_async_not_empty_notifier(loop)
parent._notify_async(parent._async_not_empty.notify)
if parent._sync_not_empty_waiting:
parent._sync_not_empty.notify()

Expand Down Expand Up @@ -583,11 +547,10 @@ def get_nowait(self) -> T:
if not parent._qsize():
raise AsyncQueueEmpty

loop = parent._get_loop()

parent._get_loop()
item = parent._get()
if parent._async_not_full_waiting:
parent._make_async_not_full_notifier(loop)
parent._notify_async(parent._async_not_full.notify)
if parent._sync_not_full_waiting:
parent._sync_not_full.notify()
return item
Expand All @@ -614,10 +577,7 @@ def task_done(self) -> None:
parent._unfinished_tasks -= 1
if parent._unfinished_tasks == 0:
if parent._async_tasks_done_waiting:
if parent._loop is not None:
parent._call_soon_threadsafe(
parent._make_async_tasks_done_notifier, parent._loop
)
parent._notify_async(parent._async_tasks_done.notify_all)
if parent._sync_tasks_done_waiting:
parent._sync_tasks_done.notify_all()

Expand Down
4 changes: 0 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ packages = find:
zip_safe = True
include_package_data = True

install_requires =
typing-extensions >= 4.0.0; python_version < "3.10"


[flake8]
exclude = .git,.env,__pycache__,.eggs
max-line-length = 88
Expand Down
4 changes: 2 additions & 2 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ async def test_closed_loop_non_failing(self):
_q = janus.Queue(QUEUE_SIZE)
q = _q.sync_q
# we are pacthing loop to follow setUp/tearDown agreement
with patch.object(loop, "call_soon_threadsafe") as func:
func.side_effect = RuntimeError()
with patch.object(loop, "is_closed") as func:
func.return_value = True
task = loop.create_task(_q.async_q.get())
await asyncio.sleep(0)
try:
Expand Down

0 comments on commit 57ac3fc

Please sign in to comment.