Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic request streaming support with flow control #1423

Merged
merged 8 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,44 @@
Sanic
=====

|Join the chat at https://gitter.im/sanic-python/Lobby| |Build Status| |AppVeyor Build Status| |Documentation| |Codecov| |PyPI| |PyPI version| |Code style black|
.. start-badges

.. list-table::
:stub-columns: 1

* - Build
- | |Build Status| |AppVeyor Build Status| |Codecov|
* - Docs
- |Documentation|
* - Package
- | |PyPI| |PyPI version| |Wheel| |Supported implementations| |Code style black|
* - Support
- |Join the chat at https://gitter.im/sanic-python/Lobby|

.. |Join the chat at https://gitter.im/sanic-python/Lobby| image:: https://badges.gitter.im/sanic-python/Lobby.svg
:target: https://gitter.im/sanic-python/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
.. |Codecov| image:: https://codecov.io/gh/huge-success/sanic/branch/master/graph/badge.svg
:target: https://codecov.io/gh/huge-success/sanic
.. |Build Status| image:: https://travis-ci.org/huge-success/sanic.svg?branch=master
:target: https://travis-ci.org/huge-success/sanic
.. |AppVeyor Build Status| image:: https://ci.appveyor.com/api/projects/status/d8pt3ids0ynexi8c/branch/master?svg=true
:target: https://ci.appveyor.com/project/huge-success/sanic
.. |Documentation| image:: https://readthedocs.org/projects/sanic/badge/?version=latest
:target: http://sanic.readthedocs.io/en/latest/?badge=latest
.. |PyPI| image:: https://img.shields.io/pypi/v/sanic.svg
:target: https://pypi.python.org/pypi/sanic/
.. |PyPI version| image:: https://img.shields.io/pypi/pyversions/sanic.svg
:target: https://pypi.python.org/pypi/sanic/
.. |Code style black| image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/ambv/black
.. |Wheel| image:: https://img.shields.io/pypi/wheel/sanic.svg
:alt: PyPI Wheel
:target: https://pypi.python.org/pypi/sanic
.. |Supported implementations| image:: https://img.shields.io/pypi/implementation/sanic.svg
:alt: Supported implementations
:target: https://pypi.python.org/pypi/sanic

.. end-badges

Sanic is a Flask-like Python 3.5+ web server that's written to go fast. It's based on the work done by the amazing folks at magicstack, and was inspired by `this article <https://magic.io/blog/uvloop-blazing-fast-python-networking/>`_.

Expand Down Expand Up @@ -45,22 +82,6 @@ Documentation

`Documentation on Readthedocs <http://sanic.readthedocs.io/>`_.

.. |Join the chat at https://gitter.im/sanic-python/Lobby| image:: https://badges.gitter.im/sanic-python/Lobby.svg
:target: https://gitter.im/sanic-python/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
.. |Codecov| image:: https://codecov.io/gh/huge-success/sanic/branch/master/graph/badge.svg
:target: https://codecov.io/gh/huge-success/sanic
.. |Build Status| image:: https://travis-ci.org/huge-success/sanic.svg?branch=master
:target: https://travis-ci.org/huge-success/sanic
.. |AppVeyor Build Status| image:: https://ci.appveyor.com/api/projects/status/d8pt3ids0ynexi8c/branch/master?svg=true
:target: https://ci.appveyor.com/project/huge-success/sanic
.. |Documentation| image:: https://readthedocs.org/projects/sanic/badge/?version=latest
:target: http://sanic.readthedocs.io/en/latest/?badge=latest
.. |PyPI| image:: https://img.shields.io/pypi/v/sanic.svg
:target: https://pypi.python.org/pypi/sanic/
.. |PyPI version| image:: https://img.shields.io/pypi/pyversions/sanic.svg
:target: https://pypi.python.org/pypi/sanic/
.. |Code style black| image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/ambv/black

Questions and Discussion
------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/sanic/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Out of the box there are just a few predefined values which can be overwritten w
| Variable | Default | Description |
| ------------------------- | --------- | ------------------------------------------------------ |
| REQUEST_MAX_SIZE | 100000000 | How big a request may be (bytes) |
| REQUEST_BUFFER_QUEUE_SIZE | 100 | Request streaming buffer queue size |
| REQUEST_TIMEOUT | 60 | How long a request can take to arrive (sec) |
| RESPONSE_TIMEOUT | 60 | How long a response can take to process (sec) |
| KEEP_ALIVE | True | Disables keep-alive when False |
Expand Down
10 changes: 5 additions & 5 deletions docs/sanic/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Request Streaming

Sanic allows you to get request data by stream, as below. When the request ends, `request.stream.get()` returns `None`. Only post, put and patch decorator have stream argument.
Sanic allows you to get request data by stream, as below. When the request ends, `await request.stream.read()` returns `None`. Only post, put and patch decorator have stream argument.

```python
from sanic import Sanic
Expand All @@ -22,7 +22,7 @@ class SimpleView(HTTPMethodView):
async def post(self, request):
result = ''
while True:
body = await request.stream.get()
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
Expand All @@ -33,7 +33,7 @@ class SimpleView(HTTPMethodView):
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
body = await request.stream.read()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
Expand All @@ -45,7 +45,7 @@ async def handler(request):
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
Expand All @@ -55,7 +55,7 @@ async def bp_handler(request):
async def post_handler(request):
result = ''
while True:
body = await request.stream.get()
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
Expand Down
1 change: 1 addition & 0 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ def _helper(
"response_timeout": self.config.RESPONSE_TIMEOUT,
"keep_alive_timeout": self.config.KEEP_ALIVE_TIMEOUT,
"request_max_size": self.config.REQUEST_MAX_SIZE,
"request_buffer_queue_size": self.config.REQUEST_BUFFER_QUEUE_SIZE,
"keep_alive": self.config.KEEP_ALIVE,
"loop": loop,
"register_sys_signals": register_sys_signals,
Expand Down
1 change: 1 addition & 0 deletions sanic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, defaults=None, load_env=True, keep_alive=True):
▀▀▄▄▀
"""
self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes
self.REQUEST_BUFFER_QUEUE_SIZE = 100
self.REQUEST_TIMEOUT = 60 # 60 seconds
self.RESPONSE_TIMEOUT = 60 # 60 seconds
self.KEEP_ALIVE = keep_alive
Expand Down
18 changes: 18 additions & 0 deletions sanic/request.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import sys

Expand Down Expand Up @@ -47,6 +48,23 @@ def getlist(self, name, default=None):
return super().get(name, default)


class StreamBuffer:
def __init__(self, buffer_size=100):
self._queue = asyncio.Queue(buffer_size)

async def read(self):
""" Stop reading when gets None """
payload = await self._queue.get()
self._queue.task_done()
return payload

async def put(self, payload):
await self._queue.put(payload)

def is_full(self):
return self._queue.full()


class Request(dict):
"""Properties of an HTTP request such as URL, headers, etc."""

Expand Down
25 changes: 20 additions & 5 deletions sanic/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
ServiceUnavailable,
)
from sanic.log import access_logger, logger
from sanic.request import Request
from sanic.request import Request, StreamBuffer
from sanic.response import HTTPResponse


Expand Down Expand Up @@ -59,6 +59,7 @@ class HttpProtocol(asyncio.Protocol):
"response_timeout",
"keep_alive_timeout",
"request_max_size",
"request_buffer_queue_size",
"request_class",
"is_request_stream",
"router",
Expand All @@ -82,11 +83,12 @@ def __init__(
request_handler,
error_handler,
signal=Signal(),
connections=set(),
connections=None,
ahopkins marked this conversation as resolved.
Show resolved Hide resolved
request_timeout=60,
response_timeout=60,
keep_alive_timeout=5,
request_max_size=None,
request_buffer_queue_size=100,
request_class=None,
access_log=True,
keep_alive=True,
Expand All @@ -105,10 +107,11 @@ def __init__(
self.router = router
self.signal = signal
self.access_log = access_log
self.connections = connections
self.connections = connections or set()
self.request_handler = request_handler
self.error_handler = error_handler
self.request_timeout = request_timeout
self.request_buffer_queue_size = request_buffer_queue_size
self.response_timeout = response_timeout
self.keep_alive_timeout = keep_alive_timeout
self.request_max_size = request_max_size
Expand Down Expand Up @@ -291,17 +294,27 @@ def on_headers_complete(self):
self.request
)
if self._is_stream_handler:
self.request.stream = asyncio.Queue()
self.request.stream = StreamBuffer(
self.request_buffer_queue_size
)
self.execute_request_handler()

def on_body(self, body):
if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task(
self.request.stream.put(body)
self.body_append(body)
)
return
yunstanford marked this conversation as resolved.
Show resolved Hide resolved
self.request.body_push(body)

async def body_append(self, body):
yunstanford marked this conversation as resolved.
Show resolved Hide resolved
if self.request.stream.is_full():
self.transport.pause_reading()
await self.request.stream.put(body)
self.transport.resume_reading()
else:
await self.request.stream.put(body)

def on_message_complete(self):
# Entire request (headers and whole body) is received.
# We can cancel and remove the request timeout handler now.
Expand Down Expand Up @@ -568,6 +581,7 @@ def serve(
ssl=None,
sock=None,
request_max_size=None,
request_buffer_queue_size=100,
reuse_port=False,
loop=None,
protocol=HttpProtocol,
Expand Down Expand Up @@ -628,6 +642,7 @@ def serve(
outgoing bytes, the low-water limit is a
quarter of the high-water limit.
:param is_request_stream: disable/enable Request.stream
:param request_buffer_queue_size: streaming request buffer queue size
:param router: Router object
:param graceful_shutdown_timeout: How long take to Force close non-idle
connection
Expand Down
Loading