Skip to content

Commit

Permalink
refactor timeout support
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Feb 26, 2017
1 parent 96c8190 commit 38360f9
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 480 deletions.
54 changes: 25 additions & 29 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
from . import client_exceptions, client_reqrep, hdrs, http
from .client_exceptions import * # noqa
from .client_exceptions import (ClientError, ClientOSError,
ClientResponseError, WSServerHandshakeError)
ClientResponseError, ServerTimeoutError,
WSServerHandshakeError)
from .client_reqrep import * # noqa
from .client_reqrep import ClientRequest, ClientResponse
from .client_ws import ClientWebSocketResponse
from .connector import * # noqa
from .connector import TCPConnector
from .cookiejar import CookieJar
from .helpers import PY_35, TimeService, noop
from .helpers import PY_35, CeilTimeout, TimeoutHandle, noop
from .http import WS_KEY, WebSocketReader, WebSocketWriter
from .streams import FlowControlDataQueue

Expand All @@ -48,8 +49,8 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
response_class=ClientResponse,
ws_response_class=ClientWebSocketResponse,
version=http.HttpVersion11,
cookie_jar=None, read_timeout=None, time_service=None,
connector_owner=True):
cookie_jar=None, connector_owner=True,
read_timeout=None, conn_timeout=None):

implicit_loop = False
if loop is None:
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._default_auth = auth
self._version = version
self._read_timeout = read_timeout
self._conn_timeout = conn_timeout

# Convert to list of tuples
if headers:
Expand All @@ -110,12 +112,6 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._response_class = response_class
self._ws_response_class = ws_response_class

self._time_service_owner = time_service is None
if time_service is None:
time_service = TimeService(self._loop)

self._time_service = time_service

def __del__(self, _warnings=warnings):
if not self.closed:
self.close()
Expand All @@ -128,10 +124,6 @@ def __del__(self, _warnings=warnings):
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)

@property
def time_service(self):
return self._time_service

def request(self, method, url, **kwargs):
"""Perform HTTP request."""
return _RequestContextManager(self._request(method, url, **kwargs))
Expand Down Expand Up @@ -195,22 +187,16 @@ def _request(self, method, url, *,
if proxy is not None:
proxy = URL(proxy)

# request timeout
if timeout is None:
timeout = self._read_timeout
if timeout is None:
timeout = self._connector.conn_timeout
elif self._connector.conn_timeout is not None:
timeout = max(timeout, self._connector.conn_timeout)

# timeout is cumulative for all request operations
# (request, redirects, responses, data consuming)
timer = self._time_service.timeout(timeout)
tm = TimeoutHandle(
self._loop, timeout if timeout is not None else self._read_timeout)
handle = tm.start()

timer = tm.timer()
with timer:
while True:
url = URL(url).with_fragment(None)

cookies = self._cookie_jar.filter_cookies(url)

req = self._request_class(
Expand All @@ -221,7 +207,14 @@ def _request(self, method, url, *,
loop=self._loop, response_class=self._response_class,
proxy=proxy, proxy_auth=proxy_auth, timer=timer)

conn = yield from self._connector.connect(req)
# connection timeout
try:
with CeilTimeout(self._conn_timeout, loop=self._loop):
conn = yield from self._connector.connect(req)
except asyncio.TimeoutError as exc:
raise ServerTimeoutError(
'Connection timeout to host {0}'.format(url)) from exc

conn.writer.set_tcp_nodelay(True)
try:
resp = req.send(conn)
Expand Down Expand Up @@ -285,6 +278,13 @@ def _request(self, method, url, *,

break

# register connection
if handle is not None:
if resp.connection is not None:
resp.connection.add_callback(handle.cancel)
else:
handle.cancel()

resp._history = tuple(history)
return resp

Expand Down Expand Up @@ -417,7 +417,6 @@ def _ws_connect(self, url, *,
autoclose,
autoping,
self._loop,
time_service=self.time_service,
receive_timeout=receive_timeout,
heartbeat=heartbeat)

Expand Down Expand Up @@ -496,9 +495,6 @@ def close(self):
self._connector.close()
self._connector = None

if self._time_service_owner:
self._time_service.close()

return noop()

@property
Expand Down
14 changes: 6 additions & 8 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

from . import hdrs, helpers, http, payload
from .formdata import FormData
from .helpers import (PY_35, HeadersMixin, SimpleCookie,
_TimeServiceTimeoutNoop, noop)
from .helpers import PY_35, HeadersMixin, SimpleCookie, TimerNoop, noop
from .http import HttpMessage
from .log import client_logger
from .streams import FlowControlStreamReader
Expand Down Expand Up @@ -81,7 +80,7 @@ def __init__(self, method, url, *,
self.compress = compress
self.loop = loop
self.response_class = response_class or ClientResponse
self._timer = timer if timer is not None else _TimeServiceTimeoutNoop()
self._timer = timer if timer is not None else TimerNoop()

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
Expand Down Expand Up @@ -448,15 +447,16 @@ def __init__(self, method, url, *,
assert isinstance(url, URL)

self.method = method
self.headers = None
self.cookies = SimpleCookie()

self._url = url
self._content = None
self._writer = writer
self._continue = continue100
self._closed = True
self._history = ()
self.headers = None
self._timer = timer if timer is not None else _TimeServiceTimeoutNoop()
self.cookies = SimpleCookie()
self._timer = timer if timer is not None else TimerNoop()

@property
def url(self):
Expand All @@ -470,8 +470,6 @@ def url_obj(self):

@property
def host(self):
warnings.warn(
"Deprecated, use .url.host", DeprecationWarning, stacklevel=2)
return self._url.host

@property
Expand Down
23 changes: 12 additions & 11 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json

from .client_exceptions import ClientError
from .helpers import PY_35, PY_352, create_future
from .helpers import PY_35, PY_352, Timeout, call_later, create_future
from .http import (WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE,
WebSocketError, WSMessage, WSMsgType)

Expand All @@ -13,15 +13,13 @@ class ClientWebSocketResponse:

def __init__(self, reader, writer, protocol,
response, timeout, autoclose, autoping, loop, *,
time_service=None,
receive_timeout=None, heartbeat=None):
self._response = response
self._conn = response.connection

self._writer = writer
self._reader = reader
self._protocol = protocol
self._time_service = time_service
self._closed = False
self._closing = False
self._close_code = None
Expand All @@ -31,6 +29,8 @@ def __init__(self, reader, writer, protocol,
self._autoping = autoping
self._heartbeat = heartbeat
self._heartbeat_cb = None
if heartbeat is not None:
self._pong_heartbeat = heartbeat/2.0
self._pong_response_cb = None
self._loop = loop
self._waiting = None
Expand All @@ -51,17 +51,17 @@ def _reset_heartbeat(self):
self._cancel_heartbeat()

if self._heartbeat is not None:
self._heartbeat_cb = self._time_service.call_later(
self._heartbeat, self._send_heartbeat)
self._heartbeat_cb = call_later(
self._send_heartbeat, self._heartbeat, self._loop)

def _send_heartbeat(self):
if self._heartbeat is not None and not self._closed:
self.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = self._time_service.call_later(
self._heartbeat/2.0, self._pong_not_received)
self._pong_response_cb = call_later(
self._pong_not_received, self._pong_heartbeat, self._loop)

def _pong_not_received(self):
self._closed = True
Expand Down Expand Up @@ -133,7 +133,7 @@ def close(self, *, code=1000, message=b''):

while True:
try:
with self._time_service.timeout(self._timeout):
with Timeout(self._timeout, loop=self._loop):
msg = yield from self._reader.read()
except asyncio.CancelledError:
self._close_code = 1006
Expand Down Expand Up @@ -168,11 +168,12 @@ def receive(self, timeout=None):
try:
try:
self._waiting = create_future(self._loop)
with self._time_service.timeout(
timeout or self._receive_timeout):
with Timeout(
timeout or self._receive_timeout,
loop=self._loop):
msg = yield from self._reader.read()
self._reset_heartbeat()
finally:
self._reset_heartbeat()
waiter = self._waiting
self._waiting = None
waiter.set_result(True)
Expand Down
Loading

0 comments on commit 38360f9

Please sign in to comment.