From cbfcbdd1acdb51a280c5e61eb5804b5b37fc1605 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 22 May 2023 02:41:11 -0500 Subject: [PATCH 1/6] Add Unix socket support to Redis connections --- stubs/txredisapi.pyi | 3 ++ synapse/config/redis.py | 1 + synapse/replication/tcp/handler.py | 37 +++++++++++------- synapse/replication/tcp/redis.py | 62 ++++++++++++++++++++++++++---- synapse/server.py | 42 +++++++++++++------- 5 files changed, 109 insertions(+), 36 deletions(-) diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 695a2307c2c5..a11249b5781f 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -62,6 +62,9 @@ def lazyConnection( class ConnectionHandler(RedisProtocol): def disconnect(self) -> "Deferred[None]": ... +class UnixConnectionHandler(ConnectionHandler): + def __repr__(self) -> str: ... + class RedisFactory(protocol.ReconnectingClientFactory): continueTrying: bool handler: ConnectionHandler diff --git a/synapse/config/redis.py b/synapse/config/redis.py index 636cb450b8b5..3c4c499e22d3 100644 --- a/synapse/config/redis.py +++ b/synapse/config/redis.py @@ -33,6 +33,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.redis_host = redis_config.get("host", "localhost") self.redis_port = redis_config.get("port", 6379) + self.redis_path = redis_config.get("path", None) self.redis_dbid = redis_config.get("dbid", None) self.redis_password = redis_config.get("password") diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 233ad61d4930..b40b0d6d90e5 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -352,24 +352,33 @@ def start_replication(self, hs: "HomeServer") -> None: reactor = hs.get_reactor() redis_config = hs.config.redis - if hs.config.redis.redis_use_tls: - ssl_context_factory = ClientContextFactory(hs.config.redis) - reactor.connectSSL( - redis_config.redis_host, - redis_config.redis_port, + if redis_config.redis_path is not None: + reactor.connectUNIX( + redis_config.redis_path, self._factory, - ssl_context_factory, timeout=30, - bindAddress=None, + checkPID=False, ) + else: - reactor.connectTCP( - redis_config.redis_host, - redis_config.redis_port, - self._factory, - timeout=30, - bindAddress=None, - ) + if hs.config.redis.redis_use_tls: + ssl_context_factory = ClientContextFactory(hs.config.redis) + reactor.connectSSL( + redis_config.redis_host, + redis_config.redis_port, + self._factory, + ssl_context_factory, + timeout=30, + bindAddress=None, + ) + else: + reactor.connectTCP( + redis_config.redis_host, + redis_config.redis_port, + self._factory, + timeout=30, + bindAddress=None, + ) def get_streams(self) -> Dict[str, Stream]: """Get a map from stream name to all streams.""" diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index c8f4bf8b27c2..7e96145b3b22 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -17,7 +17,12 @@ from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast import attr -import txredisapi +from txredisapi import ( + ConnectionHandler, + RedisFactory, + SubscriberProtocol, + UnixConnectionHandler, +) from zope.interface import implementer from twisted.internet.address import IPv4Address, IPv6Address @@ -68,7 +73,7 @@ def __set__(self, obj: Optional[T], value: V) -> None: @implementer(IReplicationConnection) -class RedisSubscriber(txredisapi.SubscriberProtocol): +class RedisSubscriber(SubscriberProtocol): """Connection to redis subscribed to replication stream. This class fulfils two functions: @@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_handler: "ReplicationCommandHandler" synapse_stream_prefix: str synapse_channel_names: List[str] - synapse_outbound_redis_connection: txredisapi.ConnectionHandler + synapse_outbound_redis_connection: ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) @@ -229,7 +234,7 @@ async def _async_send_command(self, cmd: Command) -> None: ) -class SynapseRedisFactory(txredisapi.RedisFactory): +class SynapseRedisFactory(RedisFactory): """A subclass of RedisFactory that periodically sends pings to ensure that we detect dead connections. """ @@ -245,7 +250,7 @@ def __init__( dbid: Optional[int], poolsize: int, isLazy: bool = False, - handler: Type = txredisapi.ConnectionHandler, + handler: Type = ConnectionHandler, charset: str = "utf-8", password: Optional[str] = None, replyTimeout: int = 30, @@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): def __init__( self, hs: "HomeServer", - outbound_redis_connection: txredisapi.ConnectionHandler, + outbound_redis_connection: ConnectionHandler, channel_names: List[str], ): super().__init__( @@ -368,7 +373,7 @@ def lazyConnection( reconnect: bool = True, password: Optional[str] = None, replyTimeout: int = 30, -) -> txredisapi.ConnectionHandler: +) -> ConnectionHandler: """Creates a connection to Redis that is lazily set up and reconnects if the connections is lost. """ @@ -380,7 +385,7 @@ def lazyConnection( dbid=dbid, poolsize=1, isLazy=True, - handler=txredisapi.ConnectionHandler, + handler=ConnectionHandler, password=password, replyTimeout=replyTimeout, ) @@ -408,3 +413,44 @@ def lazyConnection( ) return factory.handler + + +def lazyUnixConnection( + hs: "HomeServer", + path: str = "/tmp/redis.sock", + dbid: Optional[int] = None, + reconnect: bool = True, + password: Optional[str] = None, + replyTimeout: int = 30, +) -> ConnectionHandler: + """Creates a connection to Redis that is lazily set up and reconnects if the + connection is lost. + + Returns: + A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case. + """ + + uuid = path + + factory = SynapseRedisFactory( + hs, + uuid=uuid, + dbid=dbid, + poolsize=1, + isLazy=True, + handler=UnixConnectionHandler, + password=password, + replyTimeout=replyTimeout, + ) + factory.continueTrying = reconnect + + reactor = hs.get_reactor() + + reactor.connectUNIX( + path, + factory, + timeout=30, + checkPID=False, + ) + + return factory.handler diff --git a/synapse/server.py b/synapse/server.py index f6e245569cbb..cce5fb66ff02 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -864,22 +864,36 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler": # We only want to import redis module if we're using it, as we have # `txredisapi` as an optional dependency. - from synapse.replication.tcp.redis import lazyConnection + from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection - logger.info( - "Connecting to redis (host=%r port=%r) for external cache", - self.config.redis.redis_host, - self.config.redis.redis_port, - ) + if self.config.redis.redis_path is None: + logger.info( + "Connecting to redis (host=%r port=%r) for external cache", + self.config.redis.redis_host, + self.config.redis.redis_port, + ) - return lazyConnection( - hs=self, - host=self.config.redis.redis_host, - port=self.config.redis.redis_port, - dbid=self.config.redis.redis_dbid, - password=self.config.redis.redis_password, - reconnect=True, - ) + return lazyConnection( + hs=self, + host=self.config.redis.redis_host, + port=self.config.redis.redis_port, + dbid=self.config.redis.redis_dbid, + password=self.config.redis.redis_password, + reconnect=True, + ) + else: + logger.info( + "Connecting to redis (path=%r) for external cache", + self.config.redis.redis_path, + ) + + return lazyUnixConnection( + hs=self, + path=self.config.redis.redis_path, + dbid=self.config.redis.redis_dbid, + password=self.config.redis.redis_password, + reconnect=True, + ) def should_send_federation(self) -> bool: "Should this server be sending federation traffic directly?" From 32721fd6c348dcd937da05c2b5ce92b63badcf7d Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 22 May 2023 03:23:00 -0500 Subject: [PATCH 2/6] Changelog --- changelog.d/15644.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15644.feature diff --git a/changelog.d/15644.feature b/changelog.d/15644.feature new file mode 100644 index 000000000000..1b6126af53b9 --- /dev/null +++ b/changelog.d/15644.feature @@ -0,0 +1 @@ +Add Unix socket support for Redis connections. Contributed by Jason Little. From cc432c6330d967397fd92e375427e39b89a00378 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 22 May 2023 04:55:08 -0500 Subject: [PATCH 3/6] [REVERT ME] Add in Complement infrastructure support for testing. --- docker/conf-workers/shared.yaml.j2 | 1 + docker/conf-workers/supervisord.conf.j2 | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docker/conf-workers/shared.yaml.j2 b/docker/conf-workers/shared.yaml.j2 index 92d25386dc34..2a74948609e9 100644 --- a/docker/conf-workers/shared.yaml.j2 +++ b/docker/conf-workers/shared.yaml.j2 @@ -6,6 +6,7 @@ {% if enable_redis %} redis: enabled: true + path: /tmp/redis.sock {% endif %} {% if appservice_registrations is not none %} diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2 index 9f1e03cfc0a2..733c4a052f09 100644 --- a/docker/conf-workers/supervisord.conf.j2 +++ b/docker/conf-workers/supervisord.conf.j2 @@ -19,7 +19,7 @@ username=www-data autorestart=true [program:redis] -command=/usr/local/bin/prefix-log /usr/local/bin/redis-server +command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock priority=1 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 From f3d198104791b39bbe8789f8fb5ab68922035045 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 22 May 2023 05:43:16 -0500 Subject: [PATCH 4/6] Revert "[REVERT ME] Add in Complement infrastructure support for testing." This reverts commit cc432c6330d967397fd92e375427e39b89a00378. --- docker/conf-workers/shared.yaml.j2 | 1 - docker/conf-workers/supervisord.conf.j2 | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docker/conf-workers/shared.yaml.j2 b/docker/conf-workers/shared.yaml.j2 index 2a74948609e9..92d25386dc34 100644 --- a/docker/conf-workers/shared.yaml.j2 +++ b/docker/conf-workers/shared.yaml.j2 @@ -6,7 +6,6 @@ {% if enable_redis %} redis: enabled: true - path: /tmp/redis.sock {% endif %} {% if appservice_registrations is not none %} diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2 index 733c4a052f09..9f1e03cfc0a2 100644 --- a/docker/conf-workers/supervisord.conf.j2 +++ b/docker/conf-workers/supervisord.conf.j2 @@ -19,7 +19,7 @@ username=www-data autorestart=true [program:redis] -command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock +command=/usr/local/bin/prefix-log /usr/local/bin/redis-server priority=1 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 From 7be9a0183679255f142de14a5b0ebecba766be52 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 24 May 2023 19:51:54 -0500 Subject: [PATCH 5/6] Update stubs/txredisapi.pyi Co-authored-by: Patrick Cloke --- stubs/txredisapi.pyi | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index a11249b5781f..c5bfd3bace6d 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -61,9 +61,10 @@ def lazyConnection( # most methods to it via ConnectionHandler.__getattr__. class ConnectionHandler(RedisProtocol): def disconnect(self) -> "Deferred[None]": ... + def __repr__(self) -> str: ... class UnixConnectionHandler(ConnectionHandler): - def __repr__(self) -> str: ... + ... class RedisFactory(protocol.ReconnectingClientFactory): continueTrying: bool From ce1179146fdeb7a66d0dd9afdad4da11e917e0d1 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Thu, 25 May 2023 03:44:21 -0500 Subject: [PATCH 6/6] Address review feedback(docs, unindent with elif) and dust off some lint in the stub --- .../configuration/config_documentation.md | 4 +++ stubs/txredisapi.pyi | 3 +- synapse/replication/tcp/handler.py | 35 +++++++++---------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 93b132b6e4cd..5ede6d0a827a 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3979,6 +3979,8 @@ This setting has the following sub-options: * `enabled`: whether to use Redis support. Defaults to false. * `host` and `port`: Optional host and port to use to connect to redis. Defaults to localhost and 6379 +* `path`: The full path to a local Unix socket file. **If this is used, `host` and + `port` are ignored.** Defaults to `/tmp/redis.sock' * `password`: Optional password if configured on the Redis instance. * `dbid`: Optional redis dbid if needs to connect to specific redis logical db. * `use_tls`: Whether to use tls connection. Defaults to false. @@ -3991,6 +3993,8 @@ This setting has the following sub-options: _Changed in Synapse 1.84.0: Added use\_tls, certificate\_file, private\_key\_file, ca\_file and ca\_path attributes_ + _Changed in Synapse 1.85.0: Added path option to use a local Unix socket_ + Example configuration: ```yaml redis: diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index c5bfd3bace6d..b7bd59d2ea17 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -63,8 +63,7 @@ class ConnectionHandler(RedisProtocol): def disconnect(self) -> "Deferred[None]": ... def __repr__(self) -> str: ... -class UnixConnectionHandler(ConnectionHandler): - ... +class UnixConnectionHandler(ConnectionHandler): ... class RedisFactory(protocol.ReconnectingClientFactory): continueTrying: bool diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b40b0d6d90e5..5d108fe11b79 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -360,25 +360,24 @@ def start_replication(self, hs: "HomeServer") -> None: checkPID=False, ) + elif hs.config.redis.redis_use_tls: + ssl_context_factory = ClientContextFactory(hs.config.redis) + reactor.connectSSL( + redis_config.redis_host, + redis_config.redis_port, + self._factory, + ssl_context_factory, + timeout=30, + bindAddress=None, + ) else: - if hs.config.redis.redis_use_tls: - ssl_context_factory = ClientContextFactory(hs.config.redis) - reactor.connectSSL( - redis_config.redis_host, - redis_config.redis_port, - self._factory, - ssl_context_factory, - timeout=30, - bindAddress=None, - ) - else: - reactor.connectTCP( - redis_config.redis_host, - redis_config.redis_port, - self._factory, - timeout=30, - bindAddress=None, - ) + reactor.connectTCP( + redis_config.redis_host, + redis_config.redis_port, + self._factory, + timeout=30, + bindAddress=None, + ) def get_streams(self) -> Dict[str, Stream]: """Get a map from stream name to all streams."""