Skip to content

Commit

Permalink
Make not-full and not-empty notificatios faster (#703)
Browse files Browse the repository at this point in the history
The speed-up removes creation of 4 internal functions per each `get` or
`put`, both sync and async calls are affected.
  • Loading branch information
asvetlov authored Dec 6, 2024
1 parent 3a81ce9 commit bbc350f
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,51 +176,53 @@ def _put_internal(self, item: T) -> None:
self._unfinished_tasks += 1
self._finished.clear()

def _sync_not_empty_notifier(self) -> None:
with self._sync_mutex:
self._sync_not_empty.notify()

def _notify_sync_not_empty(self) -> None:
def f() -> None:
with self._sync_mutex:
self._sync_not_empty.notify()
fut = self._loop.run_in_executor(None, self._sync_not_empty_notifier)
fut.add_done_callback(self._pending.discard)
self._pending.add(fut)

self._loop.run_in_executor(None, f)
def _sync_not_full_notifier(self) -> None:
with self._sync_mutex:
self._sync_not_full.notify()

def _notify_sync_not_full(self) -> None:
def f() -> None:
with self._sync_mutex:
self._sync_not_full.notify()

fut = asyncio.ensure_future(self._loop.run_in_executor(None, f))
fut = self._loop.run_in_executor(None, self._sync_not_full_notifier)
fut.add_done_callback(self._pending.discard)
self._pending.add(fut)

def _notify_async_not_empty(self, *, threadsafe: bool) -> None:
async def f() -> None:
async with self._async_mutex:
self._async_not_empty.notify()
async def _async_not_empty_notifier(self) -> None:
async with self._async_mutex:
self._async_not_empty.notify()

def task_maker() -> None:
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)
def _make_async_not_empty_notifier(self) -> None:
task = self._loop.create_task(self._async_not_empty_notifier())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

def _notify_async_not_empty(self, *, threadsafe: bool) -> None:
if threadsafe:
self._call_soon_threadsafe(task_maker)
self._call_soon_threadsafe(self._make_async_not_empty_notifier)
else:
self._call_soon(task_maker)
self._call_soon(self._make_async_not_empty_notifier)

def _notify_async_not_full(self, *, threadsafe: bool) -> None:
async def f() -> None:
async with self._async_mutex:
self._async_not_full.notify()
async def _async_not_full_notifier(self) -> None:
async with self._async_mutex:
self._async_not_full.notify()

def task_maker() -> None:
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)
def _make_async_not_full_notifier(self) -> None:
task = self._loop.create_task(self._async_not_full_notifier())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

def _notify_async_not_full(self, *, threadsafe: bool) -> None:
if threadsafe:
self._call_soon_threadsafe(task_maker)
self._call_soon_threadsafe(self._make_async_not_full_notifier)
else:
self._call_soon(task_maker)
self._call_soon(self._make_async_not_full_notifier)

def _check_closing(self) -> None:
if self._closing:
Expand Down

0 comments on commit bbc350f

Please sign in to comment.