Skip to content

Commit

Permalink
Use nonblocking sockets instead of selectors for healthy connections
Browse files Browse the repository at this point in the history
This replaces the work in 3.2.0 to use nonblocking sockets instead of
selectors. Selectors proved to be problematic for some environments
including eventlet and gevent. Nonblocking sockets should be available
in all environments.
  • Loading branch information
andymccurdy committed Jul 10, 2019
1 parent 9ed2132 commit acac4db
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 373 deletions.
5 changes: 4 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
* 3.2.2 (in development)
* 3.3.0 (in development)
* Resolve a race condition with the PubSubWorkerThread. #1150
* Cleanup socket read error messages. Thanks Vic Yu. #1159
* Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153
Expand All @@ -17,6 +17,9 @@
cause the connection to be disconnected and cleaned up appropriately.
#923
* Add READONLY and READWRITE commands. Thanks @theodesp. #1114
* Remove selectors in favor of nonblocking sockets. Selectors had
issues in some environments including eventlet and gevent. This should
resolve those issues with no other side effects.
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
Expand Down
2 changes: 1 addition & 1 deletion redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def int_or_str(value):
return value


__version__ = '3.2.1'
__version__ = '3.3.dev2'
VERSION = tuple(map(int_or_str, __version__.split('.')))

__all__ = [
Expand Down
4 changes: 3 additions & 1 deletion redis/_compat.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Internal module for Python 2 backwards compatibility."""
import errno
import socket
import sys

# For Python older than 3.5, retry EINTR.
if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and
sys.version_info[1] < 5):
# Adapted from https://bugs.python.org/review/23863/patch/14532/54418
import socket
import time

# Wrapper for handling interruptable system calls.
Expand Down Expand Up @@ -100,6 +100,7 @@ def byte_to_chr(x):
basestring = basestring
unicode = unicode
long = long
BlockingIOError = socket.error
else:
from urllib.parse import parse_qs, unquote, urlparse
from string import ascii_letters
Expand Down Expand Up @@ -129,6 +130,7 @@ def nativestr(x):
unicode = str
safe_unicode = str
long = int
BlockingIOError = BlockingIOError

try: # Python 3
from queue import LifoQueue, Empty, Full
Expand Down
155 changes: 105 additions & 50 deletions redis/connection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import unicode_literals
from distutils.version import StrictVersion
from errno import EWOULDBLOCK
from itertools import chain
import io
import os
Expand All @@ -17,7 +18,7 @@
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
recv, recv_into, unquote)
recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
DataError,
RedisError,
Expand All @@ -31,7 +32,6 @@
ExecAbortError,
ReadOnlyError
)
from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
Expand Down Expand Up @@ -61,6 +61,8 @@

SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."

SENTINEL = object()


class Encoder(object):
"Encode strings to bytes and decode bytes to strings"
Expand Down Expand Up @@ -126,9 +128,10 @@ def parse_error(self, response):


class SocketBuffer(object):
def __init__(self, socket, socket_read_size):
def __init__(self, socket, socket_read_size, socket_timeout):
self._sock = socket
self.socket_read_size = socket_read_size
self.socket_timeout = socket_timeout
self._buffer = io.BytesIO()
# number of bytes written to the buffer from the socket
self.bytes_written = 0
Expand All @@ -139,25 +142,51 @@ def __init__(self, socket, socket_read_size):
def length(self):
return self.bytes_written - self.bytes_read

def _read_from_socket(self, length=None):
def _read_from_socket(self, length=None, timeout=SENTINEL,
raise_on_timeout=True):
sock = self._sock
socket_read_size = self.socket_read_size
buf = self._buffer
buf.seek(self.bytes_written)
marker = 0
custom_timeout = timeout is not SENTINEL

while True:
data = recv(self._sock, socket_read_size)
# an empty string indicates the server shutdown the socket
if isinstance(data, bytes) and len(data) == 0:
raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
buf.write(data)
data_length = len(data)
self.bytes_written += data_length
marker += data_length

if length is not None and length > marker:
continue
break
try:
if custom_timeout:
sock.settimeout(timeout)
while True:
data = recv(self._sock, socket_read_size)
# an empty string indicates the server shutdown the socket
if isinstance(data, bytes) and len(data) == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
buf.write(data)
data_length = len(data)
self.bytes_written += data_length
marker += data_length

if length is not None and length > marker:
continue
return True
except BlockingIOError as ex:
# if we're in nonblocking mode and the recv raises a
# blocking error, simply return False indicating that
# there's no data to be read. otherwise raise the
# original exception.
if raise_on_timeout or ex.errno != EWOULDBLOCK:
raise
return False
except socket.timeout:
if raise_on_timeout:
raise
return False
finally:
if custom_timeout:
sock.settimeout(self.socket_timeout)

def can_read(self, timeout):
return bool(self.length) or \
self._read_from_socket(timeout=timeout,
raise_on_timeout=False)

def read(self, length):
length = length + 2 # make sure to read the \r\n terminator
Expand Down Expand Up @@ -233,7 +262,9 @@ def __del__(self):
def on_connect(self, connection):
"Called when the socket connects"
self._sock = connection._sock
self._buffer = SocketBuffer(self._sock, self.socket_read_size)
self._buffer = SocketBuffer(self._sock,
self.socket_read_size,
connection.socket_timeout)
self.encoder = connection.encoder

def on_disconnect(self):
Expand All @@ -244,8 +275,8 @@ def on_disconnect(self):
self._buffer = None
self.encoder = None

def can_read(self):
return self._buffer and bool(self._buffer.length)
def can_read(self, timeout):
return self._buffer and self._buffer.can_read(timeout)

def read_response(self):
response = self._buffer.readline()
Expand Down Expand Up @@ -312,6 +343,7 @@ def __del__(self):

def on_connect(self, connection):
self._sock = connection._sock
self._socket_timeout = connection.socket_timeout
kwargs = {
'protocolError': InvalidResponse,
'replyError': self.parse_error,
Expand All @@ -333,13 +365,52 @@ def on_disconnect(self):
self._reader = None
self._next_response = False

def can_read(self):
def can_read(self, timeout):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

if self._next_response is False:
self._next_response = self._reader.gets()
return self._next_response is not False
if self._next_response is False:
return self.read_from_socket(timeout=timeout,
raise_on_timeout=False)
return True

def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
sock = self._sock
custom_timeout = timeout is not SENTINEL
try:
if custom_timeout:
sock.settimeout(timeout)
if HIREDIS_USE_BYTE_BUFFER:
bufflen = recv_into(self._sock, self._buffer)
if bufflen == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
self._reader.feed(self._buffer, 0, bufflen)
else:
buffer = recv(self._sock, self.socket_read_size)
# an empty string indicates the server shutdown the socket
if not isinstance(buffer, bytes) or len(buffer) == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
self._reader.feed(buffer)
# data was read from the socket and added to the buffer.
# return True to indicate that data was read.
return True
except BlockingIOError as ex:
# if we're in nonblocking mode and the recv raises a
# blocking error, simply return False indicating that
# there's no data to be read. otherwise raise the
# original exception.
if raise_on_timeout or ex.errno != EWOULDBLOCK:
raise
return False
except socket.timeout:
if not raise_on_timeout:
raise
return False
finally:
if custom_timeout:
sock.settimeout(self._socket_timeout)

def read_response(self):
if not self._reader:
Expand All @@ -352,21 +423,8 @@ def read_response(self):
return response

response = self._reader.gets()
socket_read_size = self.socket_read_size
while response is False:
if HIREDIS_USE_BYTE_BUFFER:
bufflen = recv_into(self._sock, self._buffer)
if bufflen == 0:
raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
else:
buffer = recv(self._sock, socket_read_size)
# an empty string indicates the server shutdown the socket
if not isinstance(buffer, bytes) or len(buffer) == 0:
raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
if HIREDIS_USE_BYTE_BUFFER:
self._reader.feed(self._buffer, 0, bufflen)
else:
self._reader.feed(buffer)
self.read_from_socket()
response = self._reader.gets()
# if an older version of hiredis is installed, we need to attempt
# to convert ResponseErrors to their appropriate types.
Expand Down Expand Up @@ -416,7 +474,6 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
self.retry_on_timeout = retry_on_timeout
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
self._selector = None
self._parser = parser_class(socket_read_size=socket_read_size)
self._description_args = {
'host': self.host,
Expand Down Expand Up @@ -454,7 +511,6 @@ def connect(self):
raise ConnectionError(self._error_message(e))

self._sock = sock
self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
Expand Down Expand Up @@ -538,9 +594,6 @@ def disconnect(self):
self._parser.on_disconnect()
if self._sock is None:
return
if self._selector is not None:
self._selector.close()
self._selector = None
try:
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
Expand Down Expand Up @@ -585,11 +638,7 @@ def can_read(self, timeout=0):
if not sock:
self.connect()
sock = self._sock
return self._parser.can_read() or self._selector.can_read(timeout)

def is_ready_for_command(self):
"Check if the connection is ready for a command"
return self._selector.is_ready_for_command()
return self._parser.can_read(timeout)

def read_response(self):
"Read the response from a previously sent command"
Expand Down Expand Up @@ -963,10 +1012,13 @@ def get_connection(self, command_name, *keys, **options):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
try:
if connection.can_read():
raise ConnectionError('Connection has data')
except ConnectionError:
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
Expand Down Expand Up @@ -1111,10 +1163,13 @@ def get_connection(self, command_name, *keys, **options):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
try:
if connection.can_read():
raise ConnectionError('Connection has data')
except ConnectionError:
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
Expand Down
Loading

0 comments on commit acac4db

Please sign in to comment.