Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add Unix socket support for Redis connections #15644

Merged
merged 7 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15644.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Unix socket support for Redis connections. Contributed by Jason Little.
4 changes: 4 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3979,6 +3979,8 @@ This setting has the following sub-options:
* `enabled`: whether to use Redis support. Defaults to false.
* `host` and `port`: Optional host and port to use to connect to redis. Defaults to
localhost and 6379
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
`port` are ignored.** Defaults to `/tmp/redis.sock'
* `password`: Optional password if configured on the Redis instance.
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
* `use_tls`: Whether to use tls connection. Defaults to false.
Expand All @@ -3991,6 +3993,8 @@ This setting has the following sub-options:

_Changed in Synapse 1.84.0: Added use\_tls, certificate\_file, private\_key\_file, ca\_file and ca\_path attributes_

_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_

Example configuration:
```yaml
redis:
Expand Down
3 changes: 3 additions & 0 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def lazyConnection(
# most methods to it via ConnectionHandler.__getattr__.
class ConnectionHandler(RedisProtocol):
def disconnect(self) -> "Deferred[None]": ...
def __repr__(self) -> str: ...

class UnixConnectionHandler(ConnectionHandler): ...

class RedisFactory(protocol.ReconnectingClientFactory):
continueTrying: bool
Expand Down
1 change: 1 addition & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_path = redis_config.get("path", None)
clokep marked this conversation as resolved.
Show resolved Hide resolved
self.redis_dbid = redis_config.get("dbid", None)
self.redis_password = redis_config.get("password")

Expand Down
10 changes: 9 additions & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,15 @@ def start_replication(self, hs: "HomeServer") -> None:

reactor = hs.get_reactor()
redis_config = hs.config.redis
if hs.config.redis.redis_use_tls:
if redis_config.redis_path is not None:
clokep marked this conversation as resolved.
Show resolved Hide resolved
reactor.connectUNIX(
redis_config.redis_path,
self._factory,
timeout=30,
checkPID=False,
)

elif hs.config.redis.redis_use_tls:
ssl_context_factory = ClientContextFactory(hs.config.redis)
reactor.connectSSL(
redis_config.redis_host,
Expand Down
62 changes: 54 additions & 8 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast

import attr
import txredisapi
from txredisapi import (
ConnectionHandler,
RedisFactory,
SubscriberProtocol,
UnixConnectionHandler,
)
from zope.interface import implementer

from twisted.internet.address import IPv4Address, IPv6Address
Expand Down Expand Up @@ -68,7 +73,7 @@ def __set__(self, obj: Optional[T], value: V) -> None:


@implementer(IReplicationConnection)
class RedisSubscriber(txredisapi.SubscriberProtocol):
class RedisSubscriber(SubscriberProtocol):
"""Connection to redis subscribed to replication stream.
This class fulfils two functions:
Expand All @@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
synapse_handler: "ReplicationCommandHandler"
synapse_stream_prefix: str
synapse_channel_names: List[str]
synapse_outbound_redis_connection: txredisapi.ConnectionHandler
synapse_outbound_redis_connection: ConnectionHandler

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -229,7 +234,7 @@ async def _async_send_command(self, cmd: Command) -> None:
)


class SynapseRedisFactory(txredisapi.RedisFactory):
class SynapseRedisFactory(RedisFactory):
"""A subclass of RedisFactory that periodically sends pings to ensure that
we detect dead connections.
"""
Expand All @@ -245,7 +250,7 @@ def __init__(
dbid: Optional[int],
poolsize: int,
isLazy: bool = False,
handler: Type = txredisapi.ConnectionHandler,
handler: Type = ConnectionHandler,
charset: str = "utf-8",
password: Optional[str] = None,
replyTimeout: int = 30,
Expand Down Expand Up @@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
def __init__(
self,
hs: "HomeServer",
outbound_redis_connection: txredisapi.ConnectionHandler,
outbound_redis_connection: ConnectionHandler,
channel_names: List[str],
):
super().__init__(
Expand Down Expand Up @@ -368,7 +373,7 @@ def lazyConnection(
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
) -> txredisapi.ConnectionHandler:
) -> ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connections is lost.
"""
Expand All @@ -380,7 +385,7 @@ def lazyConnection(
dbid=dbid,
poolsize=1,
isLazy=True,
handler=txredisapi.ConnectionHandler,
handler=ConnectionHandler,
password=password,
replyTimeout=replyTimeout,
)
Expand Down Expand Up @@ -408,3 +413,44 @@ def lazyConnection(
)

return factory.handler


def lazyUnixConnection(
hs: "HomeServer",
path: str = "/tmp/redis.sock",
dbid: Optional[int] = None,
reconnect: bool = True,
password: Optional[str] = None,
replyTimeout: int = 30,
) -> ConnectionHandler:
"""Creates a connection to Redis that is lazily set up and reconnects if the
connection is lost.
Returns:
A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case.
"""

uuid = path

factory = SynapseRedisFactory(
hs,
uuid=uuid,
dbid=dbid,
poolsize=1,
isLazy=True,
handler=UnixConnectionHandler,
password=password,
replyTimeout=replyTimeout,
)
factory.continueTrying = reconnect

reactor = hs.get_reactor()

reactor.connectUNIX(
path,
factory,
timeout=30,
checkPID=False,
)

return factory.handler
42 changes: 28 additions & 14 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,22 +864,36 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler":

# We only want to import redis module if we're using it, as we have
# `txredisapi` as an optional dependency.
from synapse.replication.tcp.redis import lazyConnection
from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection

logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
)
if self.config.redis.redis_path is None:
logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
)

return lazyConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)
return lazyConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)
else:
logger.info(
"Connecting to redis (path=%r) for external cache",
self.config.redis.redis_path,
)

return lazyUnixConnection(
hs=self,
path=self.config.redis.redis_path,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
Expand Down