Skip to content

Commit

Permalink
Added Python 3.5 “async for” compatibility
Browse files Browse the repository at this point in the history
Instead of

while True:
	msg = await dataqueue.read()
	...

do:

async for msg in dataqueue:
	...
  • Loading branch information
flying-sheep committed Oct 3, 2015
1 parent 230bdf3 commit 29fa81b
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 2 deletions.
79 changes: 77 additions & 2 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
import asyncio
import collections
import functools
Expand All @@ -10,6 +11,8 @@
'FlowControlStreamReader',
'FlowControlDataQueue', 'FlowControlChunksQueue')

PY_35 = sys.version_info >= (3, 5)

EOF_MARKER = b''
DEFAULT_LIMIT = 2 ** 16

Expand All @@ -18,7 +21,74 @@ class EofStream(Exception):
"""eof stream indication."""


class StreamReader(asyncio.StreamReader):
class AsyncStreamIterator:
    """Async iterator adapter around a coroutine ``read_func``.

    Drives ``async for`` by awaiting ``read_func()`` once per step and
    translating the stream's end-of-data signals (``IncompleteReadError``,
    ``EofStream``) into ``StopAsyncIteration``.
    """

    def __init__(self, read_func):
        # Coroutine function invoked once per iteration step.
        self.read_func = read_func
        # Set once EOF was observed; the following __anext__ call stops.
        self.stopped = False

    def __aiter__(self):
        # PEP 492: __aiter__ must be a plain method returning the iterator.
        # A coroutine __aiter__ was deprecated in 3.5.2 and removed in 3.8;
        # likewise @asyncio.coroutine was removed in Python 3.11.
        return self

    async def __anext__(self):
        if self.stopped:
            raise StopAsyncIteration
        try:
            return await self.read_func()
        except asyncio.IncompleteReadError as exc:
            # readexactly() reached EOF before a full chunk was available.
            # Use asyncio.IncompleteReadError: the asyncio.streams attribute
            # path is not a stable public location on modern Pythons.
            self.stopped = True
            if exc.partial == b'':
                # No leftover bytes: the previous read consumed the stream
                # exactly, so there is nothing more to yield.
                raise StopAsyncIteration
            # Yield the final short chunk; self.stopped ends iteration on
            # the next step.
            return exc.partial
        except EofStream:
            self.stopped = True
            raise StopAsyncIteration


class AsyncStreamReaderMixin:
    """Mixin adding ``async for`` iteration helpers to stream readers.

    Only active on Python 3.5+, where ``async for`` exists.
    """

    if PY_35:
        def __aiter__(self):
            """Iterate over the stream line by line (via ``readline``)."""
            # Plain method per PEP 492: a coroutine __aiter__ was deprecated
            # in 3.5.2 and removed in Python 3.8.
            return AsyncStreamIterator(self.readline)

        def iter_chunked(self, n):
            """Returns an asynchronous iterator that yields chunks of size n.

            .. versionadded:: Python-3.5 available for Python 3.5+ only
            """
            # Lambda captures n so each step awaits readexactly(n).
            return AsyncStreamIterator(lambda: self.readexactly(n))

        def iter_any(self):
            """Returns an asynchronous iterator that yields slices of data
            as they come.

            .. versionadded:: Python-3.5 available for Python 3.5+ only
            """
            return AsyncStreamIterator(self.readany)


class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
"""An enhancement of :class:`asyncio.StreamReader`.
Supports asynchronous iteration by line, chunk or as available::
async for line in reader:
...
async for chunk in reader.iter_chunked(1024):
...
async for slice in reader.iter_any():
...
.. automethod:: AsyncStreamReaderMixin.iter_chunked
.. automethod:: AsyncStreamReaderMixin.iter_any
"""

total_bytes = 0

Expand Down Expand Up @@ -270,7 +340,7 @@ def _read_nowait(self, n=None):
return data


class EmptyStreamReader:
class EmptyStreamReader(AsyncStreamReaderMixin):

def exception(self):
return None
Expand Down Expand Up @@ -386,6 +456,11 @@ def read(self):
else:
raise EofStream

if PY_35:
@asyncio.coroutine
def __aiter__(self):
return AsyncStreamIterator(self.read)


class ChunksQueue(DataQueue):
"""Like a :class:`DataQueue`, but for binary chunked data transfer."""
Expand Down
5 changes: 5 additions & 0 deletions tests/test_py35/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Python 3.5 test module, testing new native async stuff.
This __init__.py file allows test files in this directory to share names with other test modules.
"""
72 changes: 72 additions & 0 deletions tests/test_py35/test_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest

from aiohttp import streams


DATA = b'line1\nline2\nline3\n'


def chunkify(seq, n):
    """Yield successive pieces of *seq* of length *n* (last may be shorter)."""
    start = 0
    while start < len(seq):
        yield seq[start:start + n]
        start += n


def create_stream(loop):
    """Return a StreamReader pre-loaded with DATA and already at EOF."""
    reader = streams.StreamReader(loop=loop)
    reader.feed_data(DATA)
    reader.feed_eof()
    return reader


@pytest.mark.run_loop
async def test_stream_reader_lines(loop):
    """Line-by-line async iteration must match splitlines(keepends=True)."""
    expected = iter(DATA.splitlines(keepends=True))
    async for line in create_stream(loop):
        assert line == next(expected)
    # Every expected line must have been consumed.
    pytest.raises(StopIteration, next, expected)


@pytest.mark.run_loop
async def test_stream_reader_chunks_complete(loop):
    """Tests if chunked iteration works if the chunking works out
    (i.e. the data is divisible by the chunk size)
    """
    expected = chunkify(DATA, 9)
    async for chunk in create_stream(loop).iter_chunked(9):
        assert chunk == next(expected)
    # All reference chunks must have been matched.
    pytest.raises(StopIteration, next, expected)


@pytest.mark.run_loop
async def test_stream_reader_chunks_incomplete(loop):
    """Tests if chunked iteration works if the last chunk is incomplete"""
    expected = chunkify(DATA, 8)
    async for chunk in create_stream(loop).iter_chunked(8):
        assert chunk == next(expected)
    # The short trailing chunk must also have been delivered.
    pytest.raises(StopIteration, next, expected)


@pytest.mark.run_loop
async def test_data_queue_empty(loop):
    """Tests that async looping yields nothing if nothing is there"""
    queue = streams.DataQueue(loop=loop)
    queue.feed_eof()

    received = []
    async for item in queue:
        received.append(item)
    assert received == []


@pytest.mark.run_loop
async def test_data_queue_items(loop):
    """Tests that async looping yields objects identically"""
    queue = streams.DataQueue(loop=loop)

    first, second = object(), object()
    queue.feed_data(first, 1)
    queue.feed_data(second, 1)
    queue.feed_eof()

    expected = iter([first, second])
    async for received in queue:
        # Identity, not equality: the very same objects must come back.
        assert received is next(expected)
    pytest.raises(StopIteration, next, expected)

0 comments on commit 29fa81b

Please sign in to comment.