Skip to content

Commit

Permalink
Refactor cycles
Browse files Browse the repository at this point in the history
  • Loading branch information
x42005e1f authored Dec 11, 2024
1 parent e8ae472 commit e3a8eba
Showing 1 changed file with 19 additions and 28 deletions.
47 changes: 19 additions & 28 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,18 +511,14 @@ async def put(self, item: T) -> None:
async with parent._async_not_full:
with parent._sync_mutex:
parent._get_loop() # check the event loop
if parent._maxsize > 0:
do_wait = True
while do_wait:
do_wait = parent._qsize() >= parent._maxsize
if do_wait:
parent._async_not_full_waiting += 1
parent._sync_mutex.release()
try:
await parent._async_not_full.wait()
finally:
parent._sync_mutex.acquire()
parent._async_not_full_waiting -= 1
while 0 < parent._maxsize <= parent._qsize():
parent._async_not_full_waiting += 1
parent._sync_mutex.release()
try:
await parent._async_not_full.wait()
finally:
parent._sync_mutex.acquire()
parent._async_not_full_waiting -= 1

parent._put_internal(item)
if parent._async_not_empty_waiting:
Expand All @@ -539,9 +535,8 @@ def put_nowait(self, item: T) -> None:
parent._check_closing()
with parent._sync_mutex:
loop = parent._get_loop()
if parent._maxsize > 0:
if parent._qsize() >= parent._maxsize:
raise AsyncQueueFull
if 0 < parent._maxsize <= parent._qsize():
raise AsyncQueueFull

parent._put_internal(item)
if parent._async_not_empty_waiting:
Expand All @@ -561,18 +556,14 @@ async def get(self) -> T:
async with parent._async_not_empty:
with parent._sync_mutex:
parent._get_loop() # check the event loop
do_wait = True
while do_wait:
do_wait = parent._qsize() == 0

if do_wait:
parent._async_not_empty_waiting += 1
parent._sync_mutex.release()
try:
await parent._async_not_empty.wait()
finally:
parent._sync_mutex.acquire()
parent._async_not_empty_waiting -= 1
while not parent._qsize():
parent._async_not_empty_waiting += 1
parent._sync_mutex.release()
try:
await parent._async_not_empty.wait()
finally:
parent._sync_mutex.acquire()
parent._async_not_empty_waiting -= 1

item = parent._get()
if parent._async_not_full_waiting:
Expand All @@ -589,7 +580,7 @@ def get_nowait(self) -> T:
parent = self._parent
parent._check_closing()
with parent._sync_mutex:
if parent._qsize() == 0:
if not parent._qsize():
raise AsyncQueueEmpty

loop = parent._get_loop()
Expand Down

0 comments on commit e3a8eba

Please sign in to comment.