diff --git a/janus/__init__.py b/janus/__init__.py index 71f85aa..b02acda 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -176,51 +176,53 @@ def _put_internal(self, item: T) -> None: self._unfinished_tasks += 1 self._finished.clear() + def _sync_not_empty_notifier(self) -> None: + with self._sync_mutex: + self._sync_not_empty.notify() + def _notify_sync_not_empty(self) -> None: - def f() -> None: - with self._sync_mutex: - self._sync_not_empty.notify() + fut = self._loop.run_in_executor(None, self._sync_not_empty_notifier) + fut.add_done_callback(self._pending.discard) + self._pending.add(fut) - self._loop.run_in_executor(None, f) + def _sync_not_full_notifier(self) -> None: + with self._sync_mutex: + self._sync_not_full.notify() def _notify_sync_not_full(self) -> None: - def f() -> None: - with self._sync_mutex: - self._sync_not_full.notify() - - fut = asyncio.ensure_future(self._loop.run_in_executor(None, f)) + fut = self._loop.run_in_executor(None, self._sync_not_full_notifier) fut.add_done_callback(self._pending.discard) self._pending.add(fut) - def _notify_async_not_empty(self, *, threadsafe: bool) -> None: - async def f() -> None: - async with self._async_mutex: - self._async_not_empty.notify() + async def _async_not_empty_notifier(self) -> None: + async with self._async_mutex: + self._async_not_empty.notify() - def task_maker() -> None: - task = self._loop.create_task(f()) - task.add_done_callback(self._pending.discard) - self._pending.add(task) + def _make_async_not_empty_notifier(self) -> None: + task = self._loop.create_task(self._async_not_empty_notifier()) + task.add_done_callback(self._pending.discard) + self._pending.add(task) + def _notify_async_not_empty(self, *, threadsafe: bool) -> None: if threadsafe: - self._call_soon_threadsafe(task_maker) + self._call_soon_threadsafe(self._make_async_not_empty_notifier) else: - self._call_soon(task_maker) + self._call_soon(self._make_async_not_empty_notifier) - def _notify_async_not_full(self, *, threadsafe: bool) -> None: - async def f() -> None: - async with self._async_mutex: - self._async_not_full.notify() + async def _async_not_full_notifier(self) -> None: + async with self._async_mutex: + self._async_not_full.notify() - def task_maker() -> None: - task = self._loop.create_task(f()) - task.add_done_callback(self._pending.discard) - self._pending.add(task) + def _make_async_not_full_notifier(self) -> None: + task = self._loop.create_task(self._async_not_full_notifier()) + task.add_done_callback(self._pending.discard) + self._pending.add(task) + def _notify_async_not_full(self, *, threadsafe: bool) -> None: if threadsafe: - self._call_soon_threadsafe(task_maker) + self._call_soon_threadsafe(self._make_async_not_full_notifier) else: - self._call_soon(task_maker) + self._call_soon(self._make_async_not_full_notifier) def _check_closing(self) -> None: if self._closing: