diff --git a/changelog.d/15644.feature b/changelog.d/15644.feature new file mode 100644 index 000000000000..1b6126af53b9 --- /dev/null +++ b/changelog.d/15644.feature @@ -0,0 +1 @@ +Add Unix socket support for Redis connections. Contributed by Jason Little. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 93b132b6e4cd..5ede6d0a827a 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3979,6 +3979,8 @@ This setting has the following sub-options: * `enabled`: whether to use Redis support. Defaults to false. * `host` and `port`: Optional host and port to use to connect to redis. Defaults to localhost and 6379 +* `path`: The full path to a local Unix socket file. **If this is used, `host` and + `port` are ignored.** Defaults to `/tmp/redis.sock' * `password`: Optional password if configured on the Redis instance. * `dbid`: Optional redis dbid if needs to connect to specific redis logical db. * `use_tls`: Whether to use tls connection. Defaults to false. @@ -3991,6 +3993,8 @@ This setting has the following sub-options: _Changed in Synapse 1.84.0: Added use\_tls, certificate\_file, private\_key\_file, ca\_file and ca\_path attributes_ + _Changed in Synapse 1.85.0: Added path option to use a local Unix socket_ + Example configuration: ```yaml redis: diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 695a2307c2c5..b7bd59d2ea17 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -61,6 +61,9 @@ def lazyConnection( # most methods to it via ConnectionHandler.__getattr__. class ConnectionHandler(RedisProtocol): def disconnect(self) -> "Deferred[None]": ... + def __repr__(self) -> str: ... + +class UnixConnectionHandler(ConnectionHandler): ... class RedisFactory(protocol.ReconnectingClientFactory): continueTrying: bool diff --git a/synapse/config/redis.py b/synapse/config/redis.py index 636cb450b8b5..3c4c499e22d3 100644 --- a/synapse/config/redis.py +++ b/synapse/config/redis.py @@ -33,6 +33,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.redis_host = redis_config.get("host", "localhost") self.redis_port = redis_config.get("port", 6379) + self.redis_path = redis_config.get("path", None) self.redis_dbid = redis_config.get("dbid", None) self.redis_password = redis_config.get("password") diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 233ad61d4930..5d108fe11b79 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -352,7 +352,15 @@ def start_replication(self, hs: "HomeServer") -> None: reactor = hs.get_reactor() redis_config = hs.config.redis - if hs.config.redis.redis_use_tls: + if redis_config.redis_path is not None: + reactor.connectUNIX( + redis_config.redis_path, + self._factory, + timeout=30, + checkPID=False, + ) + + elif hs.config.redis.redis_use_tls: ssl_context_factory = ClientContextFactory(hs.config.redis) reactor.connectSSL( redis_config.redis_host, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index c8f4bf8b27c2..7e96145b3b22 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -17,7 +17,12 @@ from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast import attr -import txredisapi +from txredisapi import ( + ConnectionHandler, + RedisFactory, + SubscriberProtocol, + UnixConnectionHandler, +) from zope.interface import implementer from twisted.internet.address import IPv4Address, IPv6Address @@ -68,7 +73,7 @@ def __set__(self, obj: Optional[T], value: V) -> None: @implementer(IReplicationConnection) -class RedisSubscriber(txredisapi.SubscriberProtocol): +class RedisSubscriber(SubscriberProtocol): """Connection to redis subscribed to replication stream. This class fulfils two functions: @@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_handler: "ReplicationCommandHandler" synapse_stream_prefix: str synapse_channel_names: List[str] - synapse_outbound_redis_connection: txredisapi.ConnectionHandler + synapse_outbound_redis_connection: ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) @@ -229,7 +234,7 @@ async def _async_send_command(self, cmd: Command) -> None: ) -class SynapseRedisFactory(txredisapi.RedisFactory): +class SynapseRedisFactory(RedisFactory): """A subclass of RedisFactory that periodically sends pings to ensure that we detect dead connections. """ @@ -245,7 +250,7 @@ def __init__( dbid: Optional[int], poolsize: int, isLazy: bool = False, - handler: Type = txredisapi.ConnectionHandler, + handler: Type = ConnectionHandler, charset: str = "utf-8", password: Optional[str] = None, replyTimeout: int = 30, @@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): def __init__( self, hs: "HomeServer", - outbound_redis_connection: txredisapi.ConnectionHandler, + outbound_redis_connection: ConnectionHandler, channel_names: List[str], ): super().__init__( @@ -368,7 +373,7 @@ def lazyConnection( reconnect: bool = True, password: Optional[str] = None, replyTimeout: int = 30, -) -> txredisapi.ConnectionHandler: +) -> ConnectionHandler: """Creates a connection to Redis that is lazily set up and reconnects if the connections is lost. """ @@ -380,7 +385,7 @@ def lazyConnection( dbid=dbid, poolsize=1, isLazy=True, - handler=txredisapi.ConnectionHandler, + handler=ConnectionHandler, password=password, replyTimeout=replyTimeout, ) @@ -408,3 +413,44 @@ def lazyConnection( ) return factory.handler + + +def lazyUnixConnection( + hs: "HomeServer", + path: str = "/tmp/redis.sock", + dbid: Optional[int] = None, + reconnect: bool = True, + password: Optional[str] = None, + replyTimeout: int = 30, +) -> ConnectionHandler: + """Creates a connection to Redis that is lazily set up and reconnects if the + connection is lost. + + Returns: + A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case. + """ + + uuid = path + + factory = SynapseRedisFactory( + hs, + uuid=uuid, + dbid=dbid, + poolsize=1, + isLazy=True, + handler=UnixConnectionHandler, + password=password, + replyTimeout=replyTimeout, + ) + factory.continueTrying = reconnect + + reactor = hs.get_reactor() + + reactor.connectUNIX( + path, + factory, + timeout=30, + checkPID=False, + ) + + return factory.handler diff --git a/synapse/server.py b/synapse/server.py index f6e245569cbb..cce5fb66ff02 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -864,22 +864,36 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler": # We only want to import redis module if we're using it, as we have # `txredisapi` as an optional dependency. - from synapse.replication.tcp.redis import lazyConnection + from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection - logger.info( - "Connecting to redis (host=%r port=%r) for external cache", - self.config.redis.redis_host, - self.config.redis.redis_port, - ) + if self.config.redis.redis_path is None: + logger.info( + "Connecting to redis (host=%r port=%r) for external cache", + self.config.redis.redis_host, + self.config.redis.redis_port, + ) - return lazyConnection( - hs=self, - host=self.config.redis.redis_host, - port=self.config.redis.redis_port, - dbid=self.config.redis.redis_dbid, - password=self.config.redis.redis_password, - reconnect=True, - ) + return lazyConnection( + hs=self, + host=self.config.redis.redis_host, + port=self.config.redis.redis_port, + dbid=self.config.redis.redis_dbid, + password=self.config.redis.redis_password, + reconnect=True, + ) + else: + logger.info( + "Connecting to redis (path=%r) for external cache", + self.config.redis.redis_path, + ) + + return lazyUnixConnection( + hs=self, + path=self.config.redis.redis_path, + dbid=self.config.redis.redis_dbid, + password=self.config.redis.redis_password, + reconnect=True, + ) def should_send_federation(self) -> bool: "Should this server be sending federation traffic directly?"