Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document streams #1150

Merged
merged 5 commits into from
Sep 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ CHANGES

-

-
- Default value for StreamReader.read_nowait is -1 from now

-

Expand Down
6 changes: 4 additions & 2 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ def update_body_from_data(self, data, skip_auto_headers):
if hdrs.CONTENT_LENGTH not in self.headers and not self.chunked:
self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))

elif isinstance(data, (asyncio.StreamReader, streams.DataQueue)):
elif isinstance(data, (asyncio.StreamReader, streams.StreamReader,
streams.DataQueue)):
self.body = data

elif asyncio.iscoroutine(data):
Expand Down Expand Up @@ -427,7 +428,8 @@ def write_bytes(self, request, reader):
'Bytes object is expected, got: %s.' %
type(result))

elif isinstance(self.body, asyncio.StreamReader):
elif isinstance(self.body, (asyncio.StreamReader,
streams.StreamReader)):
request.transport.set_tcp_nodelay(True)
chunk = yield from self.body.read(streams.DEFAULT_LIMIT)
while chunk:
Expand Down
21 changes: 12 additions & 9 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def iter_any(self):
return AsyncStreamIterator(self.readany)


class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
"""An enhancement of :class:`asyncio.StreamReader`.
class StreamReader(AsyncStreamReaderMixin):
"""An enhancement of asyncio.StreamReader.

Supports asynchronous iteration by line, chunk or as available::

Expand All @@ -77,8 +77,6 @@ class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
async for slice in reader.iter_any():
...

.. automethod:: AsyncStreamReaderMixin.iter_chunked
.. automethod:: AsyncStreamReaderMixin.iter_any
"""

total_bytes = 0
Expand Down Expand Up @@ -238,7 +236,7 @@ def readline(self):
offset = self._buffer_offset
ichar = self._buffer[0].find(b'\n', offset) + 1
# Read from current offset to found b'\n' or to the end.
data = self._read_nowait(ichar - offset if ichar else 0)
data = self._read_nowait(ichar - offset if ichar else -1)
line.append(data)
line_size += len(data)
if ichar:
Expand Down Expand Up @@ -302,7 +300,7 @@ def readany(self):
if not self._buffer and not self._eof:
yield from self._wait('readany')

return self._read_nowait()
return self._read_nowait(-1)

@asyncio.coroutine
def readexactly(self, n):
Expand All @@ -321,7 +319,12 @@ def readexactly(self, n):

return b''.join(blocks)

def read_nowait(self, n=None):
def read_nowait(self, n=-1):
# default was changed to be consistent with .read(-1)
#
# I believe the most users don't know about the method and
# they are not affected.
assert n is not None, "n should be -1"
if self._exception is not None:
raise self._exception

Expand All @@ -331,13 +334,13 @@ def read_nowait(self, n=None):

return self._read_nowait(n)

def _read_nowait(self, n=None):
def _read_nowait(self, n):
if not self._buffer:
return EOF_MARKER

first_buffer = self._buffer[0]
offset = self._buffer_offset
if n and len(first_buffer) - offset > n:
if n != -1 and len(first_buffer) - offset > n:
data = first_buffer[offset:offset + n]
self._buffer_offset += n

Expand Down
9 changes: 0 additions & 9 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,6 @@ aiohttp.signals module
:undoc-members:
:show-inheritance:

aiohttp.streams module
----------------------

.. automodule:: aiohttp.streams
:members:
:undoc-members:
:show-inheritance:


aiohttp.wsgi module
-------------------

Expand Down
5 changes: 1 addition & 4 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ HTTP Client Reference
=====================

.. module:: aiohttp

.. currentmodule:: aiohttp


Expand Down Expand Up @@ -1121,9 +1120,7 @@ Response object

.. attribute:: content

Payload stream, contains response's BODY (:class:`StreamReader`
compatible instance, most likely
:class:`FlowControlStreamReader` one).
Payload stream, contains response's BODY (:class:`StreamReader`).

.. attribute:: cookies

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ Contents
abc
server
multipart
streams
api
logging
testing
Expand Down
146 changes: 133 additions & 13 deletions docs/streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,149 @@
Streaming API
=============

.. module:: aiohttp
.. currentmodule:: aiohttp


``aiohttp`` uses streams for retrieving *BODIES*:
:attr:`aiohttp.web.Request.content` and
:attr:`aiohttp.ClientResponse.content` are streams.

:attr:`aiohttp.ClientResponse.content` are properties with stream API.

StreamReader
------------

.. class:: StreamReader

.. comethod:: read(n=-1)
The reader from incoming stream.

User should never instantiate streams manually but use existing
:attr:`aiohttp.web.Request.content` and
:attr:`aiohttp.ClientResponse.content` properties for accessing raw
BODY data.

Reading Methods
---------------

.. comethod:: StreamReader.read(n=-1)

Read up to *n* bytes. If *n* is not provided, or set to ``-1``, read until
EOF and return all read bytes.

If the EOF was received and the internal buffer is empty, return an
empty bytes object.

:param int n: how many bytes to read, ``-1`` for the whole stream.

:return bytes: the given data

.. comethod:: StreamReader.readany()

Read next data portion for the stream.

Returns immediately if internal buffer has a data.

:return bytes: the given data

.. comethod:: StreamReader.readexactly(n)

Read exactly *n* bytes.

Raise an :exc:`asyncio.IncompleteReadError` if the end of the
stream is reached before *n* can be read, the
:attr:`asyncio.IncompleteReadError.partial` attribute of the
exception contains the partial read bytes.

:param int n: how many bytes to read.

:return bytes: the given data


.. comethod:: StreamReader.readline()

Read one line, where “line” is a sequence of bytes ending
with ``\n``.

If EOF is received, and ``\n`` was not found, the method will
return the partial read bytes.

If the EOF was received and the internal buffer is empty, return an
empty bytes object.

:return bytes: the given line

Asynchronous Iteration Support
------------------------------


Stream reader supports asynchronous iteration over BODY.

By default it iterates over lines::

async for line in response.content:
print(line)

Also there are methods for iterating over data chunks with maximum
size limit and over any available data.

.. comethod:: StreamReader.iter_chunked(n)
:async-for:

Iterates over data chunks with maximum size limit::

async for data in response.content.iter_chunked(1024):
print(data)

.. comethod:: StreamReader.iter_any(n)
:async-for:

Iterates over data chunks in order of intaking them into the stream::

async for data in response.content.iter_any():
print(data)


Helpers
-------

.. method:: StreamReader.exception()

Get the exception occurred on data reading.

.. method:: is_eof()

Return ``True`` if EOF was reached.

Internal buffer may be not empty at the moment.

.. seealso::

:meth:`StreamReader.at_eof()`

.. method:: StreamReader.at_eof()

Return ``True`` if the buffer is empty and EOF was reached.

.. method:: StreamReader.read_nowait(n=None)

Returns data from internal buffer if any, empty bytes object otherwise.

Raises :exc:`RuntimeError` if other coroutine is waiting for stream.


:param int n: how many bytes to read, ``-1`` for the whole internal
buffer.

:return bytes: the given data

.. method:: StreamReader.unread_data(data)

Rollback reading some data from stream, inserting it to buffer head.

.. comethod:: readany()
:param bytes data: data to push back into the stream.

.. comethod:: readexactly(n)
.. warning:: The method doesn't wake up waiters.

.. comethod:: readline()
E.g. :meth:`~StreamReader.read()` will not be resumed.

.. method:: read_nowait(n=None)

.. comethod:: iter_chunked(n)
:async-for:
.. comethod:: wait_eof()

.. comethod:: iter_any(n)
:async-for:
Wait for EOF. The given data may be accessible by upcoming read calls.
2 changes: 1 addition & 1 deletion docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ like one using :meth:`Request.copy`.

.. attribute:: content

A :class:`~aiohttp.streams.FlowControlStreamReader` instance,
A :class:`~aiohttp.StreamReader` instance,
input stream for reading request's *BODY*.

Read-only property.
Expand Down