From acac4db0c064d2618d75f27a58384c97c16458fd Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Sat, 1 Jun 2019 08:38:01 -0700 Subject: [PATCH] Use nonblocking sockets instead of selectors for healthy connections This replaces the work in 3.2.0 to use nonblocking sockets instead of selectors. Selectors proved to be problematic for some environments including eventlet and gevent. Nonblocking sockets should be available in all environments. --- CHANGES | 5 +- redis/__init__.py | 2 +- redis/_compat.py | 4 +- redis/connection.py | 155 ++++++++++++++++++--------- redis/selector.py | 196 ---------------------------------- tests/test_connection_pool.py | 4 +- tests/test_pubsub.py | 13 +++ tests/test_selector.py | 122 --------------------- 8 files changed, 128 insertions(+), 373 deletions(-) delete mode 100644 redis/selector.py delete mode 100644 tests/test_selector.py diff --git a/CHANGES b/CHANGES index 5256faf71a..e2d06c18b7 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,4 @@ -* 3.2.2 (in development) +* 3.3.0 (in development) * Resolve a race condition with the PubSubWorkerThread. #1150 * Cleanup socket read error messages. Thanks Vic Yu. #1159 * Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153 @@ -17,6 +17,9 @@ cause the connection to be disconnected and cleaned up appropriately. #923 * Add READONLY and READWRITE commands. Thanks @theodesp. #1114 + * Remove selectors in favor of nonblocking sockets. Selectors had + issues in some environments including eventlet and gevent. This should + resolve those issues with no other side effects. * 3.2.1 * Fix SentinelConnectionPool to work in multiprocess/forked environments. 
* 3.2.0 diff --git a/redis/__init__.py b/redis/__init__.py index b74c403c1e..2135fd82e5 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -29,7 +29,7 @@ def int_or_str(value): return value -__version__ = '3.2.1' +__version__ = '3.3.dev2' VERSION = tuple(map(int_or_str, __version__.split('.'))) __all__ = [ diff --git a/redis/_compat.py b/redis/_compat.py index bde6fb6195..d70af2ac2c 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -1,12 +1,12 @@ """Internal module for Python 2 backwards compatibility.""" import errno +import socket import sys # For Python older than 3.5, retry EINTR. if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 5): # Adapted from https://bugs.python.org/review/23863/patch/14532/54418 - import socket import time # Wrapper for handling interruptable system calls. @@ -100,6 +100,7 @@ def byte_to_chr(x): basestring = basestring unicode = unicode long = long + BlockingIOError = socket.error else: from urllib.parse import parse_qs, unquote, urlparse from string import ascii_letters @@ -129,6 +130,7 @@ def nativestr(x): unicode = str safe_unicode = str long = int + BlockingIOError = BlockingIOError try: # Python 3 from queue import LifoQueue, Empty, Full diff --git a/redis/connection.py b/redis/connection.py index 88286c8ada..7d4301a4db 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals from distutils.version import StrictVersion +from errno import EWOULDBLOCK from itertools import chain import io import os @@ -17,7 +18,7 @@ from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, unquote) + recv, recv_into, unquote, BlockingIOError) from redis.exceptions import ( DataError, RedisError, @@ -31,7 +32,6 @@ ExecAbortError, ReadOnlyError ) -from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if 
HIREDIS_AVAILABLE: import hiredis @@ -61,6 +61,8 @@ SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server." +SENTINEL = object() + class Encoder(object): "Encode strings to bytes and decode bytes to strings" @@ -126,9 +128,10 @@ def parse_error(self, response): class SocketBuffer(object): - def __init__(self, socket, socket_read_size): + def __init__(self, socket, socket_read_size, socket_timeout): self._sock = socket self.socket_read_size = socket_read_size + self.socket_timeout = socket_timeout self._buffer = io.BytesIO() # number of bytes written to the buffer from the socket self.bytes_written = 0 @@ -139,25 +142,51 @@ def __init__(self, socket, socket_read_size): def length(self): return self.bytes_written - self.bytes_read - def _read_from_socket(self, length=None): + def _read_from_socket(self, length=None, timeout=SENTINEL, + raise_on_timeout=True): + sock = self._sock socket_read_size = self.socket_read_size buf = self._buffer buf.seek(self.bytes_written) marker = 0 + custom_timeout = timeout is not SENTINEL - while True: - data = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if isinstance(data, bytes) and len(data) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - buf.write(data) - data_length = len(data) - self.bytes_written += data_length - marker += data_length - - if length is not None and length > marker: - continue - break + try: + if custom_timeout: + sock.settimeout(timeout) + while True: + data = recv(self._sock, socket_read_size) + # an empty string indicates the server shutdown the socket + if isinstance(data, bytes) and len(data) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + buf.write(data) + data_length = len(data) + self.bytes_written += data_length + marker += data_length + + if length is not None and length > marker: + continue + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, 
simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self.socket_timeout) + + def can_read(self, timeout): + return bool(self.length) or \ + self._read_from_socket(timeout=timeout, + raise_on_timeout=False) def read(self, length): length = length + 2 # make sure to read the \r\n terminator @@ -233,7 +262,9 @@ def __del__(self): def on_connect(self, connection): "Called when the socket connects" self._sock = connection._sock - self._buffer = SocketBuffer(self._sock, self.socket_read_size) + self._buffer = SocketBuffer(self._sock, + self.socket_read_size, + connection.socket_timeout) self.encoder = connection.encoder def on_disconnect(self): @@ -244,8 +275,8 @@ def on_disconnect(self): self._buffer = None self.encoder = None - def can_read(self): - return self._buffer and bool(self._buffer.length) + def can_read(self, timeout): + return self._buffer and self._buffer.can_read(timeout) def read_response(self): response = self._buffer.readline() @@ -312,6 +343,7 @@ def __del__(self): def on_connect(self, connection): self._sock = connection._sock + self._socket_timeout = connection.socket_timeout kwargs = { 'protocolError': InvalidResponse, 'replyError': self.parse_error, @@ -333,13 +365,52 @@ def on_disconnect(self): self._reader = None self._next_response = False - def can_read(self): + def can_read(self, timeout): if not self._reader: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._next_response is False: self._next_response = self._reader.gets() - return self._next_response is not False + if self._next_response is False: + return self.read_from_socket(timeout=timeout, + raise_on_timeout=False) + return True + + def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True): + sock = self._sock + 
custom_timeout = timeout is not SENTINEL + try: + if custom_timeout: + sock.settimeout(timeout) + if HIREDIS_USE_BYTE_BUFFER: + bufflen = recv_into(self._sock, self._buffer) + if bufflen == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(self._buffer, 0, bufflen) + else: + buffer = recv(self._sock, self.socket_read_size) + # an empty string indicates the server shutdown the socket + if not isinstance(buffer, bytes) or len(buffer) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(buffer) + # data was read from the socket and added to the buffer. + # return True to indicate that data was read. + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self._socket_timeout) def read_response(self): if not self._reader: @@ -352,21 +423,8 @@ def read_response(self): return response response = self._reader.gets() - socket_read_size = self.socket_read_size while response is False: - if HIREDIS_USE_BYTE_BUFFER: - bufflen = recv_into(self._sock, self._buffer) - if bufflen == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - else: - buffer = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if not isinstance(buffer, bytes) or len(buffer) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - if HIREDIS_USE_BYTE_BUFFER: - self._reader.feed(self._buffer, 0, bufflen) - else: - self._reader.feed(buffer) + self.read_from_socket() response = self._reader.gets() # if an older version of hiredis is installed, we need to attempt # to convert ResponseErrors to their appropriate types. 
@@ -416,7 +474,6 @@ def __init__(self, host='localhost', port=6379, db=0, password=None, self.retry_on_timeout = retry_on_timeout self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None - self._selector = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, @@ -454,7 +511,6 @@ def connect(self): raise ConnectionError(self._error_message(e)) self._sock = sock - self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -538,9 +594,6 @@ def disconnect(self): self._parser.on_disconnect() if self._sock is None: return - if self._selector is not None: - self._selector.close() - self._selector = None try: if os.getpid() == self.pid: self._sock.shutdown(socket.SHUT_RDWR) @@ -585,11 +638,7 @@ def can_read(self, timeout=0): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or self._selector.can_read(timeout) - - def is_ready_for_command(self): - "Check if the connection is ready for a command" - return self._selector.is_ready_for_command() + return self._parser.can_read(timeout) def read_response(self): "Read the response from a previously sent command" @@ -963,10 +1012,13 @@ def get_connection(self, command_name, *keys, **options): # a command. if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it @@ -1111,10 +1163,13 @@ def get_connection(self, command_name, *keys, **options): # a command. 
if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it diff --git a/redis/selector.py b/redis/selector.py deleted file mode 100644 index bce84a5da7..0000000000 --- a/redis/selector.py +++ /dev/null @@ -1,196 +0,0 @@ -import errno -import select -from redis.exceptions import RedisError - - -_DEFAULT_SELECTOR = None - - -class BaseSelector(object): - """ - Base class for all Selectors - """ - def __init__(self, sock): - self.sock = sock - - def can_read(self, timeout=0): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just that - there is data to read. - - Automatically retries EINTR errors based on PEP 475. - """ - while True: - try: - return self.check_can_read(timeout) - except (select.error, IOError) as ex: - if self.errno_from_exception(ex) == errno.EINTR: - continue - return False - - def is_ready_for_command(self, timeout=0): - """ - Return True if the socket is ready to send a command, - otherwise False. - - Automatically retries EINTR errors based on PEP 475. - """ - while True: - try: - return self.check_is_ready_for_command(timeout) - except (select.error, IOError) as ex: - if self.errno_from_exception(ex) == errno.EINTR: - continue - return False - - def check_can_read(self, timeout): - """ - Perform the can_read check. Subclasses should implement this. 
- """ - raise NotImplementedError - - def check_is_ready_for_command(self, timeout): - """ - Perform the is_ready_for_command check. Subclasses should - implement this. - """ - raise NotImplementedError - - def close(self): - """ - Close the selector. - """ - self.sock = None - - def errno_from_exception(self, ex): - """ - Get the error number from an exception - """ - if hasattr(ex, 'errno'): - return ex.errno - elif ex.args: - return ex.args[0] - else: - return None - - -if hasattr(select, 'select'): - class SelectSelector(BaseSelector): - """ - A select-based selector that should work on most platforms. - - This is the worst poll strategy and should only be used if no other - option is available. - """ - def check_can_read(self, timeout): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just - that there is data to read. - """ - return bool(select.select([self.sock], [], [], timeout)[0]) - - def check_is_ready_for_command(self, timeout): - """ - Return True if the socket is ready to send a command, - otherwise False. - """ - r, w, e = select.select([self.sock], [self.sock], [self.sock], - timeout) - return bool(w and not r and not e) - - -if hasattr(select, 'poll'): - class PollSelector(BaseSelector): - """ - A poll-based selector that should work on (almost?) all versions - of Unix - """ - READ_MASK = select.POLLIN | select.POLLPRI - ERROR_MASK = select.POLLERR | select.POLLHUP - WRITE_MASK = select.POLLOUT - - _READ_POLLER_MASK = READ_MASK | ERROR_MASK - _READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK - - def __init__(self, sock): - super(PollSelector, self).__init__(sock) - self.read_poller = select.poll() - self.read_poller.register(sock, self._READ_POLLER_MASK) - self.ready_poller = select.poll() - self.ready_poller.register(sock, self._READY_POLLER_MASK) - - def close(self): - """ - Close the selector. 
- """ - for poller in (self.read_poller, self.ready_poller): - try: - poller.unregister(self.sock) - except (KeyError, ValueError): - # KeyError is raised if somehow the socket was not - # registered - # ValueError is raised if the socket's file descriptor is - # negative. - # In either case, we can't do anything better than to - # remove the reference to the poller. - pass - self.read_poller = None - self.ready_poller = None - self.sock = None - - def check_can_read(self, timeout=0): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just - that there is data to read. - """ - timeout = int(timeout * 1000) - events = self.read_poller.poll(timeout) - return bool(events and events[0][1] & self.READ_MASK) - - def check_is_ready_for_command(self, timeout=0): - """ - Return True if the socket is ready to send a command, - otherwise False - """ - timeout = timeout * 1000 - events = self.ready_poller.poll(timeout) - return bool(events and events[0][1] == self.WRITE_MASK) - - -def has_selector(selector): - "Determine if the current platform has the selector available" - try: - if selector == 'poll': - # the select module offers the poll selector even if the platform - # doesn't support it. 
Attempt to poll for nothing to make sure - # poll is available - p = select.poll() - p.poll(0) - else: - # the other selectors will fail when instantiated - getattr(select, selector)().close() - return True - except (OSError, AttributeError): - return False - - -def DefaultSelector(sock): - "Return the best selector for the platform" - global _DEFAULT_SELECTOR - if _DEFAULT_SELECTOR is None: - if has_selector('poll'): - _DEFAULT_SELECTOR = PollSelector - elif hasattr(select, 'select'): - _DEFAULT_SELECTOR = SelectSelector - else: - raise RedisError('Platform does not support any selectors') - return _DEFAULT_SELECTOR(sock) diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index f258411e33..2aea1e4814 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -19,8 +19,8 @@ def __init__(self, **kwargs): def connect(self): pass - def is_ready_for_command(self): - return True + def can_read(self): + return False class TestConnectionPool(object): diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index fc91abf912..7f94b4a053 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -490,3 +490,16 @@ def test_send_pubsub_ping_message(self, r): assert wait_for_message(p) == make_message(type='pong', channel=None, data='hello world', pattern=None) + + +class TestPubSubConnectionKilled(object): + + @skip_if_server_version_lt('3.0.0') + def test_connection_error_raised_when_connection_dies(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe('foo') + for client in r.client_list(): + if client['cmd'] == 'subscribe': + r.client_kill_filter(_id=client['id']) + with pytest.raises(ConnectionError): + wait_for_message(p) diff --git a/tests/test_selector.py b/tests/test_selector.py deleted file mode 100644 index 07bd6dcea3..0000000000 --- a/tests/test_selector.py +++ /dev/null @@ -1,122 +0,0 @@ -import pytest -import time -from redis import selector - -_SELECTORS = ( - 'SelectSelector', - 'PollSelector', 
-) - - -@pytest.mark.parametrize('selector_name', _SELECTORS) -class TestSelector(object): - - @pytest.fixture() - def selector_patch(self, selector_name, request): - "A fixture to patch the DefaultSelector with each selector" - if not hasattr(selector, selector_name): - pytest.skip('selector %s unavailable' % selector_name) - default_selector = selector._DEFAULT_SELECTOR - - def revert_selector(): - selector._DEFAULT_SELECTOR = default_selector - request.addfinalizer(revert_selector) - - selector._DEFAULT_SELECTOR = getattr(selector, selector_name) - - def kill_connection(self, connection, r): - "Helper that tells the redis server to kill `connection`" - # set a name for the connection so that we can identify and kill it - connection.send_command('client', 'setname', 'redis-py-1') - assert connection.read_response() == b'OK' - - # find the client based on its name and kill it - for client in r.client_list(): - if client['name'] == 'redis-py-1': - assert r.client_kill(client['addr']) - break - else: - assert False, 'Client redis-py-1 not found in client list' - - def test_can_read(self, selector_patch, r): - c = r.connection_pool.get_connection('_') - - # a fresh connection should not be readable - assert not c.can_read() - - c.send_command('PING') - # a connection should be readable when a response is available - # note that we supply a timeout here to make sure the server has - # a chance to respond - assert c.can_read(1.0) - - assert c.read_response() == b'PONG' - - # once the response is read, the connection is no longer readable - assert not c.can_read() - - def test_is_ready_for_command(self, selector_patch, r): - c = r.connection_pool.get_connection('_') - - # a fresh connection should be ready for a new command - assert c.is_ready_for_command() - - c.send_command('PING') - # once the server replies with a response, the selector should report - # that the connection is no longer ready since there is data that - # can be read. 
note that we need to wait for the server to respond - wait_until = time.time() + 2 - while time.time() < wait_until: - if not c.is_ready_for_command(): - break - time.sleep(0.01) - - assert not c.is_ready_for_command() - - assert c.read_response() == b'PONG' - - # once the response is read, the connection should be ready again - assert c.is_ready_for_command() - - def test_killed_connection_no_longer_ready(self, selector_patch, r): - "A connection that becomes disconnected is no longer ready" - c = r.connection_pool.get_connection('_') - # the connection should start as ready - assert c.is_ready_for_command() - - self.kill_connection(c, r) - - # the selector should immediately report that the socket is no - # longer ready - assert not c.is_ready_for_command() - - def test_pool_restores_killed_connection(self, selector_patch, r2): - """ - The ConnectionPool only returns healthy connecdtions, even if the - connection was killed while idle in the pool. - """ - # r2 provides two separate clients/connection pools - r = r2[0] - c = r.connection_pool.get_connection('_') - c._test_client = True - # the connection should start as ready - assert c.is_ready_for_command() - - # release the connection back to the pool - r.connection_pool.release(c) - - # kill the connection that is now idle in the pool - # use the second redis client/pool instance run the kill command - # such that it doesn't manipulate the primary connection pool - self.kill_connection(c, r2[1]) - - assert not c.is_ready_for_command() - - # retrieving the connection from the pool should provide us with - # the same connection we were previously using and it should now - # be ready for a command - c2 = r.connection_pool.get_connection('_') - assert c2 == c - assert c2._test_client is True - - assert c.is_ready_for_command()