Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Streaming #1791

Closed
wants to merge 77 commits into from
Closed
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
6279eac
Streaming request by async for.
Tronic Feb 21, 2020
fe64a27
Make all requests streaming and preload body for non-streaming handlers.
Tronic Feb 21, 2020
f609a48
Cleanup of code and avoid mixing streaming responses.
Tronic Feb 21, 2020
f6a0b4a
Async http protocol loop.
Tronic Feb 24, 2020
3d05e1e
Change of test: don't require early bad request error but only after …
Tronic Feb 24, 2020
ef4f233
Add back streaming requests.
Tronic Feb 24, 2020
42d86bc
Rewritten request body parser.
Tronic Feb 26, 2020
6b9f0ec
Misc. cleanup, down to 4 failing tests.
Tronic Feb 26, 2020
b87364b
All tests OK.
Tronic Feb 26, 2020
29c6f3c
Entirely remove request body queue.
Tronic Feb 26, 2020
6d8f598
Let black f*ckup the layout
Tronic Feb 26, 2020
b2476bd
Better testing error messages on protocol errors.
Tronic Feb 28, 2020
d597137
Remove StreamBuffer tests because the type is about to be removed.
Tronic Feb 28, 2020
85c67a0
Remove tests using the deprecated get_headers function that can no lo…
Tronic Feb 28, 2020
85b1ad5
Major refactoring of HTTP protocol handling (new module http.py added…
Tronic Feb 28, 2020
8a1baeb
Terminate check_timeouts once connection_task finishes.
Tronic Feb 28, 2020
57202bf
Code cleanup, 14 tests failing.
Tronic Feb 29, 2020
a553e64
Much cleanup, 12 failing...
Tronic Feb 29, 2020
7f41c5f
Even more cleanup and error checking, 8 failing tests.
Tronic Feb 29, 2020
85be576
Remove keep-alive header from responses. First of all, it should say …
Tronic Mar 1, 2020
2840e4c
Everything but CustomServer OK.
Tronic Mar 1, 2020
f829893
Merge branch 'master' into streaming
Tronic Mar 1, 2020
5086076
Linter
Tronic Mar 1, 2020
fc16594
Disable custom protocol test
Tronic Mar 1, 2020
c0a0b50
Remove unnecessary variables, optimise performance.
Tronic Mar 1, 2020
5a96996
A test was missing that body_init/body_push/body_finish are never cal…
Tronic Mar 1, 2020
1c42a5e
Minor fixes.
Tronic Mar 1, 2020
0712026
Remove unused code.
Tronic Mar 1, 2020
d918655
Py 3.8 check for deprecated loop argument.
Tronic Mar 1, 2020
31a8706
Fix a middleware cancellation handling test with py38.
Tronic Mar 1, 2020
0835363
Linter 'n fixes
Tronic Mar 1, 2020
7e93ee1
Typing
Tronic Mar 1, 2020
9baa241
Merge branch 'master' into streaming
Tronic Mar 2, 2020
50cca39
Stricter handling of request header size
Tronic Mar 2, 2020
e73f26b
More specific error messages on Payload Too Large.
Tronic Mar 2, 2020
96a8b5c
Init http.response = None
Tronic Mar 2, 2020
85d58d7
Messages further tuned.
Tronic Mar 2, 2020
9c21457
Always try to consume request body, plus minor cleanup.
Tronic Mar 2, 2020
c2e5674
Add a missing check in case of close_if_idle on a dead connection.
Tronic Mar 2, 2020
cbabe7e
Avoid error messages on PayloadTooLarge.
Tronic Mar 2, 2020
dc6b492
Add test for new API.
Tronic Mar 4, 2020
eb66621
json takes str, not bytes
Tronic Mar 4, 2020
730de6a
Default to no maximum request size for streaming handlers.
Tronic Mar 4, 2020
4c34508
Merge branch 'master' into streaming
Tronic Mar 6, 2020
cbfeb1c
Fix chunked mode crash.
Tronic Mar 8, 2020
990ac52
Header values should be strictly ASCII but both UTF-8 and Latin-1 exi…
Tronic Mar 8, 2020
d348bb4
Refactoring and cleanup.
Tronic Mar 8, 2020
5351cda
Unify response header processing of ASGI and asyncio modes.
Tronic Mar 8, 2020
c86c29e
Avoid special handling of StreamingHTTPResponse.
Tronic Mar 8, 2020
a0e61ae
35 % speedup in HTTP/1.1 response formatting (not so much overall eff…
Tronic Mar 9, 2020
32ee539
Duplicate set-cookie headers were being produced.
Tronic Mar 9, 2020
a9d984e
Cleanup processed_headers some more.
Tronic Mar 9, 2020
9dc2ec9
Linting
Tronic Mar 9, 2020
2b63d2b
Import ordering
Tronic Mar 9, 2020
2adcc72
Response middleware ran by async request.respond().
Tronic Mar 9, 2020
d2d6008
Need to check if transport is closing to avoid getting stuck in sendi…
Tronic Mar 10, 2020
17d1004
Middleware and error handling refactoring.
Tronic Mar 10, 2020
9098493
Linter
Tronic Mar 10, 2020
23e54fc
Fix tracking of HTTP stage when writing to transport fails.
Tronic Mar 20, 2020
f1c85eb
Add clarifying comment
Tronic Mar 22, 2020
42af6e1
Add a check for request body functions and a test for NotImplementedE…
Tronic Mar 25, 2020
5832764
Merge branch 'master' into streaming
Tronic Mar 25, 2020
6bedec9
Linter and typing
Tronic Mar 25, 2020
f928ad2
These must be tuples + hack mypy warnings away.
Tronic Mar 25, 2020
1aac4f5
Merge branch 'master' into streaming
Tronic Mar 26, 2020
01480c4
New streaming test and minor fixes.
Tronic Mar 26, 2020
e4a9b43
Constant receive buffer size.
Tronic Mar 26, 2020
abc1e3e
256 KiB send and receive buffers.
Tronic Mar 26, 2020
39d10bf
Revert "256 KiB send and receive buffers."
Tronic Mar 26, 2020
c84163f
app.handle_exception already sends the response.
Tronic Mar 20, 2020
fcae70b
Improved handling of errors during request.
Tronic Mar 21, 2020
232177d
An odd hack to avoid an httpx limitation that causes test failures.
Tronic Mar 26, 2020
b6a4bd3
Merge branch 'master' into streaming
Tronic Mar 29, 2020
95b8b7c
Merge branch 'master' into streaming
Tronic Apr 7, 2020
9b6f259
Merge branch 'master' into streaming
Tronic Apr 9, 2020
36ffaa7
Merge branch 'master' into streaming
ahopkins May 13, 2020
ef10275
Merge branch 'master' into streaming
ahopkins Jun 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 62 additions & 70 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from sanic.exceptions import SanicException, ServerError, URLBuildError
from sanic.handlers import ErrorHandler
from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.response import BaseHTTPResponse, HTTPResponse
from sanic.router import Router
from sanic.server import (
AsyncioServer,
Expand Down Expand Up @@ -82,7 +82,6 @@ def __init__(
self.strict_slashes = strict_slashes
self.listeners = defaultdict(list)
self.is_running = False
self.is_request_stream = False
self.websocket_enabled = False
self.websocket_tasks = set()
self.named_request_middleware = {}
Expand Down Expand Up @@ -187,9 +186,6 @@ def route(
if not uri.startswith("/"):
uri = "/" + uri

if stream:
self.is_request_stream = True

if strict_slashes is None:
strict_slashes = self.strict_slashes

Expand Down Expand Up @@ -942,7 +938,41 @@ def converted_response_type(self, response):
"""
pass

async def handle_request(self, request, write_callback, stream_callback):
async def handle_exception(self, request, exception):
try:
response = self.error_handler.response(request, exception)
if isawaitable(response):
response = await response
except Exception as e:
if isinstance(e, SanicException):
response = self.error_handler.default(request, e)
elif self.debug:
response = HTTPResponse(
f"Error while handling error: {e}\nStack: {format_exc()}",
status=500,
)
else:
response = HTTPResponse(
"An error occurred while handling an error", status=500
)
if response is not None:
try:
response = await request.respond(response)
except BaseException:
# Skip response middleware
request.stream.respond(response)
await response.send(end_stream=True)
raise
else:
response = request.stream.response
if isinstance(response, BaseHTTPResponse):
await response.send(end_stream=True)
else:
raise ServerError(
f"Invalid response type {response!r} (need HTTPResponse)"
)

async def handle_request(self, request):
"""Take a request from the HTTP Server and return a response object
to be sent back The HTTP Server only expects a response object, so
exception handling must be done here
Expand All @@ -958,11 +988,19 @@ async def handle_request(self, request, write_callback, stream_callback):
# Define `response` var here to remove warnings about
# allocation before assignment below.
response = None
cancelled = False
name = None
try:
# Fetch handler from router
handler, args, kwargs, uri, name = self.router.get(request)
request.name = name

if request.stream.request_body:
if self.router.is_stream_handler(request):
# Streaming handler: lift the size limit
request.stream.request_max_size = float("inf")
else:
# Non-streaming handler: preload body
await request.receive_body()

# -------------------------------------------- #
# Request Middleware
Expand Down Expand Up @@ -999,73 +1037,25 @@ async def handle_request(self, request, write_callback, stream_callback):
response = handler(request, *args, **kwargs)
if isawaitable(response):
response = await response
if response:
response = await request.respond(response)
else:
response = request.stream.response
# Make sure that response is finished / run StreamingHTTP callback
if isinstance(response, BaseHTTPResponse):
await response.send(end_stream=True)
else:
raise ServerError(
f"Invalid response type {response!r} (need HTTPResponse)"
)

except CancelledError:
# If response handler times out, the server handles the error
# and cancels the handle_request job.
# In this case, the transport is already closed and we cannot
# issue a response.
response = None
cancelled = True
raise
except Exception as e:
# -------------------------------------------- #
# Response Generation Failed
# -------------------------------------------- #

try:
response = self.error_handler.response(request, e)
if isawaitable(response):
response = await response
except Exception as e:
if isinstance(e, SanicException):
response = self.error_handler.default(
request=request, exception=e
)
elif self.debug:
response = HTTPResponse(
"Error while handling error: {}\nStack: {}".format(
e, format_exc()
),
status=500,
)
else:
response = HTTPResponse(
"An error occurred while handling an error", status=500
)
finally:
# -------------------------------------------- #
# Response Middleware
# -------------------------------------------- #
# Don't run response middleware if response is None
if response is not None:
try:
response = await self._run_response_middleware(
request, response, request_name=name
)
except CancelledError:
# Response middleware can timeout too, as above.
response = None
cancelled = True
except BaseException:
error_logger.exception(
"Exception occurred in one of response "
"middleware handlers"
)
if cancelled:
raise CancelledError()

# pass the response to the correct callback
if write_callback is None or isinstance(
response, StreamingHTTPResponse
):
if stream_callback:
await stream_callback(response)
else:
# Should only end here IF it is an ASGI websocket.
# TODO:
# - Add exception handling
pass
else:
write_callback(response)
await self.handle_exception(request, e)

# -------------------------------------------------------------------- #
# Testing
Expand Down Expand Up @@ -1346,6 +1336,8 @@ async def _run_response_middleware(
_response = await _response
if _response:
response = _response
if isinstance(response, BaseHTTPResponse):
response = request.stream.respond(response)
break
return response

Expand Down Expand Up @@ -1395,7 +1387,7 @@ def _helper(
server_settings = {
"protocol": protocol,
"request_class": self.request_class,
"is_request_stream": self.is_request_stream,
"is_request_stream": True,
"router": self.router,
"host": host,
"port": port,
Expand Down
Loading