Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add Unix socket support for Redis connections (#15644)
Browse files Browse the repository at this point in the history
Adds a new configuration setting to connect to Redis via a Unix
socket instead of over TCP. Disabled by default.
  • Loading branch information
realtyem authored May 26, 2023
1 parent 50918c4 commit c835bef
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 23 deletions.
1 change: 1 addition & 0 deletions changelog.d/15644.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Unix socket support for Redis connections. Contributed by Jason Little.
4 changes: 4 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3979,6 +3979,8 @@ This setting has the following sub-options:
* `enabled`: whether to use Redis support. Defaults to false.
* `host` and `port`: Optional host and port to use to connect to redis. Defaults to
localhost and 6379
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
`port` are ignored.** Defaults to `/tmp/redis.sock'
* `password`: Optional password if configured on the Redis instance.
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
* `use_tls`: Whether to use tls connection. Defaults to false.
Expand All @@ -3991,6 +3993,8 @@ This setting has the following sub-options:

_Changed in Synapse 1.84.0: Added use\_tls, certificate\_file, private\_key\_file, ca\_file and ca\_path attributes_

_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_

Example configuration:
```yaml
redis:
Expand Down
3 changes: 3 additions & 0 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def lazyConnection(
# most methods to it via ConnectionHandler.__getattr__.
class ConnectionHandler(RedisProtocol):
def disconnect(self) -> "Deferred[None]": ...
def __repr__(self) -> str: ...

class UnixConnectionHandler(ConnectionHandler): ...

class RedisFactory(protocol.ReconnectingClientFactory):
continueTrying: bool
Expand Down
1 change: 1 addition & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_path = redis_config.get("path", None)
self.redis_dbid = redis_config.get("dbid", None)
self.redis_password = redis_config.get("password")

Expand Down
10 changes: 9 additions & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,15 @@ def start_replication(self, hs: "HomeServer") -> None:

reactor = hs.get_reactor()
redis_config = hs.config.redis
if hs.config.redis.redis_use_tls:
if redis_config.redis_path is not None:
reactor.connectUNIX(
redis_config.redis_path,
self._factory,
timeout=30,
checkPID=False,
)

elif hs.config.redis.redis_use_tls:
ssl_context_factory = ClientContextFactory(hs.config.redis)
reactor.connectSSL(
redis_config.redis_host,
Expand Down
62 changes: 54 additions & 8 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast

import attr
import txredisapi
from txredisapi import (
ConnectionHandler,
RedisFactory,
SubscriberProtocol,
UnixConnectionHandler,
)
from zope.interface import implementer

from twisted.internet.address import IPv4Address, IPv6Address
Expand Down Expand Up @@ -68,7 +73,7 @@ def __set__(self, obj: Optional[T], value: V) -> None:


@implementer(IReplicationConnection)
class RedisSubscriber(txredisapi.SubscriberProtocol):
class RedisSubscriber(SubscriberProtocol):
"""Connection to redis subscribed to replication stream.
This class fulfils two functions:
Expand All @@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
synapse_handler: "ReplicationCommandHandler"
synapse_stream_prefix: str
synapse_channel_names: List[str]
synapse_outbound_redis_connection: txredisapi.ConnectionHandler
synapse_outbound_redis_connection: ConnectionHandler

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -229,7 +234,7 @@ async def _async_send_command(self, cmd: Command) -> None:
)


class SynapseRedisFactory(txredisapi.RedisFactory):
class SynapseRedisFactory(RedisFactory):
"""A subclass of RedisFactory that periodically sends pings to ensure that
we detect dead connections.
"""
Expand All @@ -245,7 +250,7 @@ def __init__(
dbid: Optional[int],
poolsize: int,
isLazy: bool = False,
handler: Type = txredisapi.ConnectionHandler,
handler: Type = ConnectionHandler,
charset: str = "utf-8",
password: Optional[str] = None,
replyTimeout: int = 30,
Expand Down Expand Up @@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
def __init__(
self,
hs: "HomeServer",
outbound_redis_connection: txredisapi.ConnectionHandler,
outbound_redis_connection: ConnectionHandler,
channel_names: List[str],
):
super().__init__(
Expand Down Expand Up @@ -368,7 +373,7 @@ def lazyConnection(
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
) -> txredisapi.ConnectionHandler:
) -> ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connections is lost.
"""
Expand All @@ -380,7 +385,7 @@ def lazyConnection(
dbid=dbid,
poolsize=1,
isLazy=True,
handler=txredisapi.ConnectionHandler,
handler=ConnectionHandler,
password=password,
replyTimeout=replyTimeout,
)
Expand Down Expand Up @@ -408,3 +413,44 @@ def lazyConnection(
)

return factory.handler


def lazyUnixConnection(
hs: "HomeServer",
path: str = "/tmp/redis.sock",
dbid: Optional[int] = None,
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
) -> ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connection is lost.
Returns:
A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case.
"""

uuid = path

factory = SynapseRedisFactory(
hs,
uuid=uuid,
dbid=dbid,
poolsize=1,
isLazy=True,
handler=UnixConnectionHandler,
password=password,
replyTimeout=replyTimeout,
)
factory.continueTrying = reconnect

reactor = hs.get_reactor()

reactor.connectUNIX(
path,
factory,
timeout=30,
checkPID=False,
)

return factory.handler
42 changes: 28 additions & 14 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,22 +864,36 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler":

# We only want to import redis module if we're using it, as we have
# `txredisapi` as an optional dependency.
from synapse.replication.tcp.redis import lazyConnection
from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection

logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
)
if self.config.redis.redis_path is None:
logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
)

return lazyConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)
return lazyConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)
else:
logger.info(
"Connecting to redis (path=%r) for external cache",
self.config.redis.redis_path,
)

return lazyUnixConnection(
hs=self,
path=self.config.redis.redis_path,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
Expand Down

0 comments on commit c835bef

Please sign in to comment.