Skip to content

Commit

Permalink
Added autoping_interval parameter for websocket to automatically send…
Browse files Browse the repository at this point in the history
… ping message. #1024 #777
  • Loading branch information
Nikolay Kim committed Feb 1, 2017
1 parent a69ea90 commit aedc79e
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 50 deletions.
6 changes: 4 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ CHANGES
- Do not use readline when reading the content of a part
in the multipart reader #1535

- Added `receive_timeout` timeout for websocket to receive complete message. #1024 #1325

- Added `receive_timeout` timeout for websocket to receive complete message. #1325

- Added `autoping_interval` parameter for websocket to automatically send `ping` message. #1024 #777

- Remove `web.Application` dependency from `web.UrlDispatcher` #1510

- Accepting back-pressure from slow websocket clients #1367
Expand Down
7 changes: 6 additions & 1 deletion aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def _request(self, method, url, *,
proxy=proxy, proxy_auth=proxy_auth, timeout=read_timeout)

# None conn_timeout is a Timeout no-op
print(conn_timeout)
with Timeout(conn_timeout, loop=self._loop):
conn = yield from self._connector.connect(req)
conn.writer.set_tcp_nodelay(True)
Expand Down Expand Up @@ -289,6 +290,7 @@ def ws_connect(self, url, *,
receive_timeout=None,
autoclose=True,
autoping=True,
autoping_interval=15.0,
auth=None,
origin=None,
headers=None,
Expand All @@ -302,6 +304,7 @@ def ws_connect(self, url, *,
receive_timeout=receive_timeout,
autoclose=autoclose,
autoping=autoping,
autoping_interval=autoping_interval,
auth=auth,
origin=origin,
headers=headers,
Expand All @@ -315,6 +318,7 @@ def _ws_connect(self, url, *,
receive_timeout=None,
autoclose=True,
autoping=True,
autoping_interval=15.0,
auth=None,
origin=None,
headers=None,
Expand Down Expand Up @@ -407,7 +411,8 @@ def _ws_connect(self, url, *,
autoping,
self._loop,
time_service=self.time_service,
receive_timeout=receive_timeout)
receive_timeout=receive_timeout,
autoping_interval=autoping_interval)

def _prepare_headers(self, headers):
""" Add default headers and transform it to CIMultiDict
Expand Down
41 changes: 40 additions & 1 deletion aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class ClientWebSocketResponse:

def __init__(self, reader, writer, protocol,
response, timeout, autoclose, autoping, loop, *,
time_service=None, receive_timeout=None):
time_service=None,
receive_timeout=None, autoping_interval=None):
self._response = response
self._conn = response.connection

Expand All @@ -29,10 +30,46 @@ def __init__(self, reader, writer, protocol,
self._receive_timeout = receive_timeout
self._autoclose = autoclose
self._autoping = autoping
self._autoping_interval = autoping_interval
self._autoping_interval_cb = None
self._pong_response_cb = None
self._loop = loop
self._waiting = False
self._exception = None

self._reset_autoping()

def _cancel_autoping(self):
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

if self._autoping_interval_cb is not None:
self._autoping_interval_cb.cancel()
self._autoping_interval_cb = None

def _reset_autoping(self):
self._cancel_autoping()

if self._autoping_interval is not None:
self._autoping_interval_cb = self._time_service.call_later(
self._autoping_interval, self._send_autoping)

def _send_autoping(self):
if self._autoping_interval is not None and not self._closed:
self.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = self._time_service.call_later(
self._autoping_interval/2.0, self._pong_not_received)

def _pong_not_received(self):
self._closed = True
self._close_code = 1006
self._exception = asyncio.TimeoutError()
self._response.close()

@property
def closed(self):
return self._closed
Expand Down Expand Up @@ -79,6 +116,7 @@ def send_json(self, data, *, dumps=json.dumps):
@asyncio.coroutine
def close(self, *, code=1000, message=b''):
if not self._closed:
self._cancel_autoping()
self._closed = True
try:
self._writer.close(code, message)
Expand Down Expand Up @@ -132,6 +170,7 @@ def receive(self, timeout=None):
with self._time_service.timeout(
timeout or self._receive_timeout):
msg = yield from self._reader.read()
self._reset_autoping()
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except WebSocketError as exc:
Expand Down
32 changes: 22 additions & 10 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io
import os
import re
import time
import warnings
from collections import MutableSequence, namedtuple
from functools import total_ordering
Expand Down Expand Up @@ -592,11 +593,14 @@ def cancel(self):

class TimeService:

def __init__(self, loop):
def __init__(self, loop, *, interval=1.0):
self._loop = loop
self._time = loop.time()
self._interval = interval
self._time = time.time()
self._loop_time = loop.time()
self._count = 0
self._strtime = None
self._cb = loop.call_at(self._time + 1.0, self._on_cb)
self._cb = loop.call_at(self._loop_time + self._interval, self._on_cb)
self._scheduled = []

def stop(self):
Expand All @@ -611,12 +615,19 @@ def stop(self):
self._scheduled = []
self._loop = None

def _on_cb(self):
self._time = self._loop.time()
def _on_cb(self, reset_count=10*60):
self._loop_time = self._loop.time()

if self._count >= reset_count:
# reset timer every 10 minutes
self._count = 0
self._time = time.time()
else:
self._time += self._interval

# Handle 'later' callbacks that are ready.
ready = []
end_time = self._time
end_time = self._loop_time
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
Expand All @@ -629,7 +640,8 @@ def _on_cb(self):
handle._run()

self._strtime = None
self._cb = self._loop.call_later(self._time + 1.0, self._on_cb)
self._cb = self._loop.call_at(
self._loop_time + self._interval, self._on_cb)

def _format_date_time(self):
# Weekday and month names for HTTP date/time formatting;
Expand Down Expand Up @@ -668,12 +680,12 @@ def call_later(self, delay, callback, *args):
Time resolution is aproximatly one second.
"""
return self.call_at(self._time + delay, callback, *args)
return self._call_at(self._loop_time + delay, callback, *args)

def call_at(self, when, callback, *args):
def _call_at(self, when, callback, *args):
"""Like call_later(), but uses an absolute time.
Absolute time corresponds to the time service's time() method.
Absolute time corresponds to the loop's time() method.
"""
timer = TimerHandle(when, callback, args, self._loop)
heapq.heappush(self._scheduled, timer)
Expand Down
3 changes: 2 additions & 1 deletion aiohttp/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from aiohttp.client import _RequestContextManager

from . import ClientSession, hdrs
from .helpers import sentinel
from .helpers import sentinel, TimeService
from .protocol import HttpVersion, RawRequestMessage
from .signals import Signal
from .web import Application, Request, Server, UrlMappingMatchInfo
Expand Down Expand Up @@ -201,6 +201,7 @@ def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel,
if cookie_jar is None:
cookie_jar = aiohttp.CookieJar(unsafe=True,
loop=self._loop)
kwargs['time_service'] = TimeService(self._loop, interval=0.1)
self._session = ClientSession(loop=self._loop,
cookie_jar=cookie_jar,
**kwargs)
Expand Down
75 changes: 56 additions & 19 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import warnings
from collections import namedtuple

from . import Timeout, hdrs
from . import hdrs
from ._ws_impl import (CLOSED_MESSAGE, WebSocketError, WSMessage, WSMsgType,
do_handshake)
from .errors import ClientDisconnectedError, HttpProcessingError
Expand Down Expand Up @@ -33,7 +33,8 @@ class WebSocketResponse(StreamResponse):

def __init__(self, *,
timeout=10.0, receive_timeout=None,
autoclose=True, autoping=True, protocols=()):
autoclose=True, autoping=True, autoping_interval=15.0,
protocols=()):
super().__init__(status=101)
self._protocols = protocols
self._protocol = None
Expand All @@ -50,8 +51,44 @@ def __init__(self, *,
self._receive_timeout = receive_timeout
self._autoclose = autoclose
self._autoping = autoping
self._autoping_interval = autoping_interval
self._autoping_interval_cb = None
self._pong_response_cb = None
self._time_service = None

def _cancel_autoping(self):
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

if self._autoping_interval_cb is not None:
self._autoping_interval_cb.cancel()
self._autoping_interval_cb = None

def _reset_autoping(self):
self._cancel_autoping()

if self._autoping_interval is not None:
self._autoping_interval_cb = self._time_service.call_later(
self._autoping_interval, self._send_autoping)

def _send_autoping(self):
if self._autoping_interval is not None and not self._closed:
self.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = self._time_service.call_later(
self._autoping_interval/2.0, self._pong_not_received)

def _pong_not_received(self):
self._closed = True
self._close_code = 1006
self._exception = asyncio.TimeoutError()

if self._req is not None:
self._req.transport.close()

@asyncio.coroutine
def prepare(self, request):
# make pre-check to don't hide it by do_handshake() exceptions
Expand Down Expand Up @@ -79,6 +116,7 @@ def _pre_start(self, request):
raise HTTPInternalServerError() from err

self._time_service = request.time_service
self._reset_autoping()

if self.status != status:
self.set_status(status)
Expand Down Expand Up @@ -189,6 +227,7 @@ def close(self, *, code=1000, message=b''):
raise RuntimeError('Call .prepare() first')

if not self._closed:
self._cancel_autoping()
self._closed = True
try:
self._writer.close(code, message)
Expand All @@ -204,23 +243,20 @@ def close(self, *, code=1000, message=b''):
if self._closing:
return True

begin = self._loop.time()
while self._loop.time() - begin < self._timeout:
try:
with Timeout(timeout=self._timeout,
loop=self._loop):
msg = yield from self._reader.read()
except asyncio.CancelledError:
self._close_code = 1006
raise
except Exception as exc:
self._close_code = 1006
self._exception = exc
return True
try:
with self._time_service.timeout(self._timeout):
msg = yield from self._reader.read()
except asyncio.CancelledError:
self._close_code = 1006
raise
except Exception as exc:
self._close_code = 1006
self._exception = exc
return True

if msg.type == WSMsgType.CLOSE:
self._close_code = msg.data
return True
if msg.type == WSMsgType.CLOSE:
self._close_code = msg.data
return True

self._close_code = 1006
self._exception = asyncio.TimeoutError()
Expand Down Expand Up @@ -248,7 +284,8 @@ def receive(self, timeout=None):
with self._time_service.timeout(
timeout or self._receive_timeout):
msg = yield from self._reader.read()
except (asyncio.CancelledError, asyncio.TimeoutError):
self._reset_autoping()
except (asyncio.CancelledError, asyncio.TimeoutError) as exc:
raise
except WebSocketError as exc:
self._close_code = exc.code
Expand Down
5 changes: 5 additions & 0 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ The client session supports the context manager protocol for self closing.
auth=None,\
autoclose=True,\
autoping=True,\
autoping_interval=15.0,\
origin=None, \
proxy=None, proxy_auth=None)
:async-with:
Expand Down Expand Up @@ -417,6 +418,10 @@ The client session supports the context manager protocol for self closing.
:param bool autoping: automatically send `pong` on `ping`
message from server

:param float autoping_interval: Send `ping` message every `autoping_interval` seconds
and wait `pong` response, if `pong` response is not received
then close connection.

:param str origin: Origin header to send to server

:param str proxy: Proxy URL, :class:`str` or :class:`~yarl.URL` (optional)
Expand Down
4 changes: 4 additions & 0 deletions docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ WebSocketResponse

.. versionadded:: 1.3.0

:param float autoping_interval: Send `ping` message every `autoping_interval` seconds
and wait `pong` response, if `pong` response is not received
then close connection.

:param float receive_timeout: Timeout value for `receive` operations.
Default value is None (no timeout for receive operation)

Expand Down
Loading

0 comments on commit aedc79e

Please sign in to comment.