Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async iterators payload #2873

Merged
merged 9 commits into from
Mar 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aiohttp/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
CONTENT_TRANSFER_ENCODING, CONTENT_TYPE)
from .helpers import CHAR, TOKEN, parse_mimetype, reify
from .http import HttpParser
from .payload import (JsonPayload, LookupError, Payload, StringPayload,
from .payload import (JsonPayload, LookupError, Order, Payload, StringPayload,
get_payload, payload_type)


Expand Down Expand Up @@ -434,7 +434,7 @@ def filename(self):
return content_disposition_filename(params, 'filename')


@payload_type(BodyPartReader)
@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):

def __init__(self, value, *args, **kwargs):
Expand Down
77 changes: 64 additions & 13 deletions aiohttp/payload.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import enum
import io
import json
import mimetypes
import os
import warnings
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable
from itertools import chain

from multidict import CIMultiDict

Expand All @@ -16,30 +19,38 @@
__all__ = ('PAYLOAD_REGISTRY', 'get_payload', 'payload_type', 'Payload',
'BytesPayload', 'StringPayload',
'IOBasePayload', 'BytesIOPayload', 'BufferedReaderPayload',
'TextIOPayload', 'StringIOPayload', 'JsonPayload')
'TextIOPayload', 'StringIOPayload', 'JsonPayload',
'AsyncIterablePayload')

TOO_LARGE_BYTES_BODY = 2 ** 20
TOO_LARGE_BYTES_BODY = 2 ** 20 # 1 MB


class LookupError(Exception):
pass


class Order(enum.Enum):
normal = 'normal'
try_first = 'try_first'
try_last = 'try_last'


def get_payload(data, *args, **kwargs):
return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(factory, type):
PAYLOAD_REGISTRY.register(factory, type)
def register_payload(factory, type, *, order=Order.normal):
PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:

def __init__(self, type):
def __init__(self, type, *, order=Order.normal):
self.type = type
self.order = order

def __call__(self, factory):
register_payload(factory, self.type)
register_payload(factory, self.type, order=self.order)
return factory


Expand All @@ -50,19 +61,28 @@ class PayloadRegistry:
"""

def __init__(self):
self._registry = []
self._first = []
self._normal = []
self._last = []

def get(self, data, *args, **kwargs):
def get(self, data, *args, _CHAIN=chain, **kwargs):
if isinstance(data, Payload):
return data
for factory, type in self._registry:
for factory, type in _CHAIN(self._first, self._normal, self._last):
if isinstance(data, type):
return factory(data, *args, **kwargs)

raise LookupError()

def register(self, factory, type):
self._registry.append((factory, type))
def register(self, factory, type, *, order=Order.normal):
if order is Order.try_first:
self._first.append((factory, type))
elif order is Order.normal:
self._normal.append((factory, type))
elif order is Order.try_last:
self._last.append((factory, type))
else:
raise ValueError("Unsupported order {!r}".format(order))


class Payload(ABC):
Expand Down Expand Up @@ -136,8 +156,9 @@ async def write(self, writer):
class BytesPayload(Payload):

def __init__(self, value, *args, **kwargs):
assert isinstance(value, (bytes, bytearray, memoryview)), \
"value argument must be byte-ish (%r)" % type(value)
if not isinstance(value, (bytes, bytearray, memoryview)):
raise TypeError("value argument must be byte-ish, not (!r)"
.format(type(value)))

if 'content_type' not in kwargs:
kwargs['content_type'] = 'application/octet-stream'
Expand Down Expand Up @@ -278,6 +299,32 @@ def __init__(self, value,
content_type=content_type, encoding=encoding, *args, **kwargs)


class AsyncIterablePayload(Payload):

def __init__(self, value, *args, **kwargs):
if not isinstance(value, AsyncIterable):
raise TypeError("value argument must support "
"collections.abc.AsyncIterablebe interface, "
"got {!r}".format(type(value)))

if 'content_type' not in kwargs:
kwargs['content_type'] = 'application/octet-stream'

super().__init__(value, *args, **kwargs)

self._iter = value.__aiter__()

async def write(self, writer):
try:
# iter is not None check prevents rare cases
# when the case iterable is used twice
while True:
chunk = await self._iter.__anext__()
await writer.write(chunk)
except StopAsyncIteration:
self._iter = None


PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
Expand All @@ -287,3 +334,7 @@ def __init__(self, value,
PAYLOAD_REGISTRY.register(
BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
# try_last for giving a chance to more specialized async interables like
# multidict.BodyPartReaderPayload override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable,
order=Order.try_last)
4 changes: 4 additions & 0 deletions aiohttp/payload_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def file_sender(writer, file_name=None):
"""

import asyncio
import warnings

from .payload import Payload, payload_type

Expand All @@ -43,6 +44,9 @@ async def __call__(self, writer):
class streamer:

def __init__(self, coro):
warnings.warn("@streamer is deprecated, use async generators instead",
DeprecationWarning,
stacklevel=2)
self.coro = coro

def __call__(self, *args, **kwargs):
Expand Down
53 changes: 32 additions & 21 deletions docs/client_quickstart.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.. _aiohttp-client-quickstart:

Client Quickstart
=================
===================
Client Quickstart
===================

.. currentmodule:: aiohttp

Expand All @@ -16,7 +17,7 @@ Let's get started with some simple examples.


Make a Request
--------------
==============

Begin by importing the aiohttp module::

Expand Down Expand Up @@ -62,7 +63,7 @@ Other HTTP methods are available as well::


Passing Parameters In URLs
--------------------------
==========================

You often want to send some sort of data in the URL's query string. If
you were constructing the URL by hand, this data would be given as key/value
Expand Down Expand Up @@ -123,7 +124,7 @@ is not encoded by library. Note that ``+`` is not encoded::
Passing *params* overrides ``encoded=True``, never use both options.

Response Content and Status Code
--------------------------------
================================

We can read the content of the server's response and it's status
code. Consider the GitHub time-line again::
Expand All @@ -144,7 +145,7 @@ specify custom encoding for the :meth:`~ClientResponse.text` method::


Binary Response Content
-----------------------
=======================

You can also access the response body as bytes, for non-text requests::

Expand All @@ -161,7 +162,7 @@ You can enable ``brotli`` transfer-encodings support,
just install `brotlipy <https://github.com/python-hyper/brotlipy>`_.

JSON Request
------------
============

Any of session's request methods like :func:`request`,
:meth:`ClientSession.get`, :meth:`ClientSesssion.post` etc. accept
Expand All @@ -188,7 +189,7 @@ parameter::
incompatible.

JSON Response Content
---------------------
=====================

There's also a built-in JSON decoder, in case you're dealing with JSON data::

Expand All @@ -207,7 +208,7 @@ decoder functions for the :meth:`~ClientResponse.json` call.


Streaming Response Content
--------------------------
==========================

While methods :meth:`~ClientResponse.read`,
:meth:`~ClientResponse.json` and :meth:`~ClientResponse.text` are very
Expand Down Expand Up @@ -237,7 +238,7 @@ It is not possible to use :meth:`~ClientResponse.read`,
explicit reading from :attr:`~ClientResponse.content`.

More complicated POST requests
------------------------------
==============================

Typically, you want to send some form-encoded data -- much like an HTML form.
To do this, simply pass a dictionary to the *data* argument. Your
Expand Down Expand Up @@ -278,7 +279,7 @@ To send text with appropriate content-type just use ``text`` attribute ::
...

POST a Multipart-Encoded File
-----------------------------
=============================

To upload Multipart-encoded files::

Expand Down Expand Up @@ -306,7 +307,7 @@ for supported format information.


Streaming uploads
-----------------
=================

:mod:`aiohttp` supports multiple types of streaming uploads, which allows you to
send large files without reading them into memory.
Expand All @@ -317,28 +318,38 @@ As a simple case, simply provide a file-like object for your body::
await session.post('http://httpbin.org/post', data=f)


Or you can use :class:`aiohttp.streamer` decorator::
Or you can use *asynchronous generator*::

@aiohttp.streamer
async def file_sender(writer, file_name=None):
with open(file_name, 'rb') as f:
chunk = f.read(2**16)
async def file_sender(file_name=None):
async with aiofiles.open(file_name, 'rb') as f:
chunk = await f.read(64*1024)
while chunk:
await writer.write(chunk)
chunk = f.read(2**16)
yield chunk
chunk = await f.read(64*1024)

# Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
data=file_sender(file_name='huge_file')) as resp:
print(await resp.text())

.. note::

Python 3.5 has no support for asynchronous generators, use
``async_generator`` library as workaround.

.. deprecated:: 3.1

``aiohttp`` still supports ``aiohttp.streamer`` decorator but this
approach is deprecated in favor of *asynchronous generators* as
shown above.


.. _aiohttp-client-websockets:


WebSockets
----------
==========

:mod:`aiohttp` works with client websockets out-of-the-box.

Expand Down Expand Up @@ -372,7 +383,7 @@ multiple writer tasks which can only send data asynchronously (by


Timeouts
--------
========

By default all IO operations have 5min timeout. The timeout may be
overridden by passing ``timeout`` parameter into
Expand Down
3 changes: 2 additions & 1 deletion docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ The client session supports the context manager protocol for self closing.
.. versionadded:: 2.3

:param trace_request_ctx: Object used to give as a kw param for each new
:class:`TraceConfig` object instantiated, used to give information to the
:class:`TraceConfig` object instantiated,
used to give information to the
tracers that is only available at request time.

.. versionadded:: 3.0
Expand Down
Loading