From aedc79e8319e5accc7808238f5dbc7077f36ba5f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 31 Jan 2017 18:28:24 -0800 Subject: [PATCH] Added autoping_interval parameter for websocket to automatically send ping message. #1024 #777 --- CHANGES.rst | 6 ++- aiohttp/client.py | 7 ++- aiohttp/client_ws.py | 41 +++++++++++++- aiohttp/helpers.py | 32 +++++++---- aiohttp/test_utils.py | 3 +- aiohttp/web_ws.py | 75 +++++++++++++++++++------- docs/client_reference.rst | 5 ++ docs/web_reference.rst | 4 ++ tests/test_client_ws_functional.py | 68 ++++++++++++++++++++--- tests/test_helpers.py | 11 ++-- tests/test_web_websocket_functional.py | 67 +++++++++++++++++++++-- 11 files changed, 269 insertions(+), 50 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 9487ee69f0f..0e8a3497f57 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -15,8 +15,10 @@ CHANGES - Do not use readline when reading the content of a part in the multipart reader #1535 -- Added `receive_timeout` timeout for websocket to receive complete message. #1024 #1325 - +- Added `receive_timeout` timeout for websocket to receive complete message. #1325 + +- Added `autoping_interval` parameter for websocket to automatically send `ping` message. #1024 #777 + - Remove `web.Application` dependency from `web.UrlDispatcher` #1510 - Accepting back-pressure from slow websocket clients #1367 diff --git a/aiohttp/client.py b/aiohttp/client.py index 99bb2662f11..323aba13ee1 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -213,6 +213,7 @@ def _request(self, method, url, *, proxy=proxy, proxy_auth=proxy_auth, timeout=read_timeout) # None conn_timeout is a Timeout no-op + print(conn_timeout) with Timeout(conn_timeout, loop=self._loop): conn = yield from self._connector.connect(req) conn.writer.set_tcp_nodelay(True) @@ -289,6 +290,7 @@ def ws_connect(self, url, *, receive_timeout=None, autoclose=True, autoping=True, + autoping_interval=15.0, auth=None, origin=None, headers=None, @@ -302,6 +304,7 @@ def ws_connect(self, url, *, receive_timeout=receive_timeout, autoclose=autoclose, autoping=autoping, + autoping_interval=autoping_interval, auth=auth, origin=origin, headers=headers, @@ -315,6 +318,7 @@ def _ws_connect(self, url, *, receive_timeout=None, autoclose=True, autoping=True, + autoping_interval=15.0, auth=None, origin=None, headers=None, @@ -407,7 +411,8 @@ def _ws_connect(self, url, *, autoping, self._loop, time_service=self.time_service, - receive_timeout=receive_timeout) + receive_timeout=receive_timeout, + autoping_interval=autoping_interval) def _prepare_headers(self, headers): """ Add default headers and transform it to CIMultiDict diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index a6abf82fd4e..cf3af27676a 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -14,7 +14,8 @@ class ClientWebSocketResponse: def __init__(self, reader, writer, protocol, response, timeout, autoclose, autoping, loop, *, - time_service=None, receive_timeout=None): + time_service=None, + receive_timeout=None, autoping_interval=None): self._response = response self._conn = response.connection @@ -29,10 +30,46 @@ def __init__(self, reader, writer, protocol, self._receive_timeout = receive_timeout self._autoclose = autoclose self._autoping = autoping + self._autoping_interval = autoping_interval + self._autoping_interval_cb = None + self._pong_response_cb = None self._loop = loop self._waiting = False self._exception = None + self._reset_autoping() + + def _cancel_autoping(self): + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = None + + if self._autoping_interval_cb is not None: + self._autoping_interval_cb.cancel() + self._autoping_interval_cb = None + + def _reset_autoping(self): + self._cancel_autoping() + + if self._autoping_interval is not None: + self._autoping_interval_cb = self._time_service.call_later( + self._autoping_interval, self._send_autoping) + + def _send_autoping(self): + if self._autoping_interval is not None and not self._closed: + self.ping() + + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = self._time_service.call_later( + self._autoping_interval/2.0, self._pong_not_received) + + def _pong_not_received(self): + self._closed = True + self._close_code = 1006 + self._exception = asyncio.TimeoutError() + self._response.close() + @property def closed(self): return self._closed @@ -79,6 +116,7 @@ def send_json(self, data, *, dumps=json.dumps): @asyncio.coroutine def close(self, *, code=1000, message=b''): if not self._closed: + self._cancel_autoping() self._closed = True try: self._writer.close(code, message) @@ -132,6 +170,7 @@ def receive(self, timeout=None): with self._time_service.timeout( timeout or self._receive_timeout): msg = yield from self._reader.read() + self._reset_autoping() except (asyncio.CancelledError, asyncio.TimeoutError): raise except WebSocketError as exc: diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index cebc86f7a71..e5689266155 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -10,6 +10,7 @@ import io import os import re +import time import warnings from collections import MutableSequence, namedtuple from functools import total_ordering @@ -592,11 +593,14 @@ def cancel(self): class TimeService: - def __init__(self, loop): + def __init__(self, loop, *, interval=1.0): self._loop = loop - self._time = loop.time() + self._interval = interval + self._time = time.time() + self._loop_time = loop.time() + self._count = 0 self._strtime = None - self._cb = loop.call_at(self._time + 1.0, self._on_cb) + self._cb = loop.call_at(self._loop_time + self._interval, self._on_cb) self._scheduled = [] def stop(self): @@ -611,12 +615,19 @@ def stop(self): self._scheduled = [] self._loop = None - def _on_cb(self): - self._time = self._loop.time() + def _on_cb(self, reset_count=10*60): + self._loop_time = self._loop.time() + + if self._count >= reset_count: + # reset timer every 10 minutes + self._count = 0 + self._time = time.time() + else: + self._time += self._interval # Handle 'later' callbacks that are ready. ready = [] - end_time = self._time + end_time = self._loop_time while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: @@ -629,7 +640,8 @@ def _on_cb(self): handle._run() self._strtime = None - self._cb = self._loop.call_later(self._time + 1.0, self._on_cb) + self._cb = self._loop.call_at( + self._loop_time + self._interval, self._on_cb) def _format_date_time(self): # Weekday and month names for HTTP date/time formatting; @@ -668,12 +680,12 @@ def call_later(self, delay, callback, *args): Time resolution is aproximatly one second. """ - return self.call_at(self._time + delay, callback, *args) + return self._call_at(self._loop_time + delay, callback, *args) - def call_at(self, when, callback, *args): + def _call_at(self, when, callback, *args): """Like call_later(), but uses an absolute time. - Absolute time corresponds to the time service's time() method. + Absolute time corresponds to the loop's time() method. """ timer = TimerHandle(when, callback, args, self._loop) heapq.heappush(self._scheduled, timer) diff --git a/aiohttp/test_utils.py b/aiohttp/test_utils.py index 1cc60097fec..8bed1353198 100644 --- a/aiohttp/test_utils.py +++ b/aiohttp/test_utils.py @@ -18,7 +18,7 @@ from aiohttp.client import _RequestContextManager from . import ClientSession, hdrs -from .helpers import sentinel +from .helpers import sentinel, TimeService from .protocol import HttpVersion, RawRequestMessage from .signals import Signal from .web import Application, Request, Server, UrlMappingMatchInfo @@ -201,6 +201,7 @@ def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel, if cookie_jar is None: cookie_jar = aiohttp.CookieJar(unsafe=True, loop=self._loop) + kwargs['time_service'] = TimeService(self._loop, interval=0.1) self._session = ClientSession(loop=self._loop, cookie_jar=cookie_jar, **kwargs) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index bff1fa72fc4..68a19754a5c 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -4,7 +4,7 @@ import warnings from collections import namedtuple -from . import Timeout, hdrs +from . import hdrs from ._ws_impl import (CLOSED_MESSAGE, WebSocketError, WSMessage, WSMsgType, do_handshake) from .errors import ClientDisconnectedError, HttpProcessingError @@ -33,7 +33,8 @@ class WebSocketResponse(StreamResponse): def __init__(self, *, timeout=10.0, receive_timeout=None, - autoclose=True, autoping=True, protocols=()): + autoclose=True, autoping=True, autoping_interval=15.0, + protocols=()): super().__init__(status=101) self._protocols = protocols self._protocol = None @@ -50,8 +51,44 @@ def __init__(self, *, self._receive_timeout = receive_timeout self._autoclose = autoclose self._autoping = autoping + self._autoping_interval = autoping_interval + self._autoping_interval_cb = None + self._pong_response_cb = None self._time_service = None + def _cancel_autoping(self): + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = None + + if self._autoping_interval_cb is not None: + self._autoping_interval_cb.cancel() + self._autoping_interval_cb = None + + def _reset_autoping(self): + self._cancel_autoping() + + if self._autoping_interval is not None: + self._autoping_interval_cb = self._time_service.call_later( + self._autoping_interval, self._send_autoping) + + def _send_autoping(self): + if self._autoping_interval is not None and not self._closed: + self.ping() + + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = self._time_service.call_later( + self._autoping_interval/2.0, self._pong_not_received) + + def _pong_not_received(self): + self._closed = True + self._close_code = 1006 + self._exception = asyncio.TimeoutError() + + if self._req is not None: + self._req.transport.close() + @asyncio.coroutine def prepare(self, request): # make pre-check to don't hide it by do_handshake() exceptions @@ -79,6 +116,7 @@ def _pre_start(self, request): raise HTTPInternalServerError() from err self._time_service = request.time_service + self._reset_autoping() if self.status != status: self.set_status(status) @@ -189,6 +227,7 @@ def close(self, *, code=1000, message=b''): raise RuntimeError('Call .prepare() first') if not self._closed: + self._cancel_autoping() self._closed = True try: self._writer.close(code, message) @@ -204,23 +243,20 @@ def close(self, *, code=1000, message=b''): if self._closing: return True - begin = self._loop.time() - while self._loop.time() - begin < self._timeout: - try: - with Timeout(timeout=self._timeout, - loop=self._loop): - msg = yield from self._reader.read() - except asyncio.CancelledError: - self._close_code = 1006 - raise - except Exception as exc: - self._close_code = 1006 - self._exception = exc - return True + try: + with self._time_service.timeout(self._timeout): + msg = yield from self._reader.read() + except asyncio.CancelledError: + self._close_code = 1006 + raise + except Exception as exc: + self._close_code = 1006 + self._exception = exc + return True - if msg.type == WSMsgType.CLOSE: - self._close_code = msg.data - return True + if msg.type == WSMsgType.CLOSE: + self._close_code = msg.data + return True self._close_code = 1006 self._exception = asyncio.TimeoutError() @@ -248,7 +284,8 @@ def receive(self, timeout=None): with self._time_service.timeout( timeout or self._receive_timeout): msg = yield from self._reader.read() - except (asyncio.CancelledError, asyncio.TimeoutError): + self._reset_autoping() + except (asyncio.CancelledError, asyncio.TimeoutError) as exc: raise except WebSocketError as exc: self._close_code = exc.code diff --git a/docs/client_reference.rst b/docs/client_reference.rst index 3598b697b96..7a320c3993f 100644 --- a/docs/client_reference.rst +++ b/docs/client_reference.rst @@ -390,6 +390,7 @@ The client session supports the context manager protocol for self closing. auth=None,\ autoclose=True,\ autoping=True,\ + autoping_interval=15.0,\ origin=None, \ proxy=None, proxy_auth=None) :async-with: @@ -417,6 +418,10 @@ The client session supports the context manager protocol for self closing. :param bool autoping: automatically send `pong` on `ping` message from server + :param float autoping_interval: Send `ping` message every `autoping_interval` seconds + and wait `pong` response, if `pong` response is not received + then close connection. + :param str origin: Origin header to send to server :param str proxy: Proxy URL, :class:`str` or :class:`~yarl.URL` (optional) diff --git a/docs/web_reference.rst b/docs/web_reference.rst index 2572d2e54d0..a62fcb22dc0 100644 --- a/docs/web_reference.rst +++ b/docs/web_reference.rst @@ -876,6 +876,10 @@ WebSocketResponse .. versionadded:: 1.3.0 + :param float autoping_interval: Send `ping` message every `autoping_interval` seconds + and wait `pong` response, if `pong` response is not received + then close connection. + :param float receive_timeout: Timeout value for `receive` operations. Default value is None (no timeout for receive operation) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 936bc8ccdfd..059aa1291c8 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -478,8 +478,7 @@ def test_receive_timeout(loop, test_client): def handler(request): ws = web.WebSocketResponse() yield from ws.prepare(request) - yield from ws.receive_str() - yield from asyncio.sleep(1.1, loop=request.app.loop) + yield from ws.receive() yield from ws.close() return ws @@ -488,10 +487,10 @@ def handler(request): client = yield from test_client(app) resp = yield from client.ws_connect('/', receive_timeout=0.1) - resp.send_str('ask') + resp._time_service._interval = 0.05 with pytest.raises(asyncio.TimeoutError): - yield from resp.receive(0.1) + yield from resp.receive(0.05) yield from resp.close() @@ -503,8 +502,7 @@ def test_custom_receive_timeout(loop, test_client): def handler(request): ws = web.WebSocketResponse() yield from ws.prepare(request) - yield from ws.receive_str() - yield from asyncio.sleep(1.1, loop=request.app.loop) + yield from ws.receive() yield from ws.close() return ws @@ -513,9 +511,63 @@ def handler(request): client = yield from test_client(app) resp = yield from client.ws_connect('/') - resp.send_str('ask') + resp._time_service._interval = 0.05 with pytest.raises(asyncio.TimeoutError): - yield from resp.receive(0.1) + yield from resp.receive(0.05) yield from resp.close() + + +@asyncio.coroutine +def test_autoping_interval(loop, test_client): + ping_received = False + + @asyncio.coroutine + def handler(request): + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + yield from ws.prepare(request) + msg = yield from ws.receive() + if msg.type == aiohttp.WSMsgType.ping: + ping_received = True + yield from ws.close() + return ws + + app = web.Application(loop=loop) + app.router.add_route('GET', '/', handler) + + client = yield from test_client(app) + resp = yield from client.ws_connect('/', autoping_interval=0.01) + + yield from resp.receive() + yield from resp.close() + + assert ping_received + + +@asyncio.coroutine +def test_autoping_interval_no_pong(loop, test_client): + ping_received = False + + @asyncio.coroutine + def handler(request): + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + yield from ws.prepare(request) + msg = yield from ws.receive() + if msg.type == aiohttp.WSMsgType.ping: + ping_received = True + yield from ws.receive() + return ws + + app = web.Application(loop=loop) + app.router.add_route('GET', '/', handler) + + client = yield from test_client(app) + resp = yield from client.ws_connect('/', autoping_interval=0.05) + + yield from resp.receive() + yield from resp.receive() + + assert ping_received diff --git a/tests/test_helpers.py b/tests/test_helpers.py index c6934ac73e5..3acd2cded53 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -409,7 +409,7 @@ def test_is_ip_address_invalid_type(): @pytest.fixture def time_service(loop): - return helpers.TimeService(loop) + return helpers.TimeService(loop, interval=0.1) class TestTimeService: @@ -454,14 +454,17 @@ def test_recalc_time(self, time_service, mocker): time_service._time = 123 time_service._strtime = 'asd' + time_service._count = 1000000 time_service._on_cb() assert time_service._strtime is None + assert time_service._time > 1234 + assert time_service._count == 0 assert time_service._loop.time.called def test_call_later(self, time_service): time_service._loop.time = mock.Mock() time_service._loop.time.return_value = 1477797232 - time_service._time = 1477797232 + time_service._loop_time = 1477797232 called = 0 @@ -485,7 +488,7 @@ def cb(): def test_call_cancel(self, time_service): time_service._loop.time = mock.Mock() time_service._loop.time.return_value = 1477797232 - time_service._time = 1477797232 + time_service._loop_time = 1477797232 called = 0 @@ -515,7 +518,7 @@ def long_running_task(): raise with pytest.raises(asyncio.TimeoutError): - with time_service.timeout(0.01): + with time_service.timeout(0.02): yield from long_running_task() assert canceled_raised, 'CancelledError was not raised' diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index 1e3e0b9c824..e6e6379133a 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -573,9 +573,10 @@ def test_receive_timeout(loop, test_client): @asyncio.coroutine def handler(request): - ws = web.WebSocketResponse(receive_timeout=1.0) + ws = web.WebSocketResponse(receive_timeout=0.1) yield from ws.prepare(request) + ws._time_service._interval = 0.05 try: yield from ws.receive() except asyncio.TimeoutError: @@ -590,7 +591,7 @@ def handler(request): client = yield from test_client(app) ws = yield from client.ws_connect('/') - yield from asyncio.sleep(1.06, loop=loop) + yield from ws.receive() yield from ws.close() assert raised @@ -604,8 +605,9 @@ def handler(request): ws = web.WebSocketResponse(receive_timeout=None) yield from ws.prepare(request) + ws._time_service._interval = 0.05 try: - yield from ws.receive(1.0) + yield from ws.receive(0.1) except asyncio.TimeoutError: nonlocal raised raised = True @@ -618,6 +620,63 @@ def handler(request): client = yield from test_client(app) ws = yield from client.ws_connect('/') - yield from asyncio.sleep(1.06, loop=loop) + yield from ws.receive() yield from ws.close() assert raised + + +@asyncio.coroutine +def test_autoping_interval(loop, test_client): + @asyncio.coroutine + def handler(request): + print('server', request._time_service) + request._time_service._interval = 0.1 + + ws = web.WebSocketResponse(autoping_interval=0.05) + yield from ws.prepare(request) + yield from ws.receive() + yield from ws.close() + return ws + + app = web.Application(loop=loop) + app.router.add_get('/', handler) + + client = yield from test_client(app) + ws = yield from client.ws_connect('/', autoping=False) + msg = yield from ws.receive() + + assert msg.type == aiohttp.WSMsgType.ping + + yield from ws.close() + + +@asyncio.coroutine +def test_autoping_interval_no_pong(loop, test_client): + cancelled = False + + @asyncio.coroutine + def handler(request): + nonlocal cancelled + request._time_service._interval = 0.1 + request._time_service._on_cb() + + ws = web.WebSocketResponse(autoping_interval=0.05) + yield from ws.prepare(request) + + try: + yield from ws.receive() + except asyncio.CancelledError: + cancelled = True + + return ws + + app = web.Application(loop=loop) + app.router.add_get('/', handler) + + client = yield from test_client(app) + ws = yield from client.ws_connect('/', autoping=False) + msg = yield from ws.receive() + assert msg.type == aiohttp.WSMsgType.ping + yield from ws.receive() + + assert cancelled