From ed380a4deb57d151ea354c46fd4134ec4c89a2d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Tue, 1 Mar 2022 15:43:03 +0900 Subject: [PATCH 01/12] wip --- asynckivy/_queue.py | 332 ++++++++++++++++++++++++++++++ asynckivy/exceptions.py | 24 ++- asynckivy/queue.py | 89 ++++++++ setup.cfg | 2 +- tests/test_queue_common.py | 201 ++++++++++++++++++ tests/test_queue_normal.py | 307 +++++++++++++++++++++++++++ tests/test_queue_zero_capacity.py | 225 ++++++++++++++++++++ 7 files changed, 1178 insertions(+), 2 deletions(-) create mode 100644 asynckivy/_queue.py create mode 100644 asynckivy/queue.py create mode 100644 tests/test_queue_common.py create mode 100644 tests/test_queue_normal.py create mode 100644 tests/test_queue_zero_capacity.py diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py new file mode 100644 index 0000000..8acd7b2 --- /dev/null +++ b/asynckivy/_queue.py @@ -0,0 +1,332 @@ +''' +Notes for maintainers +--------------------- + +ZeroCapacityQueue(以後はZQと略す)とNormalQueuen(以後はNQと略す)は容量以外にも以下の違いがある。 + +- ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlobkを起こす。 +- put_nowait() も同じ。 +- ZQではitemの中継はputterとgetterの両方を同時に取り出てから行うため、putterを進めた時にキューを閉じられたとしてもgetterには影響しない。 +対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 + +これらの違いは無くそうと思えば無くせるかもしれないが実装がかなり複雑になるので諦めた。 +''' + +__all__ = ('ZeroCapacityQueue', 'NormalQueue', ) +from typing import Tuple, Any +import types +from functools import partial +from collections import deque +from kivy.clock import Clock +from asynckivy import sleep_forever, TaskState, Task, get_current_task +from asynckivy.exceptions import WouldBlock, ClosedResourceError, EndOfResource + + +Item = Any # type-hint for queue item + + +class ZeroCapacityQueue: + '''Queue, optimized for zero capacity''' + + def __init__(self): + self._allows_to_put = True + self._allows_to_get = True + self._putters = deque() # queue of tuple(task, item) s + self._getters = deque() # queue of task._step_coro s + self._trigger_consume = Clock.create_trigger(self._consume) + + def __len__(self): + return 0 + size = property(__len__) + + @property + def capacity(self) -> int: + return 0 + + @property + def is_empty(self): + raise AttributeError("The meaning of 'empty' is ambiguous for a zero capacity queue.") + + @property + def is_full(self): + raise AttributeError("The meaning of 'full' is ambiguous for a zero capacity queue.") + + @property + def order(self): + return None + + @property + def container(self): + return None + + @types.coroutine + def get(self): + if not self._allows_to_get: + raise ClosedResourceError + if not self._allows_to_put: + raise EndOfResource + self._trigger_consume() + return (yield self._getters.append)[0][0] + + def get_nowait(self): + if not self._allows_to_get: + raise ClosedResourceError + if not self._allows_to_put: + raise EndOfResource + putter, item = self._pop_putter() + if putter is None: + raise WouldBlock + putter._step_coro() + return item + + async def put(self, item): + if not self._allows_to_put: + raise ClosedResourceError + self._trigger_consume() + self._putters.append((await get_current_task(), item, )) + await sleep_forever() + + def put_nowait(self, item): + if not self._allows_to_put: + raise ClosedResourceError + getter = self._pop_getter() + if getter is None: + raise WouldBlock + getter._step_coro(item) + + def close(self): + self._allows_to_put = False + + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + CRE = ClosedResourceError + EOR = EndOfResource + + # TODO: refactor after python3.7 ends + while True: + putter, __ = pop_putter() + if putter is None: + break + putter._throw_exc(CRE) + while True: + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + + def fullclose(self): + self._allows_to_put = False + self._allows_to_get = False + + # LOAD_FAST + CRE = ClosedResourceError + pop_putter = self._pop_putter + pop_getter = self._pop_getter + + # TODO: refactor after python3.7 ends + while True: + task, __ = pop_putter() + if task is None: + break + task._throw_exc(CRE) + while True: + task = pop_getter() + if task is None: + break + task._throw_exc(CRE) + + async def __aiter__(self): + try: + while True: + yield await self.get() + except EndOfResource: + pass + + def _consume(self, __): + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + + while True: + getter = pop_getter() + if getter is None: + break + putter, item = pop_putter() + if putter is None: + self._getters.appendleft(getter._step_coro) + break + putter._step_coro() + getter._step_coro(item) + self._trigger_consume.cancel() + + def _pop_getter(self, *, _STARTED=TaskState.STARTED) -> Task: + '''Take out a next getter. Return None if no one is available.''' + getters = self._getters + while getters: + task = getters.popleft().__self__ + if task.state is _STARTED: + return task + + def _pop_putter(self, *, _STARTED=TaskState.STARTED) -> Tuple[Task, Item]: + '''Take out a next putter and its item. Return (None, None) if no one is available.''' + putters = self._putters + while putters: + task, item = putters.popleft() + if task.state is _STARTED: + return (task, item, ) + return (None, None, ) + + +class NormalQueue: + def __init__(self, *, capacity, order): + self._allows_to_put = True + self._allows_to_get = True + self._putters = deque() # queue of tuple(task, item) s + self._getters = deque() # queue of task._step_coro s + self._trigger_consume = Clock.create_trigger(self._consume) + self._init_container(capacity, order) + self._capacity = capacity + self._order = order + + def _init_container(self, capacity, order): + # If the capacity is 1, there is no point of re-ordering items. + # Therefore, does not use heapq for a performance reason. + if capacity == 1 or order == 'lifo': + c = [] + c_get = c.pop + c_put = c.append + elif order == 'fifo': + c = deque(maxlen=capacity) + c_get = c.popleft + c_put = c.append + elif order == 'priority': + import heapq + c = [] + c_get = partial(heapq.heappop, c) + c_put = partial(heapq.heappush, c) + else: + raise ValueError("'order' must be one of 'lifo', 'fifo' or 'priority'.") + self._c = c + self._c_get = c_get + self._c_put = c_put + + def __len__(self): + return len(self._c) + size = property(__len__) + + @property + def capacity(self) -> int: + return self._capacity + + @property + def is_empty(self): + return not self._c + + @property + def is_full(self): + return len(self._c) == self._capacity + + @property + def order(self): + return self._order + + @property + def container(self): + return self._c + + @types.coroutine + def get(self): + if not self._allows_to_get: + raise ClosedResourceError + if (not self._allows_to_put) and self.is_empty: + raise EndOfResource + self._trigger_consume() + return (yield self._getters.append)[0][0] + + def get_nowait(self): + if not self._allows_to_get: + raise ClosedResourceError + if self.is_empty: + if self._allows_to_put: + raise WouldBlock + raise EndOfResource + self._trigger_consume() + return self._c_get() + + put = ZeroCapacityQueue.put + + def put_nowait(self, item): + if not self._allows_to_put: + raise ClosedResourceError + if self.is_full: + raise WouldBlock + self._c_put(item) + self._trigger_consume() + + def close(self): + self._allows_to_put = False + + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + CRE = ClosedResourceError + EOR = EndOfResource + + # TODO: refactor after python3.7 ends + while True: + putter, __ = pop_putter() + if putter is None: + break + putter._throw_exc(CRE) + if not self.is_empty: + return + while True: + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + + def fullclose(self): + self._c.clear() + ZeroCapacityQueue.fullclose(self) + + __aiter__ = ZeroCapacityQueue.__aiter__ + + def _consume(self, __): + # LOAD_FAST + getters = self._getters + putters = self._putters + pop_putter = self._pop_putter + pop_getter = self._pop_getter + c_put = self._c_put + c_get = self._c_get + + while True: + while not self.is_full: + putter, item = pop_putter() + if putter is None: + break + c_put(item) + putter._step_coro() + if (not getters) or self.is_empty: + break + while not self.is_empty: + getter = pop_getter() + if getter is None: + break + getter._step_coro(c_get()) + else: + if not self._allows_to_put: + EOR = EndOfResource # LOAD_FAST + while True: # TODO: refactor after Python3.7 ends + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + if (not putters) or self.is_full: + break + self._trigger_consume.cancel() + + _pop_getter = ZeroCapacityQueue._pop_getter + _pop_putter = ZeroCapacityQueue._pop_putter diff --git a/asynckivy/exceptions.py b/asynckivy/exceptions.py index 7f1f754..53ab1ef 100644 --- a/asynckivy/exceptions.py +++ b/asynckivy/exceptions.py @@ -1,5 +1,27 @@ -__all__ = ('MotionEventAlreadyEndedError', ) +__all__ = ( + 'MotionEventAlreadyEndedError', + 'WouldBlock', 'ClosedResourceError', 'EndOfResource', +) +from asyncgui.exceptions import * # noqa class MotionEventAlreadyEndedError(Exception): """A MotionEvent has already ended.""" + + +class WouldBlock(Exception): + """(took from trio) + Raised by X_nowait functions if X would block. + """ + + +class ClosedResourceError(Exception): + """(took from trio) + Raised when attempting to use a resource after it has been closed. + """ + + +class EndOfResource(Exception): + """(similar to trio's EndOfChannel) + This is analogous to an "end-of-file" condition, but for resources in general. + """ diff --git a/asynckivy/queue.py b/asynckivy/queue.py new file mode 100644 index 0000000..9d35c07 --- /dev/null +++ b/asynckivy/queue.py @@ -0,0 +1,89 @@ +__all__ = ('Queue', ) +from collections import deque +from typing import Union + + +class Queue: + @property + def size(self) -> int: + '''Number of items in the queue. This equals to ``len(queue)``. ''' + + @property + def capacity(self) -> Union[int, None]: + '''Number of items allowed in the queue. None if unbounded.''' + + @property + def is_empty(self) -> bool: + '''Whether the queue is empty. If the capacity of the queue is zero, raise AttributeError.''' + + @property + def is_full(self) -> bool: + '''Whether the queue is full. If the capacity of the queue is zero, raise AttributeError.''' + + @property + def order(self) -> Union[str, None]: + ''' 'fifo', 'lifo' or 'priority'. None if the capacity of the queue is zero. ''' + + @property + def container(self) -> Union[list, deque, None]: + '''Return an underlying container used by this queue. + None if the capacity of the queue is zero no matter what the order is. + A deque if the order of the queue is 'fifo'. + A list otherwise. + ''' + + async def get(self): + '''Take out an item from the queue. If one is not available, wait until it is. + If the queue is empty, and is partially closed, raise EndOfResource. + If the queue is fully closed, raise ClosedResourceError. + ''' + + def get_nowait(self): + '''Take out an item from the queue if one is immediately available, else raise WouldBlock. + Everything else is the same as ``get``. + ''' + + async def put(self, item): + '''Put an item into the queue. If no free slots are available, wait until one is avaiable. + If the queue is partially or fully closed, raise ClosedResourceError. + ''' + + def put_nowait(self, item): + '''Put an item into the queue if a free slot is immediately available, else raise WouldBlock. + Everything else is the same as ``put``. + ''' + + def close(self): + '''Partially close the queue. No further putting opeations are allowed. ''' + + def fullclose(self): + '''Fully close the queue. No further putting/getting operations are allowed. All items in the queue are + discarded. + ''' + + def __aiter__(self): + '''Return an async iterator that repeatedly ``get`` an item from the queue until it raises EndOfResource.''' + + def __init__(self, *, capacity: Union[int, None]=0, order='fifo'): + ''' + :Parameters: + `capacity`: int, defaults to 0 + If this is None, the queue will have infinite capacity. + `order`: str, defaults to 'fifo' + The order of the queue items. One of 'fifo', 'lifo' or 'priority'. + ''' + # 単に False だと私が使っているeditor(Visual Studio Code)の色付けが崩れてしまう為 bool(False) + assert bool(False), "This is an abstract class" + + def __new__(cls, *, capacity: Union[int, None]=0, order='fifo'): + from . import _queue + if capacity is None: + pass + elif isinstance(capacity, int): + if capacity == 0: + return _queue.ZeroCapacityQueue() + elif capacity < 0: + raise ValueError(f"If 'capacity' is an integer, it must be zero or greater. (was {capacity})") + else: + raise TypeError(f"'capacity' must be None or an integer. (was {type(capacity)})") + return _queue.NormalQueue(capacity=capacity, order=order) diff --git a/setup.cfg b/setup.cfg index 845ed19..a0ee8a9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,7 @@ packages = find: install_requires = asyncgui>=0.5,<0.6 [flake8] -ignore = C901 +ignore = C901,E252 per-file-ignores= ./asynckivy/__init__.py:F401,F403 ./examples/misc/speed_comparision_between_schedule_interval_and_sleep.py:E402 diff --git a/tests/test_queue_common.py b/tests/test_queue_common.py new file mode 100644 index 0000000..6df0964 --- /dev/null +++ b/tests/test_queue_common.py @@ -0,0 +1,201 @@ +import pytest +p = pytest.mark.parametrize +p_capacity = p('capacity', [0, 1, 2, None, ]) +p_capacity2 = p('capacity', [0, 1, 2, 3, 4, None, ]) + + +@pytest.fixture(autouse=True) +def autouse_kivy_clock(kivy_clock): + pass + + +@p('capacity', (-1., 0., 1., "", "str", tuple(), [], )) +def test_invalid_capacity_type(capacity): + from asynckivy.queue import Queue + with pytest.raises(TypeError): + Queue(capacity=capacity) + + +@p('capacity', (-1, )) +def test_invalid_capacity_value(capacity): + from asynckivy.queue import Queue + with pytest.raises(ValueError): + Queue(capacity=capacity) + + +@p('capacity', [0, 1, 2, None, ]) +@p('order', ['fifo', 'lifo', 'priority', ]) +def test_instance_type(capacity, order): + from asynckivy.queue import Queue + from asynckivy._queue import ZeroCapacityQueue, NormalQueue + q = Queue(capacity=capacity, order=order) + if capacity == 0: + assert isinstance(q, ZeroCapacityQueue) + else: + assert isinstance(q, NormalQueue) + + +@p_capacity +@p('fullclose', [True, False, ]) +@p('nowait', [True, False, ]) +def test_put_to_closed_queue(capacity, fullclose, nowait): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + q.fullclose() if fullclose else q.close() + with pytest.raises(ak.ClosedResourceError): + q.put_nowait('Z') if nowait else ak.start(q.put('Z')) + + +@p_capacity +@p('fullclose', [True, False, ]) +@p('nowait', [True, False, ]) +def test_get_to_closed_queue(capacity, fullclose, nowait): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + if fullclose: + q.fullclose() + exc = ak.ClosedResourceError + else: + q.close() + exc = ak.EndOfResource + with pytest.raises(exc): + q.get_nowait() if nowait else ak.start(q.get()) + + +@p_capacity2 +def test_async_for(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p = ak.start(producer(q, 'ABC')) + c = ak.start(consumer(q)) + assert not p.done + assert not c.done + kivy_clock.tick() + assert p.done + assert not c.done + q.close() + assert c.result == 'ABC' + + +@p_capacity2 +def test_one_producer_and_two_consumers(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in 'A1B2C3': + await q.put(c) + q.close() + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p = ak.start(producer(q)) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p.done + assert c1.result == 'ABC' + assert c2.result == '123' + + +@p_capacity2 +def test_two_producers_and_one_consumer(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p1 = ak.start(producer(q, 'ABC')) + p2 = ak.start(producer(q, '123')) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert not c.done + q.close() + assert c.result == 'A1B2C3' + + +@p_capacity2 +def test_two_producers_and_two_consumers(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p1 = ak.start(producer(q, 'ABC')) + p2 = ak.start(producer(q, '123')) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert not c1.done + assert not c2.done + q.close() + assert c1.result == 'ABC' + assert c2.result == '123' + + +@p('n_producers', range(3)) +@p('n_consumers', range(3)) +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_close_a_queue_while_it_holding_putters_and_getters(n_producers, n_consumers, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + with pytest.raises(ak.ClosedResourceError): + await q.put(None) + async def consumer(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + + q = Queue(capacity=capacity) + producers = [ak.start(producer(q)) for __ in range(n_producers)] + consumers = [ak.start(consumer(q)) for __ in range(n_consumers)] + for p in producers: + assert not p.done + for c in consumers: + assert not c.done + q.fullclose() if fullclose else q.close() + for p in producers: + assert p.done + for c in consumers: + assert c.done diff --git a/tests/test_queue_normal.py b/tests/test_queue_normal.py new file mode 100644 index 0000000..4b72162 --- /dev/null +++ b/tests/test_queue_normal.py @@ -0,0 +1,307 @@ +import pytest +p = pytest.mark.parametrize +p_order = p('order', ('lifo', 'fifo', 'priority')) +p_capacity = p('capacity', [1, 2, None, ]) +p_capacity2 = p('capacity', [1, 2, 3, 4, None, ]) + + +@pytest.fixture(autouse=True) +def autouse_kivy_clock(kivy_clock): + pass + + +@p_order +def test_various_statistics(kivy_clock, order): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + ak.start(q.put(1)) + assert q.size == 0 + assert q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ak.start(q.put(2)) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 2 + assert not q.is_empty + assert q.is_full + ak.start(q.get()) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + kivy_clock.tick() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ak.start(q.get()) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_order +def test_various_statistics_nowait(order): + from asynckivy.queue import Queue + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + q.put_nowait(1) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.put_nowait(2) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + q.get_nowait() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.get_nowait() + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_capacity +@p_order +def test_container_type(capacity, order): + from asynckivy.queue import Queue + q = Queue(capacity=capacity, order=order) + if capacity != 1 and order == 'fifo': + from collections import deque + assert isinstance(q.container, deque) + else: + assert isinstance(q.container, list) + + +@p_capacity +def test_get_nowait_while_there_are_no_putters_and_no_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +@p_capacity +def test_get_nowait_while_there_is_a_putter_but_no_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + ak.start(q.put('A')) + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +@p('capacity', [1, 2, ]) +def test_put_nowait_while_there_are_no_getters_and_full_of_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + for i in range(capacity): + q._c_put(i) + assert q.is_full + with pytest.raises(ak.WouldBlock): + q.put_nowait(99) + + +@p('capacity', [1, 2, ]) +def test_put_nowait_while_there_is_a_getter_and_full_of_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + for i in range(capacity): + q._c_put(i) + assert q.is_full + ak.start(q.get()) + with pytest.raises(ak.WouldBlock): + q.put_nowait(99) + + +def test_put_nowait_to_unbounded_queue_that_has_no_getters(): + from asynckivy.queue import Queue + q = Queue(capacity=None) + for i in range(10): + q._c_put(i) + assert not q.is_full + q.put_nowait('A') + + +def test_put_nowait_to_unbounded_queue_that_has_a_getter(): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=None) + for i in range(10): + q._c_put(i) + assert not q.is_full + ak.start(q.get()) + q.put_nowait('A') + + +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_putter_triggers_close(kivy_clock, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + if fullclose: + with pytest.raises(ak.ClosedResourceError): + await q.get() + else: + assert await q.get() == 'A' + async def consumer2(q): + exc = ak.ClosedResourceError if fullclose else ak.EndOfResource + with pytest.raises(exc): + await q.get() + + q = Queue(capacity=capacity) + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_getter_triggers_close(kivy_clock, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer1(q): + await q.put('A') + async def producer2(q): + if capacity is not None and capacity < 2: + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + else: + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + if fullclose: + with pytest.raises(ak.ClosedResourceError): + await q.get() + elif capacity is not None and capacity < 2: + with pytest.raises(ak.EndOfResource): + await q.get() + else: + assert await q.get() == 'B' + + q = Queue(capacity=capacity) + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('capacity', [4, 5, None, ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('priority', '3102', '0123'), ]) +def test_item_order__enough_capacity(kivy_clock, capacity, order, input, expect): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=capacity, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect + + +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('priority', '3102', '1302'), ]) +def test_item_order_2capacity(kivy_clock, order, input, expect): + '''NOTE: これは仕様というよりは現状の実装に対するtest''' + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=2, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect + + +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('priority', '3102', '0132'), ]) +def test_item_3capacity(kivy_clock, order, input, expect): + '''NOTE: これは仕様というよりは現状の実装に対するtest''' + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=3, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect diff --git a/tests/test_queue_zero_capacity.py b/tests/test_queue_zero_capacity.py new file mode 100644 index 0000000..408f61c --- /dev/null +++ b/tests/test_queue_zero_capacity.py @@ -0,0 +1,225 @@ +import pytest +p = pytest.mark.parametrize + + +@pytest.fixture() +def q(kivy_clock): # Queue生成時にClockを使っているのでkivy_clockが要る + from asynckivy.queue import Queue + return Queue(capacity=0) + + +def test_size_and_len_and_capacity(q): + import asynckivy as ak + assert q.size == 0 + assert len(q) == 0 + assert q.capacity == 0 + ak.start(q.put('A')) + assert q.size == 0 + assert len(q) == 0 + assert q.capacity == 0 + + +def test_is_empty(q): + import asynckivy as ak + with pytest.raises(AttributeError): + q.is_empty + ak.start(q.put('A')) + with pytest.raises(AttributeError): + q.is_empty + + +def test_is_full(q): + import asynckivy as ak + with pytest.raises(AttributeError): + q.is_full + ak.start(q.put('A')) + with pytest.raises(AttributeError): + q.is_full + + +def test_order(q): + assert q.order is None + + +def test_container(q): + assert q.container is None + + +def test_get_nowait_while_there_are_no_putters(q): + import asynckivy as ak + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +def test_put_nowait_while_there_are_no_getters(q): + import asynckivy as ak + with pytest.raises(ak.WouldBlock): + q.put_nowait('A') + + +@p('fullclose', [True, False, ]) +def test_get_nowait_triggers_close(q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + assert q.get_nowait() == 'A' + assert p1.done + assert p2.done + assert c.done + + +@p('fullclose', [True, False, ]) +def test_putter_triggers_close(kivy_clock, q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +def test_put_nowait_triggers_close(q, fullclose): + import asynckivy as ak + async def producer(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p = ak.start(producer(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p.done + assert not c1.done + assert not c2.done + q.put_nowait('A') + assert p.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +def test_getter_triggers_close(kivy_clock, q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +def test_cancel_putter(kivy_clock, q): + import asynckivy as ak + async def consumer(q): + return ''.join([item async for item in q]) + p1 = ak.start(q.put('A')) + p2 = ak.start(q.put('B')) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + p1.cancel() + kivy_clock.tick() + assert p1.cancelled + assert p2.done + assert not c.done + q.close() + assert c.result == 'B' + + +def test_cancel_getter(kivy_clock, q): + import asynckivy as ak + async def producer(q): + for i in 'ABCD': + await q.put(i) + q.close() + async def consumer(q): + return ''.join([item async for item in q]) + p = ak.start(producer(q)) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p.done + assert not c1.done + assert not c2.done + c1.cancel() + kivy_clock.tick() + assert p.done + assert c1.cancelled + assert c2.result == 'ABCD' + + +def test_wait_for_a_frame_before_get(kivy_clock, q): + import asynckivy as ak + p = ak.start(q.put('A')) + kivy_clock.tick() + assert not p.done + c = ak.start(q.get()) + kivy_clock.tick() + assert p.done + assert c.result == 'A' + + +def test_wait_for_a_frame_before_put(kivy_clock, q): + import asynckivy as ak + c = ak.start(q.get()) + kivy_clock.tick() + assert not c.done + p = ak.start(q.put('A')) + kivy_clock.tick() + assert p.done + assert c.result == 'A' From fe7c0fd08b57dc0854e279a9dd41c2d9f4ff5bb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Tue, 1 Mar 2022 22:49:26 +0900 Subject: [PATCH 02/12] add ah example --- examples/queue.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 examples/queue.py diff --git a/examples/queue.py b/examples/queue.py new file mode 100644 index 0000000..6c93332 --- /dev/null +++ b/examples/queue.py @@ -0,0 +1,29 @@ +from kivy.app import App +import asynckivy as ak +from asynckivy.queue import Queue + + +async def main(): + async def producer(q, items): + for c in items: + await q.put(c) + print("produced :", c) + q.close() + + async def consumer(q, name): + async for c in q: + print(f"consumed ({name}):", c) + + from string import ascii_uppercase + q = Queue() + await ak.and_( + producer(q, ascii_uppercase), + consumer(q, 'A'), + consumer(q, 'B'), + ) + print('Done all tasks') + App.get_running_app().stop() + + +ak.start_soon(main()) +App().run() From ddf6dba1a6bd1a33465c7fec2159e99c5c51d182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Thu, 3 Mar 2022 14:35:50 +0900 Subject: [PATCH 03/12] fix typo --- asynckivy/_queue.py | 4 ++-- asynckivy/queue.py | 2 +- tests/test_sleep.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index 8acd7b2..30ec628 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -2,9 +2,9 @@ Notes for maintainers --------------------- -ZeroCapacityQueue(以後はZQと略す)とNormalQueuen(以後はNQと略す)は容量以外にも以下の違いがある。 +ZeroCapacityQueue(以後はZQと略す)とNormalQueue(以後はNQと略す)は容量以外にも以下の違いがある。 -- ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlobkを起こす。 +- ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlockを起こす。 - put_nowait() も同じ。 - ZQではitemの中継はputterとgetterの両方を同時に取り出てから行うため、putterを進めた時にキューを閉じられたとしてもgetterには影響しない。 対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 diff --git a/asynckivy/queue.py b/asynckivy/queue.py index 9d35c07..6c158f9 100644 --- a/asynckivy/queue.py +++ b/asynckivy/queue.py @@ -44,7 +44,7 @@ def get_nowait(self): ''' async def put(self, item): - '''Put an item into the queue. If no free slots are available, wait until one is avaiable. + '''Put an item into the queue. If no free slots are available, wait until one is available. If the queue is partially or fully closed, raise ClosedResourceError. ''' diff --git a/tests/test_sleep.py b/tests/test_sleep.py index 01e5150..f1a418f 100644 --- a/tests/test_sleep.py +++ b/tests/test_sleep.py @@ -24,7 +24,7 @@ def test_sleep_free(): import asynckivy as ak if not hasattr(Clock, 'create_trigger_free'): - pytest.skip("free-type Clock is not avaiable") + pytest.skip("free-type Clock is not available") Clock.tick() task = ak.start(ak.sleep_free(.1)) assert not task.done From 3b545458caa2f33dfcdf54707152151814bd132f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Thu, 3 Mar 2022 14:38:18 +0900 Subject: [PATCH 04/12] remove 'container' property --- asynckivy/_queue.py | 8 -------- asynckivy/queue.py | 9 --------- tests/test_queue_normal.py | 4 ++-- tests/test_queue_zero_capacity.py | 4 ---- 4 files changed, 2 insertions(+), 23 deletions(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index 30ec628..ffc9297 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -55,10 +55,6 @@ def is_full(self): def order(self): return None - @property - def container(self): - return None - @types.coroutine def get(self): if not self._allows_to_get: @@ -231,10 +227,6 @@ def is_full(self): def order(self): return self._order - @property - def container(self): - return self._c - @types.coroutine def get(self): if not self._allows_to_get: diff --git a/asynckivy/queue.py b/asynckivy/queue.py index 6c158f9..d723f9c 100644 --- a/asynckivy/queue.py +++ b/asynckivy/queue.py @@ -1,5 +1,4 @@ __all__ = ('Queue', ) -from collections import deque from typing import Union @@ -24,14 +23,6 @@ def is_full(self) -> bool: def order(self) -> Union[str, None]: ''' 'fifo', 'lifo' or 'priority'. None if the capacity of the queue is zero. ''' - @property - def container(self) -> Union[list, deque, None]: - '''Return an underlying container used by this queue. - None if the capacity of the queue is zero no matter what the order is. - A deque if the order of the queue is 'fifo'. - A list otherwise. - ''' - async def get(self): '''Take out an item from the queue. If one is not available, wait until it is. If the queue is empty, and is partially closed, raise EndOfResource. diff --git a/tests/test_queue_normal.py b/tests/test_queue_normal.py index 4b72162..805a551 100644 --- a/tests/test_queue_normal.py +++ b/tests/test_queue_normal.py @@ -90,9 +90,9 @@ def test_container_type(capacity, order): q = Queue(capacity=capacity, order=order) if capacity != 1 and order == 'fifo': from collections import deque - assert isinstance(q.container, deque) + assert isinstance(q._c, deque) else: - assert isinstance(q.container, list) + assert isinstance(q._c, list) @p_capacity diff --git a/tests/test_queue_zero_capacity.py b/tests/test_queue_zero_capacity.py index 408f61c..b93b410 100644 --- a/tests/test_queue_zero_capacity.py +++ b/tests/test_queue_zero_capacity.py @@ -41,10 +41,6 @@ def test_order(q): assert q.order is None -def test_container(q): - assert q.container is None - - def test_get_nowait_while_there_are_no_putters(q): import asynckivy as ak with pytest.raises(ak.WouldBlock): From 1584d10a8b499e55d561f116950ad7039737d603 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Thu, 3 Mar 2022 14:41:14 +0900 Subject: [PATCH 05/12] trivial fix --- tests/test_queue_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue_common.py b/tests/test_queue_common.py index 6df0964..42e576d 100644 --- a/tests/test_queue_common.py +++ b/tests/test_queue_common.py @@ -23,7 +23,7 @@ def test_invalid_capacity_value(capacity): Queue(capacity=capacity) -@p('capacity', [0, 1, 2, None, ]) +@p_capacity @p('order', ['fifo', 'lifo', 'priority', ]) def test_instance_type(capacity, order): from asynckivy.queue import Queue From 8ca55653216dbf6cc42a87b223f3a9912d8d49ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Thu, 3 Mar 2022 15:09:25 +0900 Subject: [PATCH 06/12] rename 'priority' to 'smallest' --- asynckivy/_queue.py | 4 ++-- asynckivy/queue.py | 4 ++-- tests/test_queue_common.py | 2 +- tests/test_queue_normal.py | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index ffc9297..f80e311 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -196,13 +196,13 @@ def _init_container(self, capacity, order): c = deque(maxlen=capacity) c_get = c.popleft c_put = c.append - elif order == 'priority': + elif order == 'smallest': import heapq c = [] c_get = partial(heapq.heappop, c) c_put = partial(heapq.heappush, c) else: - raise ValueError("'order' must be one of 'lifo', 'fifo' or 'priority'.") + raise ValueError("'order' must be one of 'lifo', 'fifo' or 'smallest'.") self._c = c self._c_get = c_get self._c_put = c_put diff --git a/asynckivy/queue.py b/asynckivy/queue.py index d723f9c..2a2aeaf 100644 --- a/asynckivy/queue.py +++ b/asynckivy/queue.py @@ -21,7 +21,7 @@ def is_full(self) -> bool: @property def order(self) -> Union[str, None]: - ''' 'fifo', 'lifo' or 'priority'. None if the capacity of the queue is zero. ''' + ''' 'fifo', 'lifo' or 'smallest'. None if the capacity of the queue is zero. ''' async def get(self): '''Take out an item from the queue. If one is not available, wait until it is. @@ -61,7 +61,7 @@ def __init__(self, *, capacity: Union[int, None]=0, order='fifo'): `capacity`: int, defaults to 0 If this is None, the queue will have infinite capacity. `order`: str, defaults to 'fifo' - The order of the queue items. One of 'fifo', 'lifo' or 'priority'. + The order of the queue items. One of 'fifo', 'lifo' or 'smallest'. ''' # 単に False だと私が使っているeditor(Visual Studio Code)の色付けが崩れてしまう為 bool(False) assert bool(False), "This is an abstract class" diff --git a/tests/test_queue_common.py b/tests/test_queue_common.py index 42e576d..b902edf 100644 --- a/tests/test_queue_common.py +++ b/tests/test_queue_common.py @@ -24,7 +24,7 @@ def test_invalid_capacity_value(capacity): @p_capacity -@p('order', ['fifo', 'lifo', 'priority', ]) +@p('order', ['fifo', 'lifo', 'smallest', ]) def test_instance_type(capacity, order): from asynckivy.queue import Queue from asynckivy._queue import ZeroCapacityQueue, NormalQueue diff --git a/tests/test_queue_normal.py b/tests/test_queue_normal.py index 805a551..ed93894 100644 --- a/tests/test_queue_normal.py +++ b/tests/test_queue_normal.py @@ -1,6 +1,6 @@ import pytest p = pytest.mark.parametrize -p_order = p('order', ('lifo', 'fifo', 'priority')) +p_order = p('order', ('lifo', 'fifo', 'smallest')) p_capacity = p('capacity', [1, 2, None, ]) p_capacity2 = p('capacity', [1, 2, 3, 4, None, ]) @@ -242,7 +242,7 @@ async def consumer2(q): @p('capacity', [4, 5, None, ]) -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('priority', '3102', '0123'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('smallest', '3102', '0123'), ]) def test_item_order__enough_capacity(kivy_clock, capacity, order, input, expect): import asynckivy as ak from asynckivy.queue import Queue @@ -263,7 +263,7 @@ async def consumer(q): assert c.result == expect -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('priority', '3102', '1302'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('smallest', '3102', '1302'), ]) def test_item_order_2capacity(kivy_clock, order, input, expect): '''NOTE: これは仕様というよりは現状の実装に対するtest''' import asynckivy as ak @@ -285,7 +285,7 @@ async def consumer(q): assert c.result == expect -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('priority', '3102', '0132'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('smallest', '3102', '0132'), ]) def test_item_3capacity(kivy_clock, order, input, expect): '''NOTE: これは仕様というよりは現状の実装に対するtest''' import asynckivy as ak From cf3fc873b545156c0bf18b4eb69989b08c3d5afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Thu, 3 Mar 2022 15:09:50 +0900 Subject: [PATCH 07/12] modify example --- examples/one_producer_two_consumers.py | 34 ++++++++++++++++++++++ examples/queue.py | 29 ------------------- examples/two_producers_one_consumer.py | 40 ++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 29 deletions(-) create mode 100644 examples/one_producer_two_consumers.py delete mode 100644 examples/queue.py create mode 100644 examples/two_producers_one_consumer.py diff --git a/examples/one_producer_two_consumers.py b/examples/one_producer_two_consumers.py new file mode 100644 index 0000000..1479c67 --- /dev/null +++ b/examples/one_producer_two_consumers.py @@ -0,0 +1,34 @@ +from kivy.config import Config +Config.set('graphics', 'width', 100) +Config.set('graphics', 'height', 100) +from kivy.app import App +import asynckivy as ak +from asynckivy.queue import Queue + + +async def main(): + async def producer(name, q, items): + for c in items: + await q.put(c) + print(name, "produced", c) + await ak.sleep(.1) + q.close() + + async def consumer(name, q): + async for c in q: + print(name, "consumed", c) + await ak.sleep(.3) + + from string import ascii_lowercase + q = Queue(capacity=None) + await ak.and_( + producer('P ', q, ascii_lowercase), + consumer('C1', q), + consumer('C2', q), + ) + print('Done all tasks') + App.get_running_app().stop() + + +ak.start_soon(main()) +App().run() diff --git a/examples/queue.py b/examples/queue.py deleted file mode 100644 index 6c93332..0000000 --- a/examples/queue.py +++ /dev/null @@ -1,29 +0,0 @@ -from kivy.app import App -import asynckivy as ak -from asynckivy.queue import Queue - - -async def main(): - async def producer(q, items): - for c in items: - await q.put(c) - print("produced :", c) - q.close() - - async def consumer(q, name): - async for c in q: - print(f"consumed ({name}):", c) - - from string import ascii_uppercase - q = Queue() - await ak.and_( - producer(q, ascii_uppercase), - consumer(q, 'A'), - consumer(q, 'B'), - ) - print('Done all tasks') - App.get_running_app().stop() - - -ak.start_soon(main()) -App().run() diff --git a/examples/two_producers_one_consumer.py b/examples/two_producers_one_consumer.py new file mode 100644 index 0000000..b78d0fe --- /dev/null +++ b/examples/two_producers_one_consumer.py @@ -0,0 +1,40 @@ +from kivy.config import Config +Config.set('graphics', 'width', 100) +Config.set('graphics', 'height', 100) +from kivy.app import App +import asynckivy as ak +from asynckivy.queue import Queue + + +async def main(): + async def producer(name, q, items): + for c in items: + await q.put(c) + print(name, "produced", c) + await ak.sleep(.12) + + async def producers(*producers): + # This function is necessary because you usually want to close the queue *after* all producers end. + await ak.and_from_iterable(producers) + q.close() + + async def consumer(name, q): + async for c in q: + print(name, "consumed", c) + await ak.sleep(.08) + + from string import ascii_lowercase, ascii_uppercase + q = Queue(capacity=None) + await ak.and_( + producers( + producer('P1', q, ascii_lowercase), + producer('P2', q, ascii_uppercase), + ), + consumer('C ', q), + ) + print('Done all tasks') + App.get_running_app().stop() + + +ak.start_soon(main()) +App().run() From 00e521fe13c8dbe53e6f0610a5f9cb078384ff76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sat, 5 Mar 2022 14:28:06 +0900 Subject: [PATCH 08/12] =?UTF-8?q?smallest=20=E3=82=92=20small-first=20?= =?UTF-8?q?=E3=81=AB=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- asynckivy/_queue.py | 4 ++-- asynckivy/queue.py | 4 ++-- tests/test_queue_common.py | 2 +- tests/test_queue_normal.py | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index f80e311..24e3103 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -196,13 +196,13 @@ def _init_container(self, capacity, order): c = deque(maxlen=capacity) c_get = c.popleft c_put = c.append - elif order == 'smallest': + elif order == 'small-first': import heapq c = [] c_get = partial(heapq.heappop, c) c_put = partial(heapq.heappush, c) else: - raise ValueError("'order' must be one of 'lifo', 'fifo' or 'smallest'.") + raise ValueError("'order' must be one of 'lifo', 'fifo' or 'small-first'.") self._c = c self._c_get = c_get self._c_put = c_put diff --git a/asynckivy/queue.py b/asynckivy/queue.py index 2a2aeaf..c6e0771 100644 --- a/asynckivy/queue.py +++ b/asynckivy/queue.py @@ -21,7 +21,7 @@ def is_full(self) -> bool: @property def order(self) -> Union[str, None]: - ''' 'fifo', 'lifo' or 'smallest'. None if the capacity of the queue is zero. ''' + ''' 'fifo', 'lifo' or 'small-first'. None if the capacity of the queue is zero. ''' async def get(self): '''Take out an item from the queue. If one is not available, wait until it is. @@ -61,7 +61,7 @@ def __init__(self, *, capacity: Union[int, None]=0, order='fifo'): `capacity`: int, defaults to 0 If this is None, the queue will have infinite capacity. `order`: str, defaults to 'fifo' - The order of the queue items. One of 'fifo', 'lifo' or 'smallest'. + The order of the queue items. One of 'fifo', 'lifo' or 'small-first'. ''' # 単に False だと私が使っているeditor(Visual Studio Code)の色付けが崩れてしまう為 bool(False) assert bool(False), "This is an abstract class" diff --git a/tests/test_queue_common.py b/tests/test_queue_common.py index b902edf..9bce1d2 100644 --- a/tests/test_queue_common.py +++ b/tests/test_queue_common.py @@ -24,7 +24,7 @@ def test_invalid_capacity_value(capacity): @p_capacity -@p('order', ['fifo', 'lifo', 'smallest', ]) +@p('order', ['fifo', 'lifo', 'small-first', ]) def test_instance_type(capacity, order): from asynckivy.queue import Queue from asynckivy._queue import ZeroCapacityQueue, NormalQueue diff --git a/tests/test_queue_normal.py b/tests/test_queue_normal.py index ed93894..e75ad4b 100644 --- a/tests/test_queue_normal.py +++ b/tests/test_queue_normal.py @@ -1,6 +1,6 @@ import pytest p = pytest.mark.parametrize -p_order = p('order', ('lifo', 'fifo', 'smallest')) +p_order = p('order', ('lifo', 'fifo', 'small-first')) p_capacity = p('capacity', [1, 2, None, ]) p_capacity2 = p('capacity', [1, 2, 3, 4, None, ]) @@ -242,7 +242,7 @@ async def consumer2(q): @p('capacity', [4, 5, None, ]) -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('smallest', '3102', '0123'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('small-first', '3102', '0123'), ]) def test_item_order__enough_capacity(kivy_clock, capacity, order, input, expect): import asynckivy as ak from asynckivy.queue import Queue @@ -263,7 +263,7 @@ async def consumer(q): assert c.result == expect -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('smallest', '3102', '1302'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('small-first', '3102', '1302'), ]) def test_item_order_2capacity(kivy_clock, order, input, expect): '''NOTE: これは仕様というよりは現状の実装に対するtest''' import asynckivy as ak @@ -285,7 +285,7 @@ async def consumer(q): assert c.result == expect -@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('smallest', '3102', '0132'), ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('small-first', '3102', '0132'), ]) def test_item_3capacity(kivy_clock, order, input, expect): '''NOTE: これは仕様というよりは現状の実装に対するtest''' import asynckivy as ak From 0ea86881f9993fafd59794186a08e156e0bcad1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sat, 5 Mar 2022 14:28:47 +0900 Subject: [PATCH 09/12] edit doc --- asynckivy/_queue.py | 2 +- asynckivy/queue.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index 24e3103..a2828ce 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -5,7 +5,7 @@ ZeroCapacityQueue(以後はZQと略す)とNormalQueue(以後はNQと略す)は容量以外にも以下の違いがある。 - ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlockを起こす。 -- put_nowait() も同じ。 +- put_nowait() も同じで、NQはgetterを見に行かないのでgetterがあったとしてもキューが満タンの時はWouldBlockを起こす。 - ZQではitemの中継はputterとgetterの両方を同時に取り出てから行うため、putterを進めた時にキューを閉じられたとしてもgetterには影響しない。 対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 diff --git a/asynckivy/queue.py b/asynckivy/queue.py index c6e0771..08446be 100644 --- a/asynckivy/queue.py +++ b/asynckivy/queue.py @@ -13,11 +13,11 @@ def capacity(self) -> Union[int, None]: @property def is_empty(self) -> bool: - '''Whether the queue is empty. If the capacity of the queue is zero, raise AttributeError.''' + '''Whether the queue is empty. Raise AttributeError if the capacity of the queue is zero.''' @property def is_full(self) -> bool: - '''Whether the queue is full. If the capacity of the queue is zero, raise AttributeError.''' + '''Whether the queue is full. Raise AttributeError if the capacity of the queue is zero.''' @property def order(self) -> Union[str, None]: From 4d850d1d0bc8b2d52ed08dc0c0e31a7a68ce3d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sat, 5 Mar 2022 14:29:35 +0900 Subject: [PATCH 10/12] =?UTF-8?q?kivy.config=E3=81=AE=E9=83=BD=E5=90=88?= =?UTF-8?q?=E4=B8=8A=20module-level=E3=81=AEimport=E3=82=92=E3=81=A9?= =?UTF-8?q?=E3=81=93=E3=81=A7=E3=82=82=E8=A8=B1=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index a0ee8a9..f660d15 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,7 @@ packages = find: install_requires = asyncgui>=0.5,<0.6 [flake8] -ignore = C901,E252 +ignore = C901,E252,E402 per-file-ignores= ./asynckivy/__init__.py:F401,F403 ./examples/misc/speed_comparision_between_schedule_interval_and_sleep.py:E402 From 228b18abf33efe7d65fbc91b4a35a73e97713dd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sat, 5 Mar 2022 14:34:29 +0900 Subject: [PATCH 11/12] fix doc --- asynckivy/_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py index a2828ce..cc30a57 100644 --- a/asynckivy/_queue.py +++ b/asynckivy/_queue.py @@ -7,7 +7,7 @@ - ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlockを起こす。 - put_nowait() も同じで、NQはgetterを見に行かないのでgetterがあったとしてもキューが満タンの時はWouldBlockを起こす。 - ZQではitemの中継はputterとgetterの両方を同時に取り出てから行うため、putterを進めた時にキューを閉じられたとしてもgetterには影響しない。 -対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 + 対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 これらの違いは無くそうと思えば無くせるかもしれないが実装がかなり複雑になるので諦めた。 ''' From 65ee733d9abe7abb362fe1f236a806e8081c0067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sat, 5 Mar 2022 14:38:43 +0900 Subject: [PATCH 12/12] =?UTF-8?q?kivy.config=E3=81=AE=E9=83=BD=E5=90=88?= =?UTF-8?q?=E4=B8=8Aexamples=E3=83=87=E3=82=A3=E3=83=AC=E3=82=AF=E3=83=88?= =?UTF-8?q?=E3=83=AA=E5=86=85=E3=81=A7=E3=81=AF=20module-level=E3=81=AEimp?= =?UTF-8?q?ort=E3=82=92=E3=81=A9=E3=81=93=E3=81=A7=E3=82=82=E8=A8=B1?= =?UTF-8?q?=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.cfg | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index f660d15..8423657 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,8 @@ packages = find: install_requires = asyncgui>=0.5,<0.6 [flake8] -ignore = C901,E252,E402 +ignore = C901,E252 per-file-ignores= ./asynckivy/__init__.py:F401,F403 + ./examples/*.py:E402 ./examples/misc/speed_comparision_between_schedule_interval_and_sleep.py:E402