diff --git a/aiohttp/streams.py b/aiohttp/streams.py
index aca8e91652b..8eec943ce8e 100644
--- a/aiohttp/streams.py
+++ b/aiohttp/streams.py
@@ -1,3 +1,4 @@
+import sys
 import asyncio
 import collections
 import functools
@@ -10,6 +11,8 @@
            'FlowControlStreamReader',
           'FlowControlDataQueue', 'FlowControlChunksQueue')
 
+PY_35 = sys.version_info >= (3, 5)
+
 EOF_MARKER = b''
 DEFAULT_LIMIT = 2 ** 16
 
@@ -18,7 +21,74 @@ class EofStream(Exception):
     """eof stream indication."""
 
 
-class StreamReader(asyncio.StreamReader):
+class AsyncStreamIterator:
+
+    def __init__(self, read_func):
+        self.read_func = read_func
+        self.stopped = False
+
+    @asyncio.coroutine
+    def __aiter__(self):
+        return self
+
+    @asyncio.coroutine
+    def __anext__(self):
+        if self.stopped:
+            raise StopAsyncIteration  # NOQA
+        try:
+            return (yield from self.read_func())
+        except asyncio.streams.IncompleteReadError as e:
+            self.stopped = True
+            if e.partial == b'':
+                # the previous read consumed the last byte exactly,
+                # so this read hit EOF with nothing left to return
+                raise StopAsyncIteration  # NOQA
+            else:
+                # we hit EOF, but there is still data to send; on the
+                # next call, self.stopped will end the iteration
+                return e.partial
+        except EofStream:
+            self.stopped = True
+            raise StopAsyncIteration  # NOQA
+
+
+class AsyncStreamReaderMixin:
+
+    if PY_35:
+        @asyncio.coroutine
+        def __aiter__(self):
+            return AsyncStreamIterator(self.readline)
+
+        def iter_chunked(self, n):
+            """Returns an asynchronous iterator that yields chunks of size n.
+
+            .. versionadded:: Python-3.5 available for Python 3.5+ only
+            """
+            return AsyncStreamIterator(lambda: self.readexactly(n))
+
+        def iter_any(self):
+            """Returns an asynchronous iterator that yields slices of data
+            as they come.
+
+            .. versionadded:: Python-3.5 available for Python 3.5+ only
+            """
+            return AsyncStreamIterator(self.readany)
+
+
+class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
+    """An enhancement of :class:`asyncio.StreamReader`.
+
+    Supports asynchronous iteration by line, chunk or as available::
+
+        async for line in reader:
+            ...
+        async for chunk in reader.iter_chunked(1024):
+            ...
+        async for slice in reader.iter_any():
+            ...
+
+    .. automethod:: AsyncStreamReaderMixin.iter_chunked
+    .. automethod:: AsyncStreamReaderMixin.iter_any
+    """
 
     total_bytes = 0
 
@@ -270,7 +340,7 @@ def _read_nowait(self, n=None):
         return data
 
 
-class EmptyStreamReader:
+class EmptyStreamReader(AsyncStreamReaderMixin):
 
     def exception(self):
         return None
@@ -386,6 +456,11 @@ def read(self):
         else:
             raise EofStream
 
+    if PY_35:
+        @asyncio.coroutine
+        def __aiter__(self):
+            return AsyncStreamIterator(self.read)
+
 
 class ChunksQueue(DataQueue):
     """Like a :class:`DataQueue`, but for binary chunked data transfer."""
diff --git a/tests/test_py35/__init__.py b/tests/test_py35/__init__.py
new file mode 100644
index 00000000000..2b94168a79e
--- /dev/null
+++ b/tests/test_py35/__init__.py
@@ -0,0 +1,5 @@
+"""
+Python 3.5 test module, testing new native async stuff.
+
+This __init__.py file allows files in here to have the same names as other test files.
+"""
\ No newline at end of file
diff --git a/tests/test_py35/test_streams.py b/tests/test_py35/test_streams.py
new file mode 100644
index 00000000000..dc9e6a7221d
--- /dev/null
+++ b/tests/test_py35/test_streams.py
@@ -0,0 +1,72 @@
+import pytest
+
+from aiohttp import streams
+
+
+DATA = b'line1\nline2\nline3\n'
+
+
+def chunkify(seq, n):
+    for i in range(0, len(seq), n):
+        yield seq[i:i+n]
+
+
+def create_stream(loop):
+    stream = streams.StreamReader(loop=loop)
+    stream.feed_data(DATA)
+    stream.feed_eof()
+    return stream
+
+
+@pytest.mark.run_loop
+async def test_stream_reader_lines(loop):
+    line_iter = iter(DATA.splitlines(keepends=True))
+    async for line in create_stream(loop):
+        assert line == next(line_iter)
+    pytest.raises(StopIteration, next, line_iter)
+
+
+@pytest.mark.run_loop
+async def test_stream_reader_chunks_complete(loop):
+    """Tests if chunked iteration works if the chunking works out
+    (i.e. the data is divisible by the chunk size)
+    """
+    chunk_iter = chunkify(DATA, 9)
+    async for line in create_stream(loop).iter_chunked(9):
+        assert line == next(chunk_iter)
+    pytest.raises(StopIteration, next, chunk_iter)
+
+
+@pytest.mark.run_loop
+async def test_stream_reader_chunks_incomplete(loop):
+    """Tests if chunked iteration works if the last chunk is incomplete"""
+    chunk_iter = chunkify(DATA, 8)
+    async for line in create_stream(loop).iter_chunked(8):
+        assert line == next(chunk_iter)
+    pytest.raises(StopIteration, next, chunk_iter)
+
+
+@pytest.mark.run_loop
+async def test_data_queue_empty(loop):
+    """Tests that async looping yields nothing if nothing is there"""
+    buffer = streams.DataQueue(loop=loop)
+    buffer.feed_eof()
+
+    async for _ in buffer:
+        assert False
+
+
+@pytest.mark.run_loop
+async def test_data_queue_items(loop):
+    """Tests that async looping yields objects identically"""
+    buffer = streams.DataQueue(loop=loop)
+
+    items = [object(), object()]
+    buffer.feed_data(items[0], 1)
+    buffer.feed_data(items[1], 1)
+    buffer.feed_eof()
+
+    item_iter = iter(items)
+    async for item in buffer:
+        assert item is next(item_iter)
+    pytest.raises(StopIteration, next, item_iter)
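For reference, a minimal usage sketch of the iteration API this diff introduces (not part of the patch). The make_reader and demo names are placeholders invented for illustration; the reader is fed manually here, mirroring create_stream() in the tests, rather than by aiohttp's protocol machinery, and Python 3.5+ is assumed for async/await syntax.

import asyncio
from aiohttp import streams

def make_reader(loop, data=b'line1\nline2\nline3\n'):
    # Hypothetical helper: build a StreamReader pre-filled with data,
    # as the tests above do in create_stream().
    reader = streams.StreamReader(loop=loop)
    reader.feed_data(data)
    reader.feed_eof()
    return reader

async def demo(loop):
    # Line-by-line iteration; __aiter__ wraps readline.
    async for line in make_reader(loop):
        print('line:', line)
    # Fixed-size chunks via readexactly; the final short chunk
    # is still yielded (see AsyncStreamIterator.__anext__).
    async for chunk in make_reader(loop).iter_chunked(8):
        print('chunk:', chunk)

loop = asyncio.get_event_loop()
loop.run_until_complete(demo(loop))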