diff --git a/asynckivy/_queue.py b/asynckivy/_queue.py new file mode 100644 index 0000000..cc30a57 --- /dev/null +++ b/asynckivy/_queue.py @@ -0,0 +1,324 @@ +''' +Notes for maintainers +--------------------- + +ZeroCapacityQueue(以後はZQと略す)とNormalQueue(以後はNQと略す)は容量以外にも以下の違いがある。 + +- ZQは get_nowait() 時にputterを見に行くのに対しNQは見に行かない。なのでNQはputterがあったとしてもキューが空の時はWouldBlockを起こす。 +- put_nowait() も同じで、NQはgetterを見に行かないのでgetterがあったとしてもキューが満タンの時はWouldBlockを起こす。 +- ZQではitemの中継はputterとgetterの両方を同時に取り出てから行うため、putterを進めた時にキューを閉じられたとしてもgetterには影響しない。 + 対してNQでは同時に取り出さないため、putterを進めた時にキューが閉じられればgetterはその影響をうける。 + +これらの違いは無くそうと思えば無くせるかもしれないが実装がかなり複雑になるので諦めた。 +''' + +__all__ = ('ZeroCapacityQueue', 'NormalQueue', ) +from typing import Tuple, Any +import types +from functools import partial +from collections import deque +from kivy.clock import Clock +from asynckivy import sleep_forever, TaskState, Task, get_current_task +from asynckivy.exceptions import WouldBlock, ClosedResourceError, EndOfResource + + +Item = Any # type-hint for queue item + + +class ZeroCapacityQueue: + '''Queue, optimized for zero capacity''' + + def __init__(self): + self._allows_to_put = True + self._allows_to_get = True + self._putters = deque() # queue of tuple(task, item) s + self._getters = deque() # queue of task._step_coro s + self._trigger_consume = Clock.create_trigger(self._consume) + + def __len__(self): + return 0 + size = property(__len__) + + @property + def capacity(self) -> int: + return 0 + + @property + def is_empty(self): + raise AttributeError("The meaning of 'empty' is ambiguous for a zero capacity queue.") + + @property + def is_full(self): + raise AttributeError("The meaning of 'full' is ambiguous for a zero capacity queue.") + + @property + def order(self): + return None + + @types.coroutine + def get(self): + if not self._allows_to_get: + raise ClosedResourceError + if not self._allows_to_put: + raise EndOfResource + self._trigger_consume() + return (yield self._getters.append)[0][0] + + def get_nowait(self): + if not self._allows_to_get: + raise ClosedResourceError + if not self._allows_to_put: + raise EndOfResource + putter, item = self._pop_putter() + if putter is None: + raise WouldBlock + putter._step_coro() + return item + + async def put(self, item): + if not self._allows_to_put: + raise ClosedResourceError + self._trigger_consume() + self._putters.append((await get_current_task(), item, )) + await sleep_forever() + + def put_nowait(self, item): + if not self._allows_to_put: + raise ClosedResourceError + getter = self._pop_getter() + if getter is None: + raise WouldBlock + getter._step_coro(item) + + def close(self): + self._allows_to_put = False + + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + CRE = ClosedResourceError + EOR = EndOfResource + + # TODO: refactor after python3.7 ends + while True: + putter, __ = pop_putter() + if putter is None: + break + putter._throw_exc(CRE) + while True: + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + + def fullclose(self): + self._allows_to_put = False + self._allows_to_get = False + + # LOAD_FAST + CRE = ClosedResourceError + pop_putter = self._pop_putter + pop_getter = self._pop_getter + + # TODO: refactor after python3.7 ends + while True: + task, __ = pop_putter() + if task is None: + break + task._throw_exc(CRE) + while True: + task = pop_getter() + if task is None: + break + task._throw_exc(CRE) + + async def __aiter__(self): + try: + while True: + yield await self.get() + except EndOfResource: + pass + + def _consume(self, __): + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + + while True: + getter = pop_getter() + if getter is None: + break + putter, item = pop_putter() + if putter is None: + self._getters.appendleft(getter._step_coro) + break + putter._step_coro() + getter._step_coro(item) + self._trigger_consume.cancel() + + def _pop_getter(self, *, _STARTED=TaskState.STARTED) -> Task: + '''Take out a next getter. Return None if no one is available.''' + getters = self._getters + while getters: + task = getters.popleft().__self__ + if task.state is _STARTED: + return task + + def _pop_putter(self, *, _STARTED=TaskState.STARTED) -> Tuple[Task, Item]: + '''Take out a next putter and its item. Return (None, None) if no one is available.''' + putters = self._putters + while putters: + task, item = putters.popleft() + if task.state is _STARTED: + return (task, item, ) + return (None, None, ) + + +class NormalQueue: + def __init__(self, *, capacity, order): + self._allows_to_put = True + self._allows_to_get = True + self._putters = deque() # queue of tuple(task, item) s + self._getters = deque() # queue of task._step_coro s + self._trigger_consume = Clock.create_trigger(self._consume) + self._init_container(capacity, order) + self._capacity = capacity + self._order = order + + def _init_container(self, capacity, order): + # If the capacity is 1, there is no point of re-ordering items. + # Therefore, does not use heapq for a performance reason. + if capacity == 1 or order == 'lifo': + c = [] + c_get = c.pop + c_put = c.append + elif order == 'fifo': + c = deque(maxlen=capacity) + c_get = c.popleft + c_put = c.append + elif order == 'small-first': + import heapq + c = [] + c_get = partial(heapq.heappop, c) + c_put = partial(heapq.heappush, c) + else: + raise ValueError("'order' must be one of 'lifo', 'fifo' or 'small-first'.") + self._c = c + self._c_get = c_get + self._c_put = c_put + + def __len__(self): + return len(self._c) + size = property(__len__) + + @property + def capacity(self) -> int: + return self._capacity + + @property + def is_empty(self): + return not self._c + + @property + def is_full(self): + return len(self._c) == self._capacity + + @property + def order(self): + return self._order + + @types.coroutine + def get(self): + if not self._allows_to_get: + raise ClosedResourceError + if (not self._allows_to_put) and self.is_empty: + raise EndOfResource + self._trigger_consume() + return (yield self._getters.append)[0][0] + + def get_nowait(self): + if not self._allows_to_get: + raise ClosedResourceError + if self.is_empty: + if self._allows_to_put: + raise WouldBlock + raise EndOfResource + self._trigger_consume() + return self._c_get() + + put = ZeroCapacityQueue.put + + def put_nowait(self, item): + if not self._allows_to_put: + raise ClosedResourceError + if self.is_full: + raise WouldBlock + self._c_put(item) + self._trigger_consume() + + def close(self): + self._allows_to_put = False + + # LOAD_FAST + pop_putter = self._pop_putter + pop_getter = self._pop_getter + CRE = ClosedResourceError + EOR = EndOfResource + + # TODO: refactor after python3.7 ends + while True: + putter, __ = pop_putter() + if putter is None: + break + putter._throw_exc(CRE) + if not self.is_empty: + return + while True: + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + + def fullclose(self): + self._c.clear() + ZeroCapacityQueue.fullclose(self) + + __aiter__ = ZeroCapacityQueue.__aiter__ + + def _consume(self, __): + # LOAD_FAST + getters = self._getters + putters = self._putters + pop_putter = self._pop_putter + pop_getter = self._pop_getter + c_put = self._c_put + c_get = self._c_get + + while True: + while not self.is_full: + putter, item = pop_putter() + if putter is None: + break + c_put(item) + putter._step_coro() + if (not getters) or self.is_empty: + break + while not self.is_empty: + getter = pop_getter() + if getter is None: + break + getter._step_coro(c_get()) + else: + if not self._allows_to_put: + EOR = EndOfResource # LOAD_FAST + while True: # TODO: refactor after Python3.7 ends + getter = pop_getter() + if getter is None: + break + getter._throw_exc(EOR) + if (not putters) or self.is_full: + break + self._trigger_consume.cancel() + + _pop_getter = ZeroCapacityQueue._pop_getter + _pop_putter = ZeroCapacityQueue._pop_putter diff --git a/asynckivy/exceptions.py b/asynckivy/exceptions.py index 7f1f754..53ab1ef 100644 --- a/asynckivy/exceptions.py +++ b/asynckivy/exceptions.py @@ -1,5 +1,27 @@ -__all__ = ('MotionEventAlreadyEndedError', ) +__all__ = ( + 'MotionEventAlreadyEndedError', + 'WouldBlock', 'ClosedResourceError', 'EndOfResource', +) +from asyncgui.exceptions import * # noqa class MotionEventAlreadyEndedError(Exception): """A MotionEvent has already ended.""" + + +class WouldBlock(Exception): + """(took from trio) + Raised by X_nowait functions if X would block. + """ + + +class ClosedResourceError(Exception): + """(took from trio) + Raised when attempting to use a resource after it has been closed. + """ + + +class EndOfResource(Exception): + """(similar to trio's EndOfChannel) + This is analogous to an "end-of-file" condition, but for resources in general. + """ diff --git a/asynckivy/queue.py b/asynckivy/queue.py new file mode 100644 index 0000000..08446be --- /dev/null +++ b/asynckivy/queue.py @@ -0,0 +1,80 @@ +__all__ = ('Queue', ) +from typing import Union + + +class Queue: + @property + def size(self) -> int: + '''Number of items in the queue. This equals to ``len(queue)``. ''' + + @property + def capacity(self) -> Union[int, None]: + '''Number of items allowed in the queue. None if unbounded.''' + + @property + def is_empty(self) -> bool: + '''Whether the queue is empty. Raise AttributeError if the capacity of the queue is zero.''' + + @property + def is_full(self) -> bool: + '''Whether the queue is full. Raise AttributeError if the capacity of the queue is zero.''' + + @property + def order(self) -> Union[str, None]: + ''' 'fifo', 'lifo' or 'small-first'. None if the capacity of the queue is zero. ''' + + async def get(self): + '''Take out an item from the queue. If one is not available, wait until it is. + If the queue is empty, and is partially closed, raise EndOfResource. + If the queue is fully closed, raise ClosedResourceError. + ''' + + def get_nowait(self): + '''Take out an item from the queue if one is immediately available, else raise WouldBlock. + Everything else is the same as ``get``. + ''' + + async def put(self, item): + '''Put an item into the queue. If no free slots are available, wait until one is available. + If the queue is partially or fully closed, raise ClosedResourceError. + ''' + + def put_nowait(self, item): + '''Put an item into the queue if a free slot is immediately available, else raise WouldBlock. + Everything else is the same as ``put``. + ''' + + def close(self): + '''Partially close the queue. No further putting opeations are allowed. ''' + + def fullclose(self): + '''Fully close the queue. No further putting/getting operations are allowed. All items in the queue are + discarded. + ''' + + def __aiter__(self): + '''Return an async iterator that repeatedly ``get`` an item from the queue until it raises EndOfResource.''' + + def __init__(self, *, capacity: Union[int, None]=0, order='fifo'): + ''' + :Parameters: + `capacity`: int, defaults to 0 + If this is None, the queue will have infinite capacity. + `order`: str, defaults to 'fifo' + The order of the queue items. One of 'fifo', 'lifo' or 'small-first'. + ''' + # 単に False だと私が使っているeditor(Visual Studio Code)の色付けが崩れてしまう為 bool(False) + assert bool(False), "This is an abstract class" + + def __new__(cls, *, capacity: Union[int, None]=0, order='fifo'): + from . import _queue + if capacity is None: + pass + elif isinstance(capacity, int): + if capacity == 0: + return _queue.ZeroCapacityQueue() + elif capacity < 0: + raise ValueError(f"If 'capacity' is an integer, it must be zero or greater. (was {capacity})") + else: + raise TypeError(f"'capacity' must be None or an integer. (was {type(capacity)})") + return _queue.NormalQueue(capacity=capacity, order=order) diff --git a/examples/one_producer_two_consumers.py b/examples/one_producer_two_consumers.py new file mode 100644 index 0000000..1479c67 --- /dev/null +++ b/examples/one_producer_two_consumers.py @@ -0,0 +1,34 @@ +from kivy.config import Config +Config.set('graphics', 'width', 100) +Config.set('graphics', 'height', 100) +from kivy.app import App +import asynckivy as ak +from asynckivy.queue import Queue + + +async def main(): + async def producer(name, q, items): + for c in items: + await q.put(c) + print(name, "produced", c) + await ak.sleep(.1) + q.close() + + async def consumer(name, q): + async for c in q: + print(name, "consumed", c) + await ak.sleep(.3) + + from string import ascii_lowercase + q = Queue(capacity=None) + await ak.and_( + producer('P ', q, ascii_lowercase), + consumer('C1', q), + consumer('C2', q), + ) + print('Done all tasks') + App.get_running_app().stop() + + +ak.start_soon(main()) +App().run() diff --git a/examples/two_producers_one_consumer.py b/examples/two_producers_one_consumer.py new file mode 100644 index 0000000..b78d0fe --- /dev/null +++ b/examples/two_producers_one_consumer.py @@ -0,0 +1,40 @@ +from kivy.config import Config +Config.set('graphics', 'width', 100) +Config.set('graphics', 'height', 100) +from kivy.app import App +import asynckivy as ak +from asynckivy.queue import Queue + + +async def main(): + async def producer(name, q, items): + for c in items: + await q.put(c) + print(name, "produced", c) + await ak.sleep(.12) + + async def producers(*producers): + # This function is necessary because you usually want to close the queue *after* all producers end. + await ak.and_from_iterable(producers) + q.close() + + async def consumer(name, q): + async for c in q: + print(name, "consumed", c) + await ak.sleep(.08) + + from string import ascii_lowercase, ascii_uppercase + q = Queue(capacity=None) + await ak.and_( + producers( + producer('P1', q, ascii_lowercase), + producer('P2', q, ascii_uppercase), + ), + consumer('C ', q), + ) + print('Done all tasks') + App.get_running_app().stop() + + +ak.start_soon(main()) +App().run() diff --git a/setup.cfg b/setup.cfg index 845ed19..8423657 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,8 @@ packages = find: install_requires = asyncgui>=0.5,<0.6 [flake8] -ignore = C901 +ignore = C901,E252 per-file-ignores= ./asynckivy/__init__.py:F401,F403 + ./examples/*.py:E402 ./examples/misc/speed_comparision_between_schedule_interval_and_sleep.py:E402 diff --git a/tests/test_queue_common.py b/tests/test_queue_common.py new file mode 100644 index 0000000..9bce1d2 --- /dev/null +++ b/tests/test_queue_common.py @@ -0,0 +1,201 @@ +import pytest +p = pytest.mark.parametrize +p_capacity = p('capacity', [0, 1, 2, None, ]) +p_capacity2 = p('capacity', [0, 1, 2, 3, 4, None, ]) + + +@pytest.fixture(autouse=True) +def autouse_kivy_clock(kivy_clock): + pass + + +@p('capacity', (-1., 0., 1., "", "str", tuple(), [], )) +def test_invalid_capacity_type(capacity): + from asynckivy.queue import Queue + with pytest.raises(TypeError): + Queue(capacity=capacity) + + +@p('capacity', (-1, )) +def test_invalid_capacity_value(capacity): + from asynckivy.queue import Queue + with pytest.raises(ValueError): + Queue(capacity=capacity) + + +@p_capacity +@p('order', ['fifo', 'lifo', 'small-first', ]) +def test_instance_type(capacity, order): + from asynckivy.queue import Queue + from asynckivy._queue import ZeroCapacityQueue, NormalQueue + q = Queue(capacity=capacity, order=order) + if capacity == 0: + assert isinstance(q, ZeroCapacityQueue) + else: + assert isinstance(q, NormalQueue) + + +@p_capacity +@p('fullclose', [True, False, ]) +@p('nowait', [True, False, ]) +def test_put_to_closed_queue(capacity, fullclose, nowait): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + q.fullclose() if fullclose else q.close() + with pytest.raises(ak.ClosedResourceError): + q.put_nowait('Z') if nowait else ak.start(q.put('Z')) + + +@p_capacity +@p('fullclose', [True, False, ]) +@p('nowait', [True, False, ]) +def test_get_to_closed_queue(capacity, fullclose, nowait): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + if fullclose: + q.fullclose() + exc = ak.ClosedResourceError + else: + q.close() + exc = ak.EndOfResource + with pytest.raises(exc): + q.get_nowait() if nowait else ak.start(q.get()) + + +@p_capacity2 +def test_async_for(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p = ak.start(producer(q, 'ABC')) + c = ak.start(consumer(q)) + assert not p.done + assert not c.done + kivy_clock.tick() + assert p.done + assert not c.done + q.close() + assert c.result == 'ABC' + + +@p_capacity2 +def test_one_producer_and_two_consumers(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in 'A1B2C3': + await q.put(c) + q.close() + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p = ak.start(producer(q)) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p.done + assert c1.result == 'ABC' + assert c2.result == '123' + + +@p_capacity2 +def test_two_producers_and_one_consumer(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p1 = ak.start(producer(q, 'ABC')) + p2 = ak.start(producer(q, '123')) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert not c.done + q.close() + assert c.result == 'A1B2C3' + + +@p_capacity2 +def test_two_producers_and_two_consumers(kivy_clock, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q, items): + for i in items: + await q.put(i) + + async def consumer(q): + return ''.join([item async for item in q]) + + q = Queue(capacity=capacity) + p1 = ak.start(producer(q, 'ABC')) + p2 = ak.start(producer(q, '123')) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert not c1.done + assert not c2.done + q.close() + assert c1.result == 'ABC' + assert c2.result == '123' + + +@p('n_producers', range(3)) +@p('n_consumers', range(3)) +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_close_a_queue_while_it_holding_putters_and_getters(n_producers, n_consumers, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + with pytest.raises(ak.ClosedResourceError): + await q.put(None) + async def consumer(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + + q = Queue(capacity=capacity) + producers = [ak.start(producer(q)) for __ in range(n_producers)] + consumers = [ak.start(consumer(q)) for __ in range(n_consumers)] + for p in producers: + assert not p.done + for c in consumers: + assert not c.done + q.fullclose() if fullclose else q.close() + for p in producers: + assert p.done + for c in consumers: + assert c.done diff --git a/tests/test_queue_normal.py b/tests/test_queue_normal.py new file mode 100644 index 0000000..e75ad4b --- /dev/null +++ b/tests/test_queue_normal.py @@ -0,0 +1,307 @@ +import pytest +p = pytest.mark.parametrize +p_order = p('order', ('lifo', 'fifo', 'small-first')) +p_capacity = p('capacity', [1, 2, None, ]) +p_capacity2 = p('capacity', [1, 2, 3, 4, None, ]) + + +@pytest.fixture(autouse=True) +def autouse_kivy_clock(kivy_clock): + pass + + +@p_order +def test_various_statistics(kivy_clock, order): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + ak.start(q.put(1)) + assert q.size == 0 + assert q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ak.start(q.put(2)) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 2 + assert not q.is_empty + assert q.is_full + ak.start(q.get()) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + kivy_clock.tick() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + ak.start(q.get()) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + kivy_clock.tick() + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_order +def test_various_statistics_nowait(order): + from asynckivy.queue import Queue + q = Queue(capacity=2, order=order) + assert q.order == order + assert len(q) == 0 + assert q.capacity == 2 + assert q.size == 0 + assert q.is_empty + assert not q.is_full + q.put_nowait(1) + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.put_nowait(2) + assert q.size == 2 + assert not q.is_empty + assert q.is_full + q.get_nowait() + assert q.size == 1 + assert not q.is_empty + assert not q.is_full + q.get_nowait() + assert q.size == 0 + assert q.is_empty + assert not q.is_full + + +@p_capacity +@p_order +def test_container_type(capacity, order): + from asynckivy.queue import Queue + q = Queue(capacity=capacity, order=order) + if capacity != 1 and order == 'fifo': + from collections import deque + assert isinstance(q._c, deque) + else: + assert isinstance(q._c, list) + + +@p_capacity +def test_get_nowait_while_there_are_no_putters_and_no_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +@p_capacity +def test_get_nowait_while_there_is_a_putter_but_no_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + ak.start(q.put('A')) + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +@p('capacity', [1, 2, ]) +def test_put_nowait_while_there_are_no_getters_and_full_of_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + for i in range(capacity): + q._c_put(i) + assert q.is_full + with pytest.raises(ak.WouldBlock): + q.put_nowait(99) + + +@p('capacity', [1, 2, ]) +def test_put_nowait_while_there_is_a_getter_and_full_of_items(capacity): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=capacity) + for i in range(capacity): + q._c_put(i) + assert q.is_full + ak.start(q.get()) + with pytest.raises(ak.WouldBlock): + q.put_nowait(99) + + +def test_put_nowait_to_unbounded_queue_that_has_no_getters(): + from asynckivy.queue import Queue + q = Queue(capacity=None) + for i in range(10): + q._c_put(i) + assert not q.is_full + q.put_nowait('A') + + +def test_put_nowait_to_unbounded_queue_that_has_a_getter(): + import asynckivy as ak + from asynckivy.queue import Queue + q = Queue(capacity=None) + for i in range(10): + q._c_put(i) + assert not q.is_full + ak.start(q.get()) + q.put_nowait('A') + + +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_putter_triggers_close(kivy_clock, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + if fullclose: + with pytest.raises(ak.ClosedResourceError): + await q.get() + else: + assert await q.get() == 'A' + async def consumer2(q): + exc = ak.ClosedResourceError if fullclose else ak.EndOfResource + with pytest.raises(exc): + await q.get() + + q = Queue(capacity=capacity) + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +@p_capacity2 +def test_getter_triggers_close(kivy_clock, fullclose, capacity): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer1(q): + await q.put('A') + async def producer2(q): + if capacity is not None and capacity < 2: + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + else: + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + if fullclose: + with pytest.raises(ak.ClosedResourceError): + await q.get() + elif capacity is not None and capacity < 2: + with pytest.raises(ak.EndOfResource): + await q.get() + else: + assert await q.get() == 'B' + + q = Queue(capacity=capacity) + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('capacity', [4, 5, None, ]) +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '3210'), ('small-first', '3102', '0123'), ]) +def test_item_order__enough_capacity(kivy_clock, capacity, order, input, expect): + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=capacity, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect + + +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '1032'), ('small-first', '3102', '1302'), ]) +def test_item_order_2capacity(kivy_clock, order, input, expect): + '''NOTE: これは仕様というよりは現状の実装に対するtest''' + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=2, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect + + +@p('order,input,expect', [('fifo', '0123', '0123'), ('lifo', '0123', '2103'), ('small-first', '3102', '0132'), ]) +def test_item_3capacity(kivy_clock, order, input, expect): + '''NOTE: これは仕様というよりは現状の実装に対するtest''' + import asynckivy as ak + from asynckivy.queue import Queue + + async def producer(q): + for c in input: + await q.put(int(c)) + q.close() + + async def consumer(q): + return ''.join([str(i) async for i in q]) + + q = Queue(capacity=3, order=order) + p = ak.start(producer(q)) + c = ak.start(consumer(q)) + kivy_clock.tick() + assert p.done + assert c.result == expect diff --git a/tests/test_queue_zero_capacity.py b/tests/test_queue_zero_capacity.py new file mode 100644 index 0000000..b93b410 --- /dev/null +++ b/tests/test_queue_zero_capacity.py @@ -0,0 +1,221 @@ +import pytest +p = pytest.mark.parametrize + + +@pytest.fixture() +def q(kivy_clock): # Queue生成時にClockを使っているのでkivy_clockが要る + from asynckivy.queue import Queue + return Queue(capacity=0) + + +def test_size_and_len_and_capacity(q): + import asynckivy as ak + assert q.size == 0 + assert len(q) == 0 + assert q.capacity == 0 + ak.start(q.put('A')) + assert q.size == 0 + assert len(q) == 0 + assert q.capacity == 0 + + +def test_is_empty(q): + import asynckivy as ak + with pytest.raises(AttributeError): + q.is_empty + ak.start(q.put('A')) + with pytest.raises(AttributeError): + q.is_empty + + +def test_is_full(q): + import asynckivy as ak + with pytest.raises(AttributeError): + q.is_full + ak.start(q.put('A')) + with pytest.raises(AttributeError): + q.is_full + + +def test_order(q): + assert q.order is None + + +def test_get_nowait_while_there_are_no_putters(q): + import asynckivy as ak + with pytest.raises(ak.WouldBlock): + q.get_nowait() + + +def test_put_nowait_while_there_are_no_getters(q): + import asynckivy as ak + with pytest.raises(ak.WouldBlock): + q.put_nowait('A') + + +@p('fullclose', [True, False, ]) +def test_get_nowait_triggers_close(q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + assert q.get_nowait() == 'A' + assert p1.done + assert p2.done + assert c.done + + +@p('fullclose', [True, False, ]) +def test_putter_triggers_close(kivy_clock, q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + q.fullclose() if fullclose else q.close() + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +def test_put_nowait_triggers_close(q, fullclose): + import asynckivy as ak + async def producer(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p = ak.start(producer(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p.done + assert not c1.done + assert not c2.done + q.put_nowait('A') + assert p.done + assert c1.done + assert c2.done + + +@p('fullclose', [True, False, ]) +def test_getter_triggers_close(kivy_clock, q, fullclose): + import asynckivy as ak + async def producer1(q): + await q.put('A') + async def producer2(q): + with pytest.raises(ak.ClosedResourceError): + await q.put('B') + async def consumer1(q): + assert await q.get() == 'A' + q.fullclose() if fullclose else q.close() + async def consumer2(q): + with pytest.raises(ak.ClosedResourceError if fullclose else ak.EndOfResource): + await q.get() + p1 = ak.start(producer1(q)) + p2 = ak.start(producer2(q)) + c1 = ak.start(consumer1(q)) + c2 = ak.start(consumer2(q)) + assert not p1.done + assert not p2.done + assert not c1.done + assert not c2.done + kivy_clock.tick() + assert p1.done + assert p2.done + assert c1.done + assert c2.done + + +def test_cancel_putter(kivy_clock, q): + import asynckivy as ak + async def consumer(q): + return ''.join([item async for item in q]) + p1 = ak.start(q.put('A')) + p2 = ak.start(q.put('B')) + c = ak.start(consumer(q)) + assert not p1.done + assert not p2.done + assert not c.done + p1.cancel() + kivy_clock.tick() + assert p1.cancelled + assert p2.done + assert not c.done + q.close() + assert c.result == 'B' + + +def test_cancel_getter(kivy_clock, q): + import asynckivy as ak + async def producer(q): + for i in 'ABCD': + await q.put(i) + q.close() + async def consumer(q): + return ''.join([item async for item in q]) + p = ak.start(producer(q)) + c1 = ak.start(consumer(q)) + c2 = ak.start(consumer(q)) + assert not p.done + assert not c1.done + assert not c2.done + c1.cancel() + kivy_clock.tick() + assert p.done + assert c1.cancelled + assert c2.result == 'ABCD' + + +def test_wait_for_a_frame_before_get(kivy_clock, q): + import asynckivy as ak + p = ak.start(q.put('A')) + kivy_clock.tick() + assert not p.done + c = ak.start(q.get()) + kivy_clock.tick() + assert p.done + assert c.result == 'A' + + +def test_wait_for_a_frame_before_put(kivy_clock, q): + import asynckivy as ak + c = ak.start(q.get()) + kivy_clock.tick() + assert not c.done + p = ak.start(q.put('A')) + kivy_clock.tick() + assert p.done + assert c.result == 'A' diff --git a/tests/test_sleep.py b/tests/test_sleep.py index 01e5150..f1a418f 100644 --- a/tests/test_sleep.py +++ b/tests/test_sleep.py @@ -24,7 +24,7 @@ def test_sleep_free(): import asynckivy as ak if not hasattr(Clock, 'create_trigger_free'): - pytest.skip("free-type Clock is not avaiable") + pytest.skip("free-type Clock is not available") Clock.tick() task = ak.start(ak.sleep_free(.1)) assert not task.done