From 38360f9a05aa09ac2bf9a7077edbc2fb0e2520aa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 25 Feb 2017 18:54:36 -0800 Subject: [PATCH] refactor timeout support --- aiohttp/client.py | 54 +++--- aiohttp/client_reqrep.py | 14 +- aiohttp/client_ws.py | 23 +-- aiohttp/connector.py | 113 +++++------ aiohttp/helpers.py | 151 +++++++-------- aiohttp/server.py | 22 +-- aiohttp/test_utils.py | 3 +- aiohttp/web_server.py | 4 + aiohttp/web_ws.py | 25 +-- docs/client_reference.rst | 32 ++-- tests/test_client_connection.py | 56 ++++++ tests/test_client_functional.py | 20 +- tests/test_client_response.py | 3 +- tests/test_client_ws_functional.py | 17 +- tests/test_connector.py | 256 ++++++++++--------------- tests/test_helpers.py | 84 -------- tests/test_web_websocket_functional.py | 20 +- 17 files changed, 417 insertions(+), 480 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index d1162078470..4095b0b3f6c 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -15,14 +15,15 @@ from . import client_exceptions, client_reqrep, hdrs, http from .client_exceptions import * # noqa from .client_exceptions import (ClientError, ClientOSError, - ClientResponseError, WSServerHandshakeError) + ClientResponseError, ServerTimeoutError, + WSServerHandshakeError) from .client_reqrep import * # noqa from .client_reqrep import ClientRequest, ClientResponse from .client_ws import ClientWebSocketResponse from .connector import * # noqa from .connector import TCPConnector from .cookiejar import CookieJar -from .helpers import PY_35, TimeService, noop +from .helpers import PY_35, CeilTimeout, TimeoutHandle, noop from .http import WS_KEY, WebSocketReader, WebSocketWriter from .streams import FlowControlDataQueue @@ -48,8 +49,8 @@ def __init__(self, *, connector=None, loop=None, cookies=None, response_class=ClientResponse, ws_response_class=ClientWebSocketResponse, version=http.HttpVersion11, - cookie_jar=None, read_timeout=None, time_service=None, - connector_owner=True): + cookie_jar=None, connector_owner=True, + read_timeout=None, conn_timeout=None): implicit_loop = False if loop is None: @@ -93,6 +94,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None, self._default_auth = auth self._version = version self._read_timeout = read_timeout + self._conn_timeout = conn_timeout # Convert to list of tuples if headers: @@ -110,12 +112,6 @@ def __init__(self, *, connector=None, loop=None, cookies=None, self._response_class = response_class self._ws_response_class = ws_response_class - self._time_service_owner = time_service is None - if time_service is None: - time_service = TimeService(self._loop) - - self._time_service = time_service - def __del__(self, _warnings=warnings): if not self.closed: self.close() @@ -128,10 +124,6 @@ def __del__(self, _warnings=warnings): context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - @property - def time_service(self): - return self._time_service - def request(self, method, url, **kwargs): """Perform HTTP request.""" return _RequestContextManager(self._request(method, url, **kwargs)) @@ -195,22 +187,16 @@ def _request(self, method, url, *, if proxy is not None: proxy = URL(proxy) - # request timeout - if timeout is None: - timeout = self._read_timeout - if timeout is None: - timeout = self._connector.conn_timeout - elif self._connector.conn_timeout is not None: - timeout = max(timeout, self._connector.conn_timeout) - # timeout is cumulative for all request operations # (request, redirects, responses, data consuming) - timer = self._time_service.timeout(timeout) + tm = TimeoutHandle( + self._loop, timeout if timeout is not None else self._read_timeout) + handle = tm.start() + timer = tm.timer() with timer: while True: url = URL(url).with_fragment(None) - cookies = self._cookie_jar.filter_cookies(url) req = self._request_class( @@ -221,7 +207,14 @@ def _request(self, method, url, *, loop=self._loop, response_class=self._response_class, proxy=proxy, proxy_auth=proxy_auth, timer=timer) - conn = yield from self._connector.connect(req) + # connection timeout + try: + with CeilTimeout(self._conn_timeout, loop=self._loop): + conn = yield from self._connector.connect(req) + except asyncio.TimeoutError as exc: + raise ServerTimeoutError( + 'Connection timeout to host {0}'.format(url)) from exc + conn.writer.set_tcp_nodelay(True) try: resp = req.send(conn) @@ -285,6 +278,13 @@ def _request(self, method, url, *, break + # register connection + if handle is not None: + if resp.connection is not None: + resp.connection.add_callback(handle.cancel) + else: + handle.cancel() + resp._history = tuple(history) return resp @@ -417,7 +417,6 @@ def _ws_connect(self, url, *, autoclose, autoping, self._loop, - time_service=self.time_service, receive_timeout=receive_timeout, heartbeat=heartbeat) @@ -496,9 +495,6 @@ def close(self): self._connector.close() self._connector = None - if self._time_service_owner: - self._time_service.close() - return noop() @property diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index db6ab77a8bd..5a672759d33 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -13,8 +13,7 @@ from . import hdrs, helpers, http, payload from .formdata import FormData -from .helpers import (PY_35, HeadersMixin, SimpleCookie, - _TimeServiceTimeoutNoop, noop) +from .helpers import PY_35, HeadersMixin, SimpleCookie, TimerNoop, noop from .http import HttpMessage from .log import client_logger from .streams import FlowControlStreamReader @@ -81,7 +80,7 @@ def __init__(self, method, url, *, self.compress = compress self.loop = loop self.response_class = response_class or ClientResponse - self._timer = timer if timer is not None else _TimeServiceTimeoutNoop() + self._timer = timer if timer is not None else TimerNoop() if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) @@ -448,15 +447,16 @@ def __init__(self, method, url, *, assert isinstance(url, URL) self.method = method + self.headers = None + self.cookies = SimpleCookie() + self._url = url self._content = None self._writer = writer self._continue = continue100 self._closed = True self._history = () - self.headers = None - self._timer = timer if timer is not None else _TimeServiceTimeoutNoop() - self.cookies = SimpleCookie() + self._timer = timer if timer is not None else TimerNoop() @property def url(self): @@ -470,8 +470,6 @@ def url_obj(self): @property def host(self): - warnings.warn( - "Deprecated, use .url.host", DeprecationWarning, stacklevel=2) return self._url.host @property diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index bafff41007f..cfca5fa2b7b 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -4,7 +4,7 @@ import json from .client_exceptions import ClientError -from .helpers import PY_35, PY_352, create_future +from .helpers import PY_35, PY_352, Timeout, call_later, create_future from .http import (WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, WebSocketError, WSMessage, WSMsgType) @@ -13,7 +13,6 @@ class ClientWebSocketResponse: def __init__(self, reader, writer, protocol, response, timeout, autoclose, autoping, loop, *, - time_service=None, receive_timeout=None, heartbeat=None): self._response = response self._conn = response.connection @@ -21,7 +20,6 @@ def __init__(self, reader, writer, protocol, self._writer = writer self._reader = reader self._protocol = protocol - self._time_service = time_service self._closed = False self._closing = False self._close_code = None @@ -31,6 +29,8 @@ def __init__(self, reader, writer, protocol, self._autoping = autoping self._heartbeat = heartbeat self._heartbeat_cb = None + if heartbeat is not None: + self._pong_heartbeat = heartbeat/2.0 self._pong_response_cb = None self._loop = loop self._waiting = None @@ -51,8 +51,8 @@ def _reset_heartbeat(self): self._cancel_heartbeat() if self._heartbeat is not None: - self._heartbeat_cb = self._time_service.call_later( - self._heartbeat, self._send_heartbeat) + self._heartbeat_cb = call_later( + self._send_heartbeat, self._heartbeat, self._loop) def _send_heartbeat(self): if self._heartbeat is not None and not self._closed: @@ -60,8 +60,8 @@ def _send_heartbeat(self): if self._pong_response_cb is not None: self._pong_response_cb.cancel() - self._pong_response_cb = self._time_service.call_later( - self._heartbeat/2.0, self._pong_not_received) + self._pong_response_cb = call_later( + self._pong_not_received, self._pong_heartbeat, self._loop) def _pong_not_received(self): self._closed = True @@ -133,7 +133,7 @@ def close(self, *, code=1000, message=b''): while True: try: - with self._time_service.timeout(self._timeout): + with Timeout(self._timeout, loop=self._loop): msg = yield from self._reader.read() except asyncio.CancelledError: self._close_code = 1006 @@ -168,11 +168,12 @@ def receive(self, timeout=None): try: try: self._waiting = create_future(self._loop) - with self._time_service.timeout( - timeout or self._receive_timeout): + with Timeout( + timeout or self._receive_timeout, + loop=self._loop): msg = yield from self._reader.read() - self._reset_heartbeat() finally: + self._reset_heartbeat() waiter = self._waiting self._waiting = None waiter.set_result(True) diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 7b720e6832b..0fe571417b0 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -11,10 +11,10 @@ from . import hdrs, helpers from .client_exceptions import (ClientConnectorError, ClientHttpProxyError, ClientProxyConnectionError, - ServerFingerprintMismatch, ServerTimeoutError) + ServerFingerprintMismatch) from .client_proto import HttpClientProtocol from .client_reqrep import ClientRequest -from .helpers import SimpleCookie, is_ip_address, sentinel +from .helpers import SimpleCookie, is_ip_address, noop, sentinel from .resolver import DefaultResolver __all__ = ('BaseConnector', 'TCPConnector', 'UnixConnector') @@ -36,6 +36,7 @@ def __init__(self, connector, key, protocol, loop): self._connector = connector self._loop = loop self._protocol = protocol + self._callbacks = [] if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) @@ -75,13 +76,30 @@ def protocol(self): def writer(self): return self._protocol.writer + def add_callback(self, callback): + if callback is not None: + self._callbacks.append(callback) + + def _notify_release(self): + callbacks, self._callbacks = self._callbacks[:], [] + + for cb in callbacks: + try: + cb() + except: + pass + def close(self): + self._notify_release() + if self._protocol is not None: self._connector._release( self._key, self._protocol, should_close=True) self._protocol = None def release(self): + self._notify_release() + if self._protocol is not None: self._connector._release( self._key, self._protocol, @@ -89,6 +107,8 @@ def release(self): self._protocol = None def detach(self): + self._notify_release() + if self._protocol is not None: self._connector._release_acquired(self._protocol) self._protocol = None @@ -108,7 +128,6 @@ def close(self): class BaseConnector(object): """Base connector class. - conn_timeout - (optional) Connect timeout. keepalive_timeout - (optional) Keep-alive timeout. force_close - Set to True to force close and do reconnect after each request (and between redirects). @@ -124,9 +143,9 @@ class BaseConnector(object): # abort transport after 2 seconds (cleanup broken connections) _cleanup_closed_period = 2.0 - def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel, + def __init__(self, *, keepalive_timeout=sentinel, force_close=False, limit=100, limit_per_host=0, - time_service=None, disable_cleanup_closed=False, loop=None): + disable_cleanup_closed=False, loop=None): if force_close: if keepalive_timeout is not None and \ @@ -149,18 +168,10 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel, self._limit_per_host = limit_per_host self._acquired = set() self._acquired_per_host = defaultdict(set) - self._conn_timeout = conn_timeout self._keepalive_timeout = keepalive_timeout self._force_close = force_close self._waiters = defaultdict(list) - if time_service is not None: - self._time_service_owner = False - self._time_service = time_service - else: - self._time_service_owner = True - self._time_service = helpers.TimeService(loop) - self._loop = loop self._factory = functools.partial(HttpClientProtocol, loop=loop) @@ -168,9 +179,6 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel, # start keep-alive connection cleanup task self._cleanup_handle = None - if (keepalive_timeout is not sentinel and - keepalive_timeout is not None): - self._cleanup() # start cleanup closed transports task self._cleanup_closed_handle = None @@ -203,10 +211,6 @@ def __enter__(self): def __exit__(self, *exc): self.close() - @property - def conn_timeout(self): - return self._conn_timeout - @property def force_close(self): """Ultimately close connection on releasing if True.""" @@ -237,11 +241,12 @@ def _cleanup(self): if self._cleanup_handle: self._cleanup_handle.cancel() - now = self._time_service.loop_time() + now = self._loop.time() + timeout = self._keepalive_timeout if self._conns: connections = {} - deadline = now - self._keepalive_timeout + deadline = now - timeout for key, conns in self._conns.items(): alive = [] for proto, use_time in conns: @@ -259,8 +264,9 @@ def _cleanup(self): self._conns = connections - self._cleanup_handle = self._time_service.call_later( - self._keepalive_timeout / 2.0, self._cleanup) + if self._conns: + self._cleanup_handle = helpers.weakref_handle( + self._cleanup, timeout, self._loop) def _cleanup_closed(self): """Double confirmation for transport close. @@ -276,30 +282,19 @@ def _cleanup_closed(self): self._cleanup_closed_transports = [] if not self._cleanup_closed_disabled: - self._cleanup_closed_handle = self._time_service.call_later( - self._cleanup_closed_period, self._cleanup_closed) + self._cleanup_closed_handle = helpers.weakref_handle( + self._cleanup_closed, self._cleanup_closed_period, self._loop) def close(self): """Close all opened transports.""" - ret = helpers.create_future(self._loop) - ret.set_result(None) if self._closed: - return ret + return noop() + self._closed = True try: if self._loop.is_closed(): - return ret - - if self._time_service_owner: - self._time_service.close() - - for data in self._conns.values(): - for proto, t0 in data: - proto.close() - - for proto in self._acquired: - proto.close() + return noop() # cacnel cleanup task if self._cleanup_handle: @@ -309,6 +304,13 @@ def close(self): if self._cleanup_closed_handle: self._cleanup_closed_handle.cancel() + for data in self._conns.values(): + for proto, t0 in data: + proto.close() + + for proto in self._acquired: + proto.close() + for transport in self._cleanup_closed_transports: if transport is not None: transport.abort() @@ -321,7 +323,7 @@ def close(self): self._cleanup_closed_transports.clear() self._cleanup_closed_handle = None - return ret + return noop() @property def closed(self): @@ -371,12 +373,7 @@ def connect(self, req): self._acquired.add(placeholder) self._acquired_per_host[key].add(placeholder) try: - with self._time_service.timeout(self._conn_timeout): - proto = yield from self._create_connection(req) - except asyncio.TimeoutError as exc: - raise ServerTimeoutError( - 'Connection timeout to host {0[0]}:{0[1]} ssl:{0[2]}' - .format(key)) from exc + proto = yield from self._create_connection(req) except OSError as exc: raise ClientConnectorError( exc.errno, @@ -396,7 +393,7 @@ def _get(self, key): except KeyError: return None - t1 = self._time_service.loop_time() + t1 = self._loop.time() while conns: proto, t0 = conns.pop() if proto.is_connected(): @@ -472,7 +469,11 @@ def _release(self, key, protocol, *, should_close=False): conns = self._conns.get(key) if conns is None: conns = self._conns[key] = [] - conns.append((protocol, self._time_service.loop_time())) + conns.append((protocol, self._loop.time())) + + if self._cleanup_handle is None: + self._cleanup_handle = helpers.weakref_handle( + self._cleanup, self._keepalive_timeout, self._loop) @asyncio.coroutine def _create_connection(self, req): @@ -498,7 +499,6 @@ class TCPConnector(BaseConnector): family - socket address family local_addr - local tuple of (host, port) to bind socket to - conn_timeout - (optional) Connect timeout. keepalive_timeout - (optional) Keep-alive timeout. force_close - Set to True to force close and do reconnect after each request (and between redirects). @@ -510,11 +510,9 @@ class TCPConnector(BaseConnector): def __init__(self, *, verify_ssl=True, fingerprint=None, resolve=sentinel, use_dns_cache=True, family=0, ssl_context=None, local_addr=None, - resolver=None, time_service=None, - conn_timeout=None, keepalive_timeout=sentinel, + resolver=None, keepalive_timeout=sentinel, force_close=False, limit=100, limit_per_host=0, loop=None): - super().__init__(time_service=time_service, conn_timeout=conn_timeout, - keepalive_timeout=keepalive_timeout, + super().__init__(keepalive_timeout=keepalive_timeout, force_close=force_close, limit=limit, limit_per_host=limit_per_host, loop=loop) @@ -754,7 +752,6 @@ class UnixConnector(BaseConnector): """Unix socket connector. path - Unix socket path. - conn_timeout - (optional) Connect timeout. keepalive_timeout - (optional) Keep-alive timeout. force_close - Set to True to force close and do reconnect after each request (and between redirects). @@ -770,13 +767,9 @@ class UnixConnector(BaseConnector): """ - def __init__(self, path, force_close=False, - time_service=None, - conn_timeout=None, keepalive_timeout=sentinel, + def __init__(self, path, force_close=False, keepalive_timeout=sentinel, limit=100, limit_per_host=0, loop=None): super().__init__(force_close=force_close, - time_service=time_service, - conn_timeout=conn_timeout, keepalive_timeout=keepalive_timeout, limit=limit, limit_per_host=limit_per_host, loop=loop) self._path = path diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index fae5bf3ec8c..cfdefc81f31 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -6,13 +6,14 @@ import cgi import datetime import functools -import heapq import os import re import sys import time +import weakref from collections import MutableSequence, namedtuple from functools import total_ordering +from math import ceil from pathlib import Path from time import gmtime from urllib.parse import quote @@ -518,39 +519,24 @@ def insert(self, pos, item): self._items.insert(pos, item) -class TimerHandle(asyncio.TimerHandle): - - def cancel(self): - asyncio.Handle.cancel(self) - - class TimeService: def __init__(self, loop, *, interval=1.0): self._loop = loop self._interval = interval self._time = time.time() - self._loop_time = loop.time() self._count = 0 self._strtime = None - self._cb = loop.call_at(self._loop_time + self._interval, self._on_cb) - self._scheduled = [] + self._cb = loop.call_later(self._interval, self._on_cb) def close(self): if self._cb: self._cb.cancel() - # cancel all scheduled handles - for handle in self._scheduled: - handle.cancel() - self._cb = None - self._scheduled = [] self._loop = None def _on_cb(self, reset_count=10*60): - self._loop_time = self._loop.time() - if self._count >= reset_count: # reset timer every 10 minutes self._count = 0 @@ -558,23 +544,8 @@ def _on_cb(self, reset_count=10*60): else: self._time += self._interval - # Handle 'later' callbacks that are ready. - ready = [] - end_time = self._loop_time - while self._scheduled: - handle = self._scheduled[0] - if handle._when >= end_time: - break - handle = heapq.heappop(self._scheduled) - ready.append(handle) - - for handle in ready: - if not handle._cancelled: - handle._run() - self._strtime = None - self._cb = self._loop.call_at( - self._loop_time + self._interval, self._on_cb) + self._cb = self._loop.call_later(self._interval, self._on_cb) def _format_date_time(self): # Weekday and month names for HTTP date/time formatting; @@ -599,54 +570,66 @@ def strtime(self): self._strtime = s = self._format_date_time() return self._strtime - def loop_time(self): - return self._loop_time - def call_later(self, delay, callback, *args): - """Arrange for a callback to be called at a given time. +def _weakref_handle(ref): + cb = ref() + if cb is not None: + cb() - Return a Handle: an opaque object with a cancel() method that - can be used to cancel the call. - The delay can be an int or float, expressed in seconds. It is - always relative to the current time. +def weakref_handle(cb, timeout, loop, ceil_timeout=True): + if timeout is not None and timeout > 0: + when = loop.time() + timeout + if ceil_timeout: + when = ceil(when) - Any positional arguments after the callback will be passed to - the callback when it is called. + return loop.call_at(when, _weakref_handle, weakref.ref(cb)) - Time resolution is aproximatly one second. - """ - return self._call_at(self._loop_time + delay, callback, *args) - def _call_at(self, when, callback, *args): - """Like call_later(), but uses an absolute time. +def call_later(cb, timeout, loop, ceil_timeout=True): + if timeout is not None and timeout > 0: + when = loop.time() + timeout + if ceil_timeout: + when = ceil(when) - Absolute time corresponds to the loop's time() method. - """ - timer = TimerHandle(when, callback, args, self._loop) - heapq.heappush(self._scheduled, timer) - return timer + return loop.call_at(when, cb) - def timeout(self, timeout): - """low resolution timeout context manager. - timeout - value in seconds or None to disable timeout logic - """ - if self._loop is None: - raise RuntimeError - - if timeout: - ctx = _TimeServiceTimeoutContext(self._loop) - when = self._loop_time + timeout - timer = TimerHandle(when, ctx.cancel, (), self._loop) - heapq.heappush(self._scheduled, timer) - else: - ctx = _TimeServiceTimeoutNoop() +class TimeoutHandle: + """ Timeout handle """ + + def __init__(self, loop, timeout): + self._timeout = timeout + self._loop = loop + self._callbacks = [] + + def register(self, callback, *args, **kwargs): + self._callbacks.append((callback, args, kwargs)) + + def close(self): + self._callbacks.clear() - return ctx + def start(self): + if self._timeout is not None and self._timeout > 0: + at = ceil(self._loop.time() + self._timeout) + return self._loop.call_at(at, self.__call__) + + def timer(self): + timer = TimerContext(self._loop) + self.register(timer.timeout) + return timer + + def __call__(self): + for cb, args, kwargs in self._callbacks: + try: + cb(*args, **kwargs) + except: + pass + + self._callbacks.clear() -class _TimeServiceTimeoutNoop: +class TimerNoop: def __enter__(self): return self @@ -655,12 +638,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False -class _TimeServiceTimeoutContext: +class TimerContext: """ Low resolution timeout context manager """ def __init__(self, loop): - assert loop is not None, "loop is not set" - self._loop = loop self._tasks = [] self._cancelled = False @@ -680,20 +661,40 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): if self._tasks: - self._tasks.pop() + task = self._tasks.pop() + else: + task = None if exc_type is asyncio.CancelledError and self._cancelled: + for task in self._tasks: + task.cancel() raise asyncio.TimeoutError from None - def cancel(self): + if exc_type is None and self._cancelled and task is not None: + task.cancel() + + def timeout(self): if not self._cancelled: for task in self._tasks: task.cancel() - self._tasks = [] self._cancelled = True +class CeilTimeout(Timeout): + + def __enter__(self): + if self._timeout is not None and self._timeout > 0: + self._task = asyncio.Task.current_task(loop=self._loop) + if self._task is None: + raise RuntimeError('Timeout context manager should be used ' + 'inside a task') + self._cancel_handler = self._loop.call_at( + ceil(self._loop.time() + self._timeout), self._cancel_task) + + return self + + class HeadersMixin: _content_type = None diff --git a/aiohttp/server.py b/aiohttp/server.py index 8a8998059c1..ceb4ebbe929 100644 --- a/aiohttp/server.py +++ b/aiohttp/server.py @@ -11,7 +11,8 @@ from html import escape as html_escape from . import hdrs, helpers -from .helpers import TimeService, create_future, ensure_future +from .helpers import (CeilTimeout, TimeService, call_later, create_future, + ensure_future) from .http import HttpProcessingError, HttpRequestParser, Response from .log import access_logger, server_logger from .streams import StreamWriter @@ -178,7 +179,7 @@ def shutdown(self, timeout=15.0): if self._request_count and timeout and not closing: with suppress(asyncio.CancelledError): - with self.time_service.timeout(timeout): + with CeilTimeout(timeout, loop=self._loop): while self._request_handlers: h = None for handler in self._request_handlers: @@ -330,13 +331,12 @@ def log_exception(self, *args, **kw): self.logger.exception(*args, **kw) def _process_keepalive(self): - if self._closing: + if self._closing or not self._process_keepalive: return if self._request_handlers: - self._keepalive_handle = self._time_service.call_later( - self._keepalive_timeout, self._process_keepalive) - + self._keepalive_handle = call_later( + self._process_keepalive, self._keepalive_timeout, self._loop) elif self.transport is not None: self.transport.close() @@ -372,7 +372,6 @@ def start(self, message, payload): """ loop = self._loop handler = self._request_handlers[-1] - time_service = self.time_service while not self._closing: try: @@ -388,18 +387,17 @@ def start(self, message, payload): 'Start lingering close timer for %s sec.', self._lingering_time) - now = time_service.time() + now = loop.time() end_time = now + self._lingering_time with suppress(asyncio.TimeoutError): - while (not payload.is_eof() and - now < end_time): + while (not payload.is_eof() and now < end_time): timeout = min( end_time - now, self._lingering_timeout) - with time_service.timeout(timeout): + with CeilTimeout(timeout, loop=loop): # read and ignore yield from payload.readany() - now = time_service.time() + now = loop.time() except asyncio.CancelledError: self._closing = True self.log_debug('Ignored premature client disconnection') diff --git a/aiohttp/test_utils.py b/aiohttp/test_utils.py index 58a1b6ecf2c..b281cfe1dc8 100644 --- a/aiohttp/test_utils.py +++ b/aiohttp/test_utils.py @@ -17,7 +17,7 @@ from aiohttp.client import _RequestContextManager from . import ClientSession, hdrs -from .helpers import PY_35, TimeService, sentinel +from .helpers import PY_35, sentinel from .http import HttpVersion, RawRequestMessage from .signals import Signal from .web import Application, Request, Server, UrlMappingMatchInfo @@ -205,7 +205,6 @@ def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel, if cookie_jar is None: cookie_jar = aiohttp.CookieJar(unsafe=True, loop=self._loop) - kwargs['time_service'] = TimeService(self._loop, interval=0.1) self._session = ClientSession(loop=self._loop, cookie_jar=cookie_jar, **kwargs) diff --git a/aiohttp/web_server.py b/aiohttp/web_server.py index dfcdeec8e76..1d36d118765 100644 --- a/aiohttp/web_server.py +++ b/aiohttp/web_server.py @@ -24,6 +24,10 @@ def __init__(self, manager, **kwargs): self._request_factory = manager.request_factory self._handler = manager.handler + @property + def time_service(self): + return self._time_service + def __repr__(self): if self._request is None: meth = 'none' diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 204215f9396..1ea87d278e1 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -3,7 +3,7 @@ from collections import namedtuple from . import hdrs -from .helpers import PY_35, PY_352, create_future +from .helpers import PY_35, PY_352, Timeout, call_later, create_future from .http import (WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, HttpProcessingError, WebSocketError, WebSocketReader, WSMessage, WSMsgType, do_handshake) @@ -50,8 +50,9 @@ def __init__(self, *, self._autoping = autoping self._heartbeat = heartbeat self._heartbeat_cb = None + if heartbeat is not None: + self._pong_heartbeat = heartbeat/2.0 self._pong_response_cb = None - self._time_service = None def _cancel_heartbeat(self): if self._pong_response_cb is not None: @@ -66,8 +67,8 @@ def _reset_heartbeat(self): self._cancel_heartbeat() if self._heartbeat is not None: - self._heartbeat_cb = self._time_service.call_later( - self._heartbeat, self._send_heartbeat) + self._heartbeat_cb = call_later( + self._send_heartbeat, self._heartbeat, self._loop) def _send_heartbeat(self): if self._heartbeat is not None and not self._closed: @@ -75,8 +76,8 @@ def _send_heartbeat(self): if self._pong_response_cb is not None: self._pong_response_cb.cancel() - self._pong_response_cb = self._time_service.call_later( - self._heartbeat/2.0, self._pong_not_received) + self._pong_response_cb = call_later( + self._pong_not_received, self._pong_heartbeat, self._loop) def _pong_not_received(self): self._closed = True @@ -99,6 +100,8 @@ def prepare(self, request): return payload_writer def _pre_start(self, request): + self._loop = request.app.loop + try: status, headers, _, writer, protocol = do_handshake( request.method, request.headers, request._protocol.writer, @@ -112,7 +115,6 @@ def _pre_start(self, request): else: # pragma: no cover raise HTTPInternalServerError() from err - self._time_service = request.time_service self._reset_heartbeat() if self.status != status: @@ -124,7 +126,6 @@ def _pre_start(self, request): def _post_start(self, request, protocol, writer): self._ws_protocol = protocol - self._loop = request.app.loop self._writer = writer self._reader = FlowControlDataQueue( request._protocol, limit=2 ** 16, loop=self._loop) @@ -225,7 +226,7 @@ def close(self, *, code=1000, message=b''): return True try: - with self._time_service.timeout(self._timeout): + with Timeout(self._timeout, loop=self._loop): msg = yield from self._reader.read() except asyncio.CancelledError: self._close_code = 1006 @@ -266,11 +267,11 @@ def receive(self, timeout=None): try: self._waiting = create_future(self._loop) try: - with self._time_service.timeout( - timeout or self._receive_timeout): + with Timeout( + timeout or self._receive_timeout, loop=self._loop): msg = yield from self._reader.read() - self._reset_heartbeat() finally: + self._reset_heartbeat() waiter = self._waiting self._waiting = None waiter.set_result(True) diff --git a/docs/client_reference.rst b/docs/client_reference.rst index c0b960c0d8d..850fcc8dc9f 100644 --- a/docs/client_reference.rst +++ b/docs/client_reference.rst @@ -45,7 +45,7 @@ The client session supports the context manager protocol for self closing. headers=None, skip_auto_headers=None, \ auth=None, \ version=aiohttp.HttpVersion11, \ - cookie_jar=None) + cookie_jar=None, read_timeout=None, conn_timeout=None) The class for creating client sessions and making requests. @@ -100,6 +100,13 @@ The client session supports the context manager protocol for self closing. .. versionadded:: 0.22 + :param float read_timeout: Request operations timeout. read_timeout is + cumulative for all request operations (request, redirects, responses, + data consuming) + + :param float conn_timeout: timeout for connection establishing + (optional). Values ``0`` or ``None`` mean no timeout. + .. versionchanged:: 1.0 ``.cookies`` attribute was dropped. Use :attr:`cookie_jar` @@ -113,7 +120,7 @@ The client session supports the context manager protocol for self closing. .. attribute:: connector - :class:`aiohttp.connector.BaseConnector` derived instance used + :class:`aiohttp.connector.BaseConnector` derived instance used for the session. A read-only property. @@ -139,7 +146,7 @@ The client session supports the context manager protocol for self closing. auth=None, allow_redirects=True,\ max_redirects=10, encoding='utf-8',\ version=HttpVersion(major=1, minor=1),\ - compress=None, chunked=None, expect100=False,\ + compress=None, chunked=None, expect100=False,\ read_until_eof=True,\ proxy=None, proxy_auth=None,\ timeout=5*60) @@ -588,26 +595,25 @@ constructor's parameter). BaseConnector ^^^^^^^^^^^^^ -.. class:: BaseConnector(*, conn_timeout=None, keepalive_timeout=30, \ - limit=20, \ +.. class:: BaseConnector(*, keepalive_timeout=30, \ + limit=100, limit_per_host=None, \ force_close=False, loop=None) Base class for all connectors. - :param float conn_timeout: timeout for connection establishing - (optional). Values ``0`` or ``None`` - mean no timeout. - :param float keepalive_timeout: timeout for connection reusing after releasing (optional). Values ``0``. For disabling *keep-alive* feature use ``force_close=True`` flag. - :param int limit: limit for simultaneous connections to the same - endpoint. Endpoints are the same if they are - have equal ``(host, port, is_ssl)`` triple. - If *limit* is ``None`` the connector has no limit (default: 20). + :param int limit: Total number simultaneous connections. If *limit* is + ``None`` the connector has no limit (default: 100). + + :param int limit_by_host: limit for simultaneous connections to the same + endpoint. Endpoints are the same if they are + have equal ``(host, port, is_ssl)`` triple. + If *limit* is ``None`` the connector has no limit (default: None). :param bool force_close: do close underlying sockets after connection releasing (optional). diff --git a/tests/test_client_connection.py b/tests/test_client_connection.py index 24a6ee42adf..86a9bbc5730 100644 --- a/tests/test_client_connection.py +++ b/tests/test_client_connection.py @@ -39,6 +39,62 @@ def test_ctor(connector, key, protocol, loop): conn.close() +def test_callbacks_on_close(connector, key, protocol, loop): + conn = Connection(connector, key, protocol, loop) + notified = False + + def cb(): + nonlocal notified + notified = True + + conn.add_callback(cb) + conn.close() + assert notified + + +def test_callbacks_on_release(connector, key, protocol, loop): + conn = Connection(connector, key, protocol, loop) + notified = False + + def cb(): + nonlocal notified + notified = True + + conn.add_callback(cb) + conn.release() + assert notified + + +def test_callbacks_on_detach(connector, key, protocol, loop): + conn = Connection(connector, key, protocol, loop) + notified = False + + def cb(): + nonlocal notified + notified = True + + conn.add_callback(cb) + conn.detach() + assert notified + + +def test_callbacks_exception(connector, key, protocol, loop): + conn = Connection(connector, key, protocol, loop) + notified = False + + def cb1(): + raise Exception + + def cb2(): + nonlocal notified + notified = True + + conn.add_callback(cb1) + conn.add_callback(cb2) + conn.close() + assert notified + + def test_del(connector, key, protocol, loop): loop.is_closed.return_value = False conn = Connection(connector, key, protocol, loop) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index cf48daca9a8..a880005a185 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -37,6 +37,10 @@ def fname(here): return here / 'sample.key' +def ceil(val): + return val + + @asyncio.coroutine def test_keepalive_two_requests_success(loop, test_client): @asyncio.coroutine @@ -573,7 +577,8 @@ def handler(request): @asyncio.coroutine -def test_timeout_on_reading_headers(loop, test_client): +def test_timeout_on_reading_headers(loop, test_client, mocker): + mocker.patch('aiohttp.helpers.ceil').side_effect = ceil @asyncio.coroutine def handler(request): @@ -591,9 +596,11 @@ def handler(request): @asyncio.coroutine -def test_timeout_on_conn_reading_headers(loop, test_client): +def test_timeout_on_conn_reading_headers(loop, test_client, mocker): # tests case where user did not set a connection timeout + mocker.patch('aiohttp.helpers.ceil').side_effect = ceil + @asyncio.coroutine def handler(request): resp = web.StreamResponse() @@ -612,7 +619,9 @@ def handler(request): @asyncio.coroutine -def test_timeout_on_session_read_timeout(loop, test_client): +def test_timeout_on_session_read_timeout(loop, test_client, mocker): + mocker.patch('aiohttp.helpers.ceil').side_effect = ceil + @asyncio.coroutine def handler(request): resp = web.StreamResponse() @@ -772,14 +781,13 @@ def test_HTTP_200_OK_METHOD_connector(loop, test_client): def handler(request): return web.Response(text=request.method) - conn = aiohttp.TCPConnector( - conn_timeout=0.2, resolve=True, loop=loop) + conn = aiohttp.TCPConnector(resolve=True, loop=loop) conn.clear_dns_cache() app = web.Application(loop=loop) for meth in ('get', 'post', 'put', 'delete', 'head'): app.router.add_route(meth.upper(), '/', handler) - client = yield from test_client(app, connector=conn) + client = yield from test_client(app, connector=conn, conn_timeout=0.2) for meth in ('get', 'post', 'put', 'delete', 'head'): resp = yield from client.request(meth, '/') diff --git a/tests/test_client_response.py b/tests/test_client_response.py index 312ffb1eec8..0449c5132f5 100644 --- a/tests/test_client_response.py +++ b/tests/test_client_response.py @@ -385,8 +385,7 @@ def test_raise_for_status_4xx(): def test_resp_host(): response = ClientResponse('get', URL('http://del-cl-resp.org')) - with pytest.warns(DeprecationWarning): - assert 'del-cl-resp.org' == response.host + assert 'del-cl-resp.org' == response.host def test_content_type(): diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 48164bca8d4..7901bca3bfb 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -6,6 +6,14 @@ from aiohttp import hdrs, helpers, web +@pytest.fixture +def ceil(mocker): + def ceil(val): + return val + + mocker.patch('aiohttp.helpers.ceil').side_effect = ceil + + @asyncio.coroutine def test_send_recv_text(loop, test_client): @@ -358,7 +366,8 @@ def handler(request): yield from ws.prepare(request) yield from ws.receive_bytes() ws.send_str('test') - yield from asyncio.sleep(10, loop=loop) + yield from asyncio.sleep(1, loop=loop) + return ws app = web.Application(loop=loop) app.router.add_route('GET', '/', handler) @@ -520,7 +529,6 @@ def handler(request): client = yield from test_client(app) resp = yield from client.ws_connect('/', receive_timeout=0.1) - resp._time_service._interval = 0.05 with pytest.raises(asyncio.TimeoutError): yield from resp.receive(0.05) @@ -544,7 +552,6 @@ def handler(request): client = yield from test_client(app) resp = yield from client.ws_connect('/') - resp._time_service._interval = 0.05 with pytest.raises(asyncio.TimeoutError): yield from resp.receive(0.05) @@ -553,7 +560,7 @@ def handler(request): @asyncio.coroutine -def test_heartbeat(loop, test_client): +def test_heartbeat(loop, test_client, ceil): ping_received = False @asyncio.coroutine @@ -580,7 +587,7 @@ def handler(request): @asyncio.coroutine -def test_heartbeat_no_pong(loop, test_client): +def test_heartbeat_no_pong(loop, test_client, ceil): ping_received = False @asyncio.coroutine diff --git a/tests/test_connector.py b/tests/test_connector.py index b0110d5f686..7a70bdcdf09 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -41,12 +41,12 @@ def ssl_key(): def test_del(loop): conn = aiohttp.BaseConnector( - loop=loop, time_service=unittest.mock.Mock()) - proto = unittest.mock.Mock() - conn._conns['a'] = [(proto, 123)] + loop=loop, disable_cleanup_closed=True) + proto = mock.Mock(should_close=False) + conn._release('a', proto) conns_impl = conn._conns - exc_handler = unittest.mock.Mock() + exc_handler = mock.Mock() loop.set_exception_handler(exc_handler) with pytest.warns(ResourceWarning): @@ -55,11 +55,11 @@ def test_del(loop): assert not conns_impl proto.close.assert_called_with() - msg = {'connector': unittest.mock.ANY, # conn was deleted - 'connections': unittest.mock.ANY, + msg = {'connector': mock.ANY, # conn was deleted + 'connections': mock.ANY, 'message': 'Unclosed connector'} if loop.get_debug(): - msg['source_traceback'] = unittest.mock.ANY + msg['source_traceback'] = mock.ANY exc_handler.assert_called_with(loop, msg) @@ -68,11 +68,11 @@ def test_del(loop): def test_del_with_scheduled_cleanup(loop): loop.set_debug(True) conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=0.01) - transp = unittest.mock.Mock() + transp = mock.Mock() conn._conns['a'] = [(transp, 'proto', 123)] conns_impl = conn._conns - exc_handler = unittest.mock.Mock() + exc_handler = mock.Mock() loop.set_exception_handler(exc_handler) with pytest.warns(ResourceWarning): @@ -84,20 +84,20 @@ def test_del_with_scheduled_cleanup(loop): assert not conns_impl transp.close.assert_called_with() - msg = {'connector': unittest.mock.ANY, # conn was deleted + msg = {'connector': mock.ANY, # conn was deleted 'message': 'Unclosed connector'} if loop.get_debug(): - msg['source_traceback'] = unittest.mock.ANY + msg['source_traceback'] = mock.ANY exc_handler.assert_called_with(loop, msg) def test_del_with_closed_loop(loop): conn = aiohttp.BaseConnector(loop=loop) - transp = unittest.mock.Mock() + transp = mock.Mock() conn._conns['a'] = [(transp, 'proto', 123)] conns_impl = conn._conns - exc_handler = unittest.mock.Mock() + exc_handler = mock.Mock() loop.set_exception_handler(exc_handler) loop.close() @@ -113,7 +113,7 @@ def test_del_with_closed_loop(loop): def test_del_empty_conector(loop): conn = aiohttp.BaseConnector(loop=loop) - exc_handler = unittest.mock.Mock() + exc_handler = mock.Mock() loop.set_exception_handler(exc_handler) del conn @@ -139,14 +139,14 @@ def test_context_manager(loop): def test_ctor_loop(): - with unittest.mock.patch('aiohttp.connector.asyncio') as m_asyncio: - session = aiohttp.BaseConnector(time_service=unittest.mock.Mock()) + with mock.patch('aiohttp.connector.asyncio') as m_asyncio: + session = aiohttp.BaseConnector() assert session._loop is m_asyncio.get_event_loop.return_value def test_close(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() conn = aiohttp.BaseConnector(loop=loop) assert not conn.closed @@ -158,41 +158,11 @@ def test_close(loop): assert conn.closed -def test_close_time_service_owned(loop): - proto = unittest.mock.Mock() - - conn = aiohttp.BaseConnector(loop=loop) - assert not conn.closed - conn._conns[1] = [(proto, object())] - ts = conn._time_service = unittest.mock.Mock() - conn.close() - - assert not conn._conns - assert proto.close.called - assert conn.closed - assert ts.close.called - - -def test_close_time_service_unowned(loop): - proto = unittest.mock.Mock() - ts = unittest.mock.Mock() - - conn = aiohttp.BaseConnector(loop=loop, time_service=ts) - assert not conn.closed - conn._conns[1] = [(proto, object())] - conn.close() - - assert not conn._conns - assert proto.close.called - assert conn.closed - assert not ts.close.called - - def test_get(loop): conn = aiohttp.BaseConnector(loop=loop) assert conn._get(1) is None - proto = unittest.mock.Mock() + proto = mock.Mock() conn._conns[1] = [(proto, loop.time())] assert conn._get(1) == proto conn.close() @@ -202,7 +172,7 @@ def test_get_expired(loop): conn = aiohttp.BaseConnector(loop=loop) assert conn._get(('localhost', 80, False)) is None - proto = unittest.mock.Mock() + proto = mock.Mock() conn._conns[('localhost', 80, False)] = [(proto, loop.time() - 1000)] assert conn._get(('localhost', 80, False)) is None assert not conn._conns @@ -213,7 +183,7 @@ def test_get_expired_ssl(loop): conn = aiohttp.BaseConnector(loop=loop) assert conn._get(('localhost', 80, True)) is None - proto = unittest.mock.Mock() + proto = mock.Mock() conn._conns[('localhost', 80, True)] = [(proto, loop.time() - 1000)] assert conn._get(('localhost', 80, True)) is None assert not conn._conns @@ -222,9 +192,9 @@ def test_get_expired_ssl(loop): def test_release_acquired(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() conn = aiohttp.BaseConnector(loop=loop, limit=5) - conn._release_waiter = unittest.mock.Mock() + conn._release_waiter = mock.Mock() conn._acquired.add(proto) conn._acquired_per_host[key].add(proto) @@ -241,9 +211,9 @@ def test_release_acquired(loop, key): def test_release_acquired_closed(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() conn = aiohttp.BaseConnector(loop=loop, limit=5) - conn._release_waiter = unittest.mock.Mock() + conn._release_waiter = mock.Mock() conn._acquired.add(proto) conn._acquired_per_host[key].add(proto) @@ -259,10 +229,9 @@ def test_release(loop, key): loop.time = mock.Mock(return_value=10) conn = aiohttp.BaseConnector(loop=loop) - conn._release_waiter = unittest.mock.Mock() + conn._release_waiter = mock.Mock() - proto = unittest.mock.Mock() - proto.should_close = False + proto = mock.Mock(should_close=False) conn._acquired.add(proto) conn._acquired_per_host[key].add(proto) @@ -278,9 +247,9 @@ def test_release_ssl_transport(loop, ssl_key): loop.time = mock.Mock(return_value=10) conn = aiohttp.BaseConnector(loop=loop) - conn._release_waiter = unittest.mock.Mock() + conn._release_waiter = mock.Mock() - proto = unittest.mock.Mock() + proto = mock.Mock() conn._acquired.add(proto) conn._acquired_per_host[ssl_key].add(proto) @@ -292,13 +261,13 @@ def test_release_ssl_transport(loop, ssl_key): def test_release_already_closed(loop): conn = aiohttp.BaseConnector(loop=loop) - proto = unittest.mock.Mock() + proto = mock.Mock() key = 1 conn._acquired.add(proto) conn.close() - conn._release_waiters = unittest.mock.Mock() - conn._release_acquired = unittest.mock.Mock() + conn._release_waiters = mock.Mock() + conn._release_acquired = mock.Mock() conn._release(key, proto) assert not conn._release_waiters.called @@ -308,7 +277,7 @@ def test_release_already_closed(loop): def test_release_waiter(loop, key, key2): # limit is 0 conn = aiohttp.BaseConnector(limit=0, loop=loop) - w = unittest.mock.Mock() + w = mock.Mock() w.done.return_value = False conn._waiters[key].append(w) conn._release_waiter() @@ -318,7 +287,7 @@ def test_release_waiter(loop, key, key2): # release first available conn = aiohttp.BaseConnector(loop=loop) - w1, w2 = unittest.mock.Mock(), unittest.mock.Mock() + w1, w2 = mock.Mock(), mock.Mock() w1.done.return_value = False w2.done.return_value = False conn._waiters[key].append(w2) @@ -330,7 +299,7 @@ def test_release_waiter(loop, key, key2): # limited available conn = aiohttp.BaseConnector(loop=loop, limit=1) - w1, w2 = unittest.mock.Mock(), unittest.mock.Mock() + w1, w2 = mock.Mock(), mock.Mock() w1.done.return_value = False w2.done.return_value = False conn._waiters[key] = [w1, w2] @@ -341,7 +310,7 @@ def test_release_waiter(loop, key, key2): # limited available conn = aiohttp.BaseConnector(loop=loop, limit=1) - w1, w2 = unittest.mock.Mock(), unittest.mock.Mock() + w1, w2 = mock.Mock(), mock.Mock() w1.done.return_value = True w2.done.return_value = False conn._waiters[key] = [w1, w2] @@ -354,7 +323,7 @@ def test_release_waiter(loop, key, key2): def test_release_waiter_per_host(loop, key, key2): # no limit conn = aiohttp.BaseConnector(loop=loop, limit=0, limit_per_host=2) - w1, w2 = unittest.mock.Mock(), unittest.mock.Mock() + w1, w2 = mock.Mock(), mock.Mock() w1.done.return_value = False w2.done.return_value = False conn._waiters[key] = [w1] @@ -367,7 +336,7 @@ def test_release_waiter_per_host(loop, key, key2): def test_release_close(loop): conn = aiohttp.BaseConnector(loop=loop) - proto = unittest.mock.Mock(should_close=True) + proto = mock.Mock(should_close=True) key = ('localhost', 80, False) conn._acquired.add(proto) @@ -421,7 +390,7 @@ def test_release_close_do_not_add_to_pool(loop): conn = aiohttp.BaseConnector(loop=loop) key = ('127.0.0.1', 80, False) - proto = unittest.mock.Mock(should_close=True) + proto = mock.Mock(should_close=True) conn._acquired.add(proto) conn._release(key, proto) @@ -430,12 +399,12 @@ def test_release_close_do_not_add_to_pool(loop): def test_release_close_do_not_delete_existing_connections(loop): key = ('127.0.0.1', 80, False) - proto1 = unittest.mock.Mock() + proto1 = mock.Mock() conn = aiohttp.BaseConnector(loop=loop) conn._conns[key] = [(proto1, 1)] - proto = unittest.mock.Mock(should_close=True) + proto = mock.Mock(should_close=True) conn._acquired.add(proto) conn._release(key, proto) assert conn._conns[key] == [(proto1, 1)] @@ -446,7 +415,7 @@ def test_release_close_do_not_delete_existing_connections(loop): def test_release_not_started(loop): loop.time = mock.Mock(return_value=10) conn = aiohttp.BaseConnector(loop=loop) - proto = unittest.mock.Mock(should_close=False) + proto = mock.Mock(should_close=False) key = 1 conn._acquired.add(proto) conn._release(key, proto) @@ -458,7 +427,7 @@ def test_release_not_started(loop): def test_release_not_opened(loop): conn = aiohttp.BaseConnector(loop=loop) - proto = unittest.mock.Mock() + proto = mock.Mock() key = ('localhost', 80, False) conn._acquired.add(proto) conn._release(key, proto) @@ -467,7 +436,7 @@ def test_release_not_opened(loop): @asyncio.coroutine def test_connect(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://host:80'), loop=loop) @@ -475,7 +444,7 @@ def test_connect(loop): conn = aiohttp.BaseConnector(loop=loop) key = ('host', 80, False) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -487,29 +456,16 @@ def test_connect(loop): connection.close() -@asyncio.coroutine -def test_connect_timeout(loop): - conn = aiohttp.BaseConnector(loop=loop) - conn._create_connection = unittest.mock.Mock() - conn._create_connection.return_value = helpers.create_future(loop) - conn._create_connection.return_value.set_exception( - asyncio.TimeoutError()) - - with pytest.raises(aiohttp.ServerTimeoutError): - req = unittest.mock.Mock() - yield from conn.connect(req) - - @asyncio.coroutine def test_connect_oserr(loop): conn = aiohttp.BaseConnector(loop=loop) - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) err = OSError(1, 'permission error') conn._create_connection.return_value.set_exception(err) with pytest.raises(aiohttp.ClientOSError) as ctx: - req = unittest.mock.Mock() + req = mock.Mock() yield from conn.connect(req) assert 1 == ctx.value.errno assert ctx.value.strerror.startswith('Cannot connect to') @@ -517,27 +473,27 @@ def test_connect_oserr(loop): def test_ctor_cleanup(): - loop = unittest.mock.Mock() + loop = mock.Mock() loop.time.return_value = 1.5 conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) - assert conn._cleanup_handle is not None + assert conn._cleanup_handle is None + assert conn._cleanup_closed_handle is not None def test_cleanup(): key = ('localhost', 80, False) testset = { - key: [(unittest.mock.Mock(), 10), - (unittest.mock.Mock(), 300)], + key: [(mock.Mock(), 10), + (mock.Mock(), 300)], } testset[key][0][0].is_connected.return_value = True testset[key][1][0].is_connected.return_value = False - loop = unittest.mock.Mock() - time_service = unittest.mock.Mock() - time_service.loop_time.return_value = 300 - conn = aiohttp.BaseConnector(loop=loop, time_service=time_service) + loop = mock.Mock() + loop.time.return_value = 300 + conn = aiohttp.BaseConnector(loop=loop) conn._conns = testset - existing_handle = conn._cleanup_handle = unittest.mock.Mock() + existing_handle = conn._cleanup_handle = mock.Mock() conn._cleanup() assert existing_handle.cancel.called @@ -546,16 +502,15 @@ def test_cleanup(): def test_cleanup_close_ssl_transport(): - proto = unittest.mock.Mock() + proto = mock.Mock() key = ('localhost', 80, True) testset = {key: [(proto, 10)]} - loop = unittest.mock.Mock() - time_service = unittest.mock.Mock() - time_service.loop_time.return_value = 300 - conn = aiohttp.BaseConnector(loop=loop, time_service=time_service) + loop = mock.Mock() + loop.time.return_value = 300 + conn = aiohttp.BaseConnector(loop=loop) conn._conns = testset - existing_handle = conn._cleanup_handle = unittest.mock.Mock() + existing_handle = conn._cleanup_handle = mock.Mock() conn._cleanup() assert existing_handle.cancel.called @@ -564,73 +519,65 @@ def test_cleanup_close_ssl_transport(): def test_cleanup2(): - testset = {1: [(unittest.mock.Mock(), 300)]} + testset = {1: [(mock.Mock(), 300)]} testset[1][0][0].is_connected.return_value = True - loop = unittest.mock.Mock() - time_service = unittest.mock.Mock() - time_service.loop_time.return_value = 300 + loop = mock.Mock() + loop.time.return_value = 300 - conn = aiohttp.BaseConnector( - loop=loop, keepalive_timeout=10, time_service=time_service) + conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) conn._conns = testset conn._cleanup() assert conn._conns == testset assert conn._cleanup_handle is not None - time_service.call_later.assert_called_with(5, conn._cleanup) + loop.call_at.assert_called_with(310, mock.ANY, mock.ANY) conn.close() def test_cleanup3(): key = ('localhost', 80, False) - testset = {key: [(unittest.mock.Mock(), 290.1), - (unittest.mock.Mock(), 305.1)]} + testset = {key: [(mock.Mock(), 290.1), + (mock.Mock(), 305.1)]} testset[key][0][0].is_connected.return_value = True - loop = unittest.mock.Mock() - time_service = unittest.mock.Mock() - time_service.loop_time.return_value = 308.5 + loop = mock.Mock() + loop.time.return_value = 308.5 - conn = aiohttp.BaseConnector( - loop=loop, keepalive_timeout=10, time_service=time_service) + conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) conn._conns = testset conn._cleanup() assert conn._conns == {key: [testset[key][1]]} assert conn._cleanup_handle is not None - time_service.call_later.assert_called_with(5, conn._cleanup) + loop.call_at.assert_called_with(319, mock.ANY, mock.ANY) conn.close() -def test_cleanup_closed(loop): - ts = unittest.mock.Mock() - conn = aiohttp.BaseConnector(loop=loop, time_service=ts) +def test_cleanup_closed(loop, mocker): + mocker.spy(loop, 'call_at') + conn = aiohttp.BaseConnector(loop=loop) - ts = conn._time_service = unittest.mock.Mock() - tr = unittest.mock.Mock() - conn._cleanup_closed_handle = cleanup_closed_handle = unittest.mock.Mock() + tr = mock.Mock() + conn._cleanup_closed_handle = cleanup_closed_handle = mock.Mock() conn._cleanup_closed_transports = [tr] conn._cleanup_closed() assert tr.abort.called assert not conn._cleanup_closed_transports - assert ts.call_later.called + assert loop.call_at.called assert cleanup_closed_handle.cancel.called -def test_cleanup_closed_disabled(loop): - ts = unittest.mock.Mock() +def test_cleanup_closed_disabled(loop, mocker): conn = aiohttp.BaseConnector( - loop=loop, time_service=ts, disable_cleanup_closed=True) + loop=loop, disable_cleanup_closed=True) - ts = conn._time_service = unittest.mock.Mock() - tr = unittest.mock.Mock() + tr = mock.Mock() conn._cleanup_closed_transports = [tr] conn._cleanup_closed() assert tr.abort.called assert not conn._cleanup_closed_transports - assert not ts.call_later.called def test_tcp_connector_ctor(loop): @@ -695,7 +642,7 @@ def test_respect_precreated_ssl_context(loop): def test_close_twice(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() conn = aiohttp.BaseConnector(loop=loop) conn._conns[1] = [(proto, object())] @@ -712,13 +659,14 @@ def test_close_twice(loop): def test_close_cancels_cleanup_handle(loop): conn = aiohttp.BaseConnector(loop=loop) + conn._release(1, mock.Mock(should_close=False)) assert conn._cleanup_handle is not None conn.close() assert conn._cleanup_handle is None def test_close_abort_closed_transports(loop): - tr = unittest.mock.Mock() + tr = mock.Mock() conn = aiohttp.BaseConnector(loop=loop) conn._cleanup_closed_transports.append(tr) @@ -746,16 +694,16 @@ def test_ctor_with_default_loop(): @asyncio.coroutine def test_connect_with_limit(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://localhost1:80'), loop=loop, - response_class=unittest.mock.Mock()) + response_class=mock.Mock()) conn = aiohttp.BaseConnector(loop=loop, limit=1) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -791,14 +739,14 @@ def f(): @asyncio.coroutine def test_connect_with_limit_and_limit_per_host(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://localhost1:80'), loop=loop) conn = aiohttp.BaseConnector(loop=loop, limit=1000, limit_per_host=1) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -827,14 +775,14 @@ def f(): @asyncio.coroutine def test_connect_with_no_limit_and_limit_per_host(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://localhost1:80'), loop=loop) conn = aiohttp.BaseConnector(loop=loop, limit=0, limit_per_host=1) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -861,14 +809,14 @@ def f(): @asyncio.coroutine def test_connect_with_no_limits(loop, key): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://localhost1:80'), loop=loop) conn = aiohttp.BaseConnector(loop=loop, limit=0, limit_per_host=0) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -896,7 +844,7 @@ def f(): @asyncio.coroutine def test_connect_with_limit_cancelled(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://host:80'), loop=loop) @@ -904,7 +852,7 @@ def test_connect_with_limit_cancelled(loop): conn = aiohttp.BaseConnector(loop=loop, limit=1) key = ('host', 80, False) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -926,13 +874,13 @@ def test_connect_with_capacity_release_waiters(loop): def check_with_exc(err): conn = aiohttp.BaseConnector(limit=1, loop=loop) - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = \ helpers.create_future(loop) conn._create_connection.return_value.set_exception(err) with pytest.raises(Exception): - req = unittest.mock.Mock() + req = mock.Mock() yield from conn.connect(req) assert not conn._waiters @@ -944,7 +892,7 @@ def check_with_exc(err): @asyncio.coroutine def test_connect_with_limit_concurrent(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.should_close = False proto.is_connected.return_value = True @@ -967,7 +915,7 @@ def create_connection(req): # Make a new transport mock each time because acquired # transports are stored in a set. Reusing the same object # messes with the count. - proto = unittest.mock.Mock(should_close=False) + proto = mock.Mock(should_close=False) proto.is_connected.return_value = True return proto @@ -1008,7 +956,7 @@ def f(start=True): @asyncio.coroutine def test_close_with_acquired_connection(loop): - proto = unittest.mock.Mock() + proto = mock.Mock() proto.is_connected.return_value = True req = ClientRequest('GET', URL('http://host:80'), loop=loop) @@ -1016,7 +964,7 @@ def test_close_with_acquired_connection(loop): conn = aiohttp.BaseConnector(loop=loop, limit=1) key = ('host', 80, False) conn._conns[key] = [(proto, loop.time())] - conn._create_connection = unittest.mock.Mock() + conn._create_connection = mock.Mock() conn._create_connection.return_value = helpers.create_future(loop) conn._create_connection.return_value.set_result(proto) @@ -1188,13 +1136,13 @@ def handler(request): session.close() def test_resolver_not_called_with_address_is_ip(self): - resolver = unittest.mock.MagicMock() + resolver = mock.MagicMock() connector = aiohttp.TCPConnector(resolver=resolver, loop=self.loop) req = ClientRequest('GET', URL('http://127.0.0.1:{}'.format(unused_port())), loop=self.loop, - response_class=unittest.mock.Mock()) + response_class=mock.Mock()) with self.assertRaises(OSError): self.loop.run_until_complete(connector.connect(req)) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index be7aca8e0e9..4f51b335658 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,4 +1,3 @@ -import asyncio import datetime import sys from unittest import mock @@ -375,15 +374,6 @@ def test_stop(self, time_service): assert time_service._cb is None assert time_service._loop is None - def test_cancel_handles_on_stop(self, time_service): - def cb(x): - return x - - handle = time_service.call_later(10, cb, 'test') - time_service.close() - assert handle._cancelled - assert not time_service._scheduled - def test_double_stopping(self, time_service): time_service.close() time_service.close() @@ -410,80 +400,6 @@ def test_recalc_time(self, time_service, mocker): assert time_service._strtime is None assert time_service._time > 1234 assert time_service._count == 0 - assert time_service._loop.time.called - - def test_call_later(self, time_service): - time_service._loop.time = mock.Mock() - time_service._loop.time.return_value = 1477797232 - time_service._loop_time = 1477797232 - - called = 0 - - def cb(): - nonlocal called - called += 1 - - time_service.call_later(10, cb) - time_service.call_later(20, cb) - time_service._loop.time.return_value = 1477797232 + 11 - time_service._on_cb() - - assert called == 1 - - time_service._loop.time.return_value = 1477797232 + 21 - time_service._on_cb() - - assert called == 2 - assert not time_service._scheduled - - def test_call_cancel(self, time_service): - time_service._loop.time = mock.Mock() - time_service._loop.time.return_value = 1477797232 - time_service._loop_time = 1477797232 - - called = 0 - - def cb(): - nonlocal called - called += 1 - - handle = time_service.call_later(10, cb) - handle.cancel() - time_service._loop.time.return_value = 1477797232 + 11 - time_service._on_cb() - - assert called == 0 - assert not time_service._scheduled - - @asyncio.coroutine - def test_timeout(self, time_service, loop): - canceled_raised = False - - @asyncio.coroutine - def long_running_task(): - try: - yield from asyncio.sleep(10, loop=loop) - except asyncio.CancelledError: - nonlocal canceled_raised - canceled_raised = True - raise - - with pytest.raises(asyncio.TimeoutError): - with time_service.timeout(0.02): - yield from long_running_task() - assert canceled_raised, 'CancelledError was not raised' - - @asyncio.coroutine - def test_timeout_finish_in_time(self, time_service, loop): - @asyncio.coroutine - def long_running_task(): - yield from asyncio.sleep(0.01, loop=loop) - return 'done' - - with time_service.timeout(0.1): - resp = yield from long_running_task() - - assert resp == 'done' # ----------------------------------- FrozenList ---------------------- diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index ba2bffe92f8..49c584db342 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -2,11 +2,21 @@ import asyncio +import pytest + import aiohttp from aiohttp import helpers, web from aiohttp.http import WSMsgType +@pytest.fixture +def ceil(mocker): + def ceil(val): + return val + + mocker.patch('aiohttp.helpers.ceil').side_effect = ceil + + @asyncio.coroutine def test_websocket_json(loop, test_client): @asyncio.coroutine @@ -554,7 +564,7 @@ def handler(request): @asyncio.coroutine -def test_client_close_handshake(loop, test_client): +def test_client_close_handshake(loop, test_client, ceil): closed = helpers.create_future(loop) @@ -630,7 +640,6 @@ def handler(request): ws = web.WebSocketResponse(receive_timeout=0.1) yield from ws.prepare(request) - ws._time_service._interval = 0.05 try: yield from ws.receive() except asyncio.TimeoutError: @@ -659,7 +668,6 @@ def handler(request): ws = web.WebSocketResponse(receive_timeout=None) yield from ws.prepare(request) - ws._time_service._interval = 0.05 try: yield from ws.receive(0.1) except asyncio.TimeoutError: @@ -680,11 +688,9 @@ def handler(request): @asyncio.coroutine -def test_heartbeat(loop, test_client): +def test_heartbeat(loop, test_client, ceil): @asyncio.coroutine def handler(request): - request._time_service._interval = 0.1 - ws = web.WebSocketResponse(heartbeat=0.05) yield from ws.prepare(request) yield from ws.receive() @@ -704,7 +710,7 @@ def handler(request): @asyncio.coroutine -def test_heartbeat_no_pong(loop, test_client): +def test_heartbeat_no_pong(loop, test_client, ceil): cancelled = False @asyncio.coroutine