Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[StaticRoute] Add preliminary support for sendfile system call. #503

Merged
merged 11 commits into from
Sep 15, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions aiohttp/web_urldispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class StaticRoute(Route):

def __init__(self, name, prefix, directory, *,
expect_handler=None, chunk_size=256*1024,
response_factory=None):
response_factory=None, sendfile_fallback=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback should be performed automatically.
User doesn't know (and doesn't care) is underlying OS support sendfile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but it's useful to force the use of the fallback sendfile in the tests. That's the primary reason why the option exists.
But fallback is done automatically if sendfile_fallback=False. This should really be named force_sendfile_fallback, but that's too long.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API is made not for tests, but for users. There is no need to pollute it with special flags, while you may call exact sendfile function in tests.

assert prefix.startswith('/'), prefix
assert prefix.endswith('/'), prefix
super().__init__(
Expand All @@ -163,6 +163,11 @@ def __init__(self, name, prefix, directory, *,
raise ValueError(
"No directory exists at '{}'".format(self._directory))

if sendfile_fallback or not hasattr(os, "sendfile"):
self.sendfile = self.sendfile_fallback
else:
self.sendfile = self.sendfile_system
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better have sendfile method explicitly defined in class while it behaviour is controlled by just hasattr(os, "sendfile") - if system has sendfile support who will abuse it usage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason why I did it this way was for the unittests.


def match(self, path):
if not path.startswith(self._prefix):
return None
Expand All @@ -174,6 +179,63 @@ def url(self, *, filename, query=None):
url = self._prefix + filename
return self._append_query(url, query)

def _sendfile_cb(self, fut, out_fd, in_fd, offset, count, loop):
try:
n = os.sendfile(out_fd, in_fd, offset, count)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
loop.remove_writer(out_fd)
fut.set_exception(exc)
return
else:
loop.remove_writer(out_fd)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refactor code to avoid repeated remove_writer/add_writer calls.


if n < count:
loop.add_writer(out_fd, self._sendfile_cb, fut, out_fd, in_fd,
offset + n, count - n, loop)
else:
fut.set_result(None)

@asyncio.coroutine
def _sendfile(self, out_fd, in_fd, offset, count, loop):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method doesn't looks as coroutine, then why it is?

fut = asyncio.Future(loop=loop)
loop.add_writer(out_fd, self._sendfile_cb, fut, out_fd, in_fd, offset,
count, loop)
return fut

@asyncio.coroutine
def sendfile_system(self, resp, fobj):
"""
Write the content of `fobj` to `resp` using the ``sendfile`` system
call.

`fobj` should be an open file object.

`resp` should be a :obj:`aiohttp.web.StreamResponse` instance.
"""
yield from resp.drain()
req = resp._req
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass request object into the method, don't access private attr.

loop = req.app.loop
out_fd = req._transport._sock_fd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use public API, e.g. req.transport.get_extra_info('socket').

in_fd = fobj.fileno()
yield from self._sendfile(out_fd, in_fd, 0, os.fstat(in_fd).st_size,
loop)

@asyncio.coroutine
def sendfile_fallback(self, resp, fobj):
"""
Mimic the :meth:`sendfile` method, but without using the ``sendfile``
system call. This should be used on systems that don't support
``sendfile``.
"""
yield from resp.drain()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need drain here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To flush any headers, and give the client a chance to bail out early when the chunk_size is large.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't make sense: .drain() guarantees that data sent to OS buffer only, not derived to peer.

For sendfile version it's required, all headers should be sent to OS before pushing body.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, did not know that. I thought drain waited until the socket buffer was empty.
I'll remove it.

chunk = fobj.read(self._chunk_size)
while chunk:
resp.write(chunk)
yield from resp.drain()
chunk = fobj.read(self._chunk_size)

@asyncio.coroutine
def handle(self, request):
filename = request.match_info['filename']
Expand All @@ -200,20 +262,12 @@ def handle(self, request):
resp.last_modified = st.st_mtime

file_size = st.st_size
single_chunk = file_size < self._chunk_size

if single_chunk:
resp.content_length = file_size
resp.content_length = file_size
resp.start(request)

with open(filepath, 'rb') as f:
chunk = f.read(self._chunk_size)
if single_chunk:
resp.write(chunk)
else:
while chunk:
resp.write(chunk)
chunk = f.read(self._chunk_size)
yield from self.sendfile(resp, f)

return resp

Expand Down Expand Up @@ -413,7 +467,8 @@ def add_route(self, method, path, handler,
return route

def add_static(self, prefix, path, *, name=None, expect_handler=None,
chunk_size=256*1024, response_factory=None):
chunk_size=256*1024, response_factory=None,
sendfile_fallback=False):
"""
Adds static files view
:param prefix - url prefix
Expand All @@ -425,6 +480,7 @@ def add_static(self, prefix, path, *, name=None, expect_handler=None,
route = StaticRoute(name, prefix, path,
expect_handler=expect_handler,
chunk_size=chunk_size,
response_factory=response_factory)
response_factory=response_factory,
sendfile_fallback=sendfile_fallback)
self.register_route(route)
return route
11 changes: 9 additions & 2 deletions tests/test_web_functional.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import gc
import json
import os
import os.path
import socket
import unittest
Expand Down Expand Up @@ -312,14 +313,20 @@ def go():

self.loop.run_until_complete(go())

def test_static_file(self):
@unittest.skipUnless(hasattr(os, "sendfile"),
"system does not support 'sendfile' system call")
def test_static_file_sendfile(self):
self.test_static_file(False)

def test_static_file(self, sendfile_fallback=True):

@asyncio.coroutine
def go(dirname, filename):
app, _, url = yield from self.create_server(
'GET', '/static/' + filename
)
app.router.add_static('/static', dirname)
app.router.add_static('/static', dirname,
sendfile_fallback=sendfile_fallback)

resp = yield from request('GET', url, loop=self.loop)
self.assertEqual(200, resp.status)
Expand Down