diff --git a/CHANGES b/CHANGES index 21047ab234..98f3f73679 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,8 @@ * Restore try/except clauses to __del__ methods. These will be removed in 4.0 when more explicit resource management if enforced. #1339 * Update the master_address when Sentinels promote a new master. #847 + * Update SentinelConnectionPool to not forcefully disconnect other in-use + connections which can negatively affect threaded applications. #1345 * 3.5.2 (May 14, 2020) * Tune the locking in ConnectionPool.get_connection so that the lock is not held while waiting for the socket to establish and validate the diff --git a/redis/connection.py b/redis/connection.py index 5c8fd3c4c1..e3c9b66287 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1230,18 +1230,43 @@ def release(self, connection): "Releases the connection back to the pool" self._checkpid() with self._lock: - if connection.pid != self.pid: + try: + self._in_use_connections.remove(connection) + except KeyError: + # Gracefully fail when a connection is returned to this pool + # that the pool doesn't actually own + pass + + if self.owns_connection(connection): + self._available_connections.append(connection) + else: + # pool doesn't own this connection. do not add it back + # to the pool and decrement the count so that another + # connection can take its place if needed + self._created_connections -= 1 + connection.disconnect() return - self._in_use_connections.remove(connection) - self._available_connections.append(connection) - def disconnect(self): - "Disconnects all connections in the pool" + def owns_connection(self, connection): + return connection.pid == self.pid + + def disconnect(self, inuse_connections=True): + """ + Disconnects connections in the pool + + If ``inuse_connections`` is True, disconnect connections that are + current in use, potentially by other threads. Otherwise only disconnect + connections that are idle in the pool. + """ self._checkpid() with self._lock: - all_conns = chain(self._available_connections, - self._in_use_connections) - for connection in all_conns: + if inuse_connections: + connections = chain(self._available_connections, + self._in_use_connections) + else: + connections = self._available_connections + + for connection in connections: connection.disconnect() @@ -1375,7 +1400,13 @@ def release(self, connection): "Releases the connection back to the pool." # Make sure we haven't changed process. self._checkpid() - if connection.pid != self.pid: + if not self.owns_connection(connection): + # pool doesn't own this connection. do not add it back + # to the pool. instead add a None value which is a placeholder + # that will cause the pool to recreate the connection if + # its needed. + connection.disconnect() + self.pool.put_nowait(None) return # Put the connection back into the pool. diff --git a/redis/sentinel.py b/redis/sentinel.py index 3857c88f5a..2b212ea8bf 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -95,16 +95,22 @@ def reset(self): self.master_address = None self.slave_rr_counter = None + def owns_connection(self, connection): + check = not self.is_master or \ + (self.is_master and + self.master_address == (connection.host, connection.port)) + parent = super(SentinelConnectionPool, self) + return check and parent.owns_connection(connection) + def get_master_address(self): master_address = self.sentinel_manager.discover_master( self.service_name) if self.is_master: - if self.master_address is None: - self.master_address = master_address - elif master_address != self.master_address: - # Master address changed, disconnect all clients in this pool + if self.master_address != master_address: self.master_address = master_address - self.disconnect() + # disconnect any idle connections so that they reconnect + # to the new master the next time that they are used. + self.disconnect(inuse_connections=False) return master_address def rotate_slaves(self):