From 5a53a1ad08c3f9ab1495b9431396480f31c2d7f9 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 11 Jul 2023 13:08:06 -0500 Subject: [PATCH] Unix Sockets for HTTP Replication (#15708) Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at. --------- Co-authored-by: Eric Eastwood Co-authored-by: Olivier Wilkinson (reivilibre) Co-authored-by: Eric Eastwood --- changelog.d/15708.feature | 1 + docker/conf-workers/nginx.conf.j2 | 4 + docker/conf-workers/shared.yaml.j2 | 3 + docker/conf-workers/supervisord.conf.j2 | 4 + docker/conf-workers/worker.yaml.j2 | 4 + docker/conf/homeserver.yaml | 10 +- docker/configure_workers_and_start.py | 104 +++++++++++++----- docs/development/contributing_guide.md | 1 + .../configuration/config_documentation.md | 52 ++++++++- docs/workers.md | 9 +- scripts-dev/complement.sh | 4 + synapse/config/workers.py | 24 +++- synapse/http/replicationagent.py | 47 +++++--- synapse/logging/opentracing.py | 6 +- tests/replication/_base.py | 7 +- tests/server.py | 32 +++++- 16 files changed, 260 insertions(+), 52 deletions(-) create mode 100644 changelog.d/15708.feature diff --git a/changelog.d/15708.feature b/changelog.d/15708.feature new file mode 100644 index 000000000000..06a6c959ab56 --- /dev/null +++ b/changelog.d/15708.feature @@ -0,0 +1 @@ +Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little. diff --git a/docker/conf-workers/nginx.conf.j2 b/docker/conf-workers/nginx.conf.j2 index 967fc65e798c..d1e02af72328 100644 --- a/docker/conf-workers/nginx.conf.j2 +++ b/docker/conf-workers/nginx.conf.j2 @@ -35,7 +35,11 @@ server { # Send all other traffic to the main process location ~* ^(\\/_matrix|\\/_synapse) { +{% if using_unix_sockets %} + proxy_pass http://unix:/run/main_public.sock; +{% else %} proxy_pass http://localhost:8080; +{% endif %} proxy_set_header X-Forwarded-For $remote_addr; proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header Host $host; diff --git a/docker/conf-workers/shared.yaml.j2 b/docker/conf-workers/shared.yaml.j2 index 92d25386dc34..1dfc60ad1104 100644 --- a/docker/conf-workers/shared.yaml.j2 +++ b/docker/conf-workers/shared.yaml.j2 @@ -6,6 +6,9 @@ {% if enable_redis %} redis: enabled: true + {% if using_unix_sockets %} + path: /tmp/redis.sock + {% endif %} {% endif %} {% if appservice_registrations is not none %} diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2 index 9f1e03cfc0a2..da9335805175 100644 --- a/docker/conf-workers/supervisord.conf.j2 +++ b/docker/conf-workers/supervisord.conf.j2 @@ -19,7 +19,11 @@ username=www-data autorestart=true [program:redis] +{% if using_unix_sockets %} +command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock +{% else %} command=/usr/local/bin/prefix-log /usr/local/bin/redis-server +{% endif %} priority=1 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2 index 44c6e413cf94..29ec74b4ea04 100644 --- a/docker/conf-workers/worker.yaml.j2 +++ b/docker/conf-workers/worker.yaml.j2 @@ -8,7 +8,11 @@ worker_name: "{{ name }}" worker_listeners: - type: http +{% if using_unix_sockets %} + path: "/run/worker.{{ port }}" +{% else %} port: {{ port }} +{% endif %} {% if listener_resources %} resources: - names: diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index f10f78a48cd2..c46b955d6353 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -36,12 +36,17 @@ listeners: # Allow configuring in case we want to reverse proxy 8008 # using another process in the same container +{% if SYNAPSE_USE_UNIX_SOCKET %} + # Unix sockets don't care about TLS or IP addresses or ports + - path: '/run/main_public.sock' + type: http +{% else %} - port: {{ SYNAPSE_HTTP_PORT or 8008 }} tls: false bind_addresses: ['::'] type: http x_forwarded: false - +{% endif %} resources: - names: [client] compress: true @@ -57,8 +62,11 @@ database: user: "{{ POSTGRES_USER or "synapse" }}" password: "{{ POSTGRES_PASSWORD }}" database: "{{ POSTGRES_DB or "synapse" }}" +{% if not SYNAPSE_USE_UNIX_SOCKET %} +{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #} host: "{{ POSTGRES_HOST or "db" }}" port: "{{ POSTGRES_PORT or "5432" }}" +{% endif %} cp_min: 5 cp_max: 10 {% else %} diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 62fb88daabb5..dc824038b553 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -74,6 +74,9 @@ MAIN_PROCESS_INSTANCE_NAME = "main" MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1" MAIN_PROCESS_REPLICATION_PORT = 9093 +# Obviously, these would only be used with the UNIX socket option +MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock" +MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced # during processing with the name of the worker. @@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config( ) # Map of stream writer instance names to host/ports combos - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } - + if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } # Update the list of stream writers. It's convenient that the name of the worker # type is the same as the stream to write. Iterate over the whole list in case there # is more than one. @@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config( # Map of stream writer instance names to host/ports combos # For now, all stream writers need http replication ports - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } def merge_worker_template_configs( @@ -718,17 +730,29 @@ def generate_worker_files( # Note that yaml cares about indentation, so care should be taken to insert lines # into files at the correct indentation below. + # Convenience helper for if using unix sockets instead of host:port + using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False) # First read the original config file and extract the listeners block. Then we'll # add another listener for replication. Later we'll write out the result to the # shared config file. - listeners = [ - { - "port": MAIN_PROCESS_REPLICATION_PORT, - "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, - "type": "http", - "resources": [{"names": ["replication"]}], - } - ] + listeners: List[Any] + if using_unix_sockets: + listeners = [ + { + "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH, + "type": "http", + "resources": [{"names": ["replication"]}], + } + ] + else: + listeners = [ + { + "port": MAIN_PROCESS_REPLICATION_PORT, + "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, + "type": "http", + "resources": [{"names": ["replication"]}], + } + ] with open(config_path) as file_stream: original_config = yaml.safe_load(file_stream) original_listeners = original_config.get("listeners") @@ -769,7 +793,17 @@ def generate_worker_files( # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. - healthcheck_urls = ["http://localhost:8080/health"] + # This list ends up being part of the command line to curl, (curl added support for + # Unix sockets in version 7.40). + if using_unix_sockets: + healthcheck_urls = [ + f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} " + # The scheme and hostname from the following URL are ignored. + # The only thing that matters is the path `/health` + "http://localhost/health" + ] + else: + healthcheck_urls = ["http://localhost:8080/health"] # Get the set of all worker types that we have configured all_worker_types_in_use = set(chain(*requested_worker_types.values())) @@ -806,8 +840,12 @@ def generate_worker_files( # given worker_type needs to stay assigned and not be replaced. worker_config["shared_extra_conf"].update(shared_config) shared_config = worker_config["shared_extra_conf"] - - healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) + if using_unix_sockets: + healthcheck_urls.append( + f"--unix-socket /run/worker.{worker_port} http://localhost/health" + ) + else: + healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( @@ -826,6 +864,7 @@ def generate_worker_files( "/conf/workers/{name}.yaml".format(name=worker_name), **worker_config, worker_log_config_filepath=log_config_filepath, + using_unix_sockets=using_unix_sockets, ) # Save this worker's port number to the correct nginx upstreams @@ -846,8 +885,13 @@ def generate_worker_files( nginx_upstream_config = "" for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items(): body = "" - for port in upstream_worker_ports: - body += f" server localhost:{port};\n" + if using_unix_sockets: + for port in upstream_worker_ports: + body += f" server unix:/run/worker.{port};\n" + + else: + for port in upstream_worker_ports: + body += f" server localhost:{port};\n" # Add to the list of configured upstreams nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format( @@ -877,10 +921,15 @@ def generate_worker_files( # If there are workers, add the main process to the instance_map too. if workers_in_use: instance_map = shared_config.setdefault("instance_map", {}) - instance_map[MAIN_PROCESS_INSTANCE_NAME] = { - "host": MAIN_PROCESS_LOCALHOST_ADDRESS, - "port": MAIN_PROCESS_REPLICATION_PORT, - } + if using_unix_sockets: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH, + } + else: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "host": MAIN_PROCESS_LOCALHOST_ADDRESS, + "port": MAIN_PROCESS_REPLICATION_PORT, + } # Shared homeserver config convert( @@ -890,6 +939,7 @@ def generate_worker_files( appservice_registrations=appservice_registrations, enable_redis=workers_in_use, workers_in_use=workers_in_use, + using_unix_sockets=using_unix_sockets, ) # Nginx config @@ -900,6 +950,7 @@ def generate_worker_files( upstream_directives=nginx_upstream_config, tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"), tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"), + using_unix_sockets=using_unix_sockets, ) # Supervisord config @@ -909,6 +960,7 @@ def generate_worker_files( "/etc/supervisor/supervisord.conf", main_config_path=config_path, enable_redis=workers_in_use, + using_unix_sockets=using_unix_sockets, ) convert( diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md index e9210b177695..698687b91f2f 100644 --- a/docs/development/contributing_guide.md +++ b/docs/development/contributing_guide.md @@ -370,6 +370,7 @@ The above will run a monolithic (single-process) Synapse with SQLite as the data See the [worker documentation](../workers.md) for additional information on workers. - Passing `ASYNCIO_REACTOR=1` as an environment variable to use the Twisted asyncio reactor instead of the default one. - Passing `PODMAN=1` will use the [podman](https://podman.io/) container runtime, instead of docker. +- Passing `UNIX_SOCKETS=1` will utilise Unix socket functionality for Synapse, Redis, and Postgres(when applicable). To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`, e.g: ```sh diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index ff59cbccc19a..d9286e83bce9 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -462,6 +462,20 @@ See the docs [request log format](../administration/request_log.md). * `additional_resources`: Only valid for an 'http' listener. A map of additional endpoints which should be loaded via dynamic modules. +Unix socket support (_Added in Synapse 1.88.0_): +* `path`: A path and filename for a Unix socket. Make sure it is located in a + directory with read and write permissions, and that it already exists (the directory + will not be created). Defaults to `None`. + * **Note**: The use of both `path` and `port` options for the same `listener` is not + compatible. + * The `x_forwarded` option defaults to true when using Unix sockets and can be omitted. + * Other options that would not make sense to use with a UNIX socket, such as + `bind_addresses` and `tls` will be ignored and can be removed. +* `mode`: The file permissions to set on the UNIX socket. Defaults to `666` +* **Note:** Must be set as `type: http` (does not support `metrics` and `manhole`). + Also make sure that `metrics` is not included in `resources` -> `names` + + Valid resource names are: * `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`. @@ -474,7 +488,7 @@ Valid resource names are: * `media`: the media API (/_matrix/media). -* `metrics`: the metrics interface. See [here](../../metrics-howto.md). +* `metrics`: the metrics interface. See [here](../../metrics-howto.md). (Not compatible with Unix sockets) * `openid`: OpenID authentication. See [here](../../openid.md). @@ -533,6 +547,22 @@ listeners: bind_addresses: ['::1', '127.0.0.1'] type: manhole ``` +Example configuration #3: +```yaml +listeners: + # Unix socket listener: Ideal for Synapse deployments behind a reverse proxy, offering + # lightweight interprocess communication without TCP/IP overhead, avoid port + # conflicts, and providing enhanced security through system file permissions. + # + # Note that x_forwarded will default to true, when using a UNIX socket. Please see + # https://matrix-org.github.io/synapse/latest/reverse_proxy.html. + # + - path: /var/run/synapse/main_public.sock + type: http + resources: + - names: [client, federation] +``` + --- ### `manhole_settings` @@ -3949,6 +3979,14 @@ instance_map: host: localhost port: 8034 ``` +Example configuration(#2, for UNIX sockets): +```yaml +instance_map: + main: + path: /var/run/synapse/main_replication.sock + worker1: + path: /var/run/synapse/worker1_replication.sock +``` --- ### `stream_writers` @@ -4108,6 +4146,18 @@ worker_listeners: resources: - names: [client, federation] ``` +Example configuration(#2, using UNIX sockets with a `replication` listener): +```yaml +worker_listeners: + - type: http + path: /var/run/synapse/worker_public.sock + resources: + - names: [client, federation] + - type: http + path: /var/run/synapse/worker_replication.sock + resources: + - names: [replication] +``` --- ### `worker_manhole` diff --git a/docs/workers.md b/docs/workers.md index 828f082e7551..735cd3f18d2d 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -95,9 +95,12 @@ for the main process * Secondly, you need to enable [redis-based replication](usage/configuration/config_documentation.md#redis) * You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map) -with the `main` process defined, as well as the relevant connection information from -it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined -is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to. +with the `main` process defined, as well as the relevant connection information from +it's HTTP `replication` listener (defined in step 1 above). + * Note that the `host` defined is the address the worker needs to look for the `main` + process at, not necessarily the same address that is bound to. + * If you are using Unix sockets for the `replication` resource, make sure to + use a `path` to the socket file instead of a `port`. * Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) can be used to authenticate HTTP traffic between workers. For example: diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 24b83cfeb605..fea76cb5afb6 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -253,6 +253,10 @@ if [[ -n "$ASYNCIO_REACTOR" ]]; then export PASS_SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=true fi +if [[ -n "$UNIX_SOCKETS" ]]; then + # Enable full on Unix socket mode for Synapse, Redis and Postgresql + export PASS_SYNAPSE_USE_UNIX_SOCKET=1 +fi if [[ -n "$SYNAPSE_TEST_LOG_LEVEL" ]]; then # Set the log level to what is desired diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ccfe75eaf3a6..e55ca12a3627 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -94,7 +94,7 @@ class Config: allow_mutation = False -class InstanceLocationConfig(ConfigModel): +class InstanceTcpLocationConfig(ConfigModel): """The host and port to talk to an instance via HTTP replication.""" host: StrictStr @@ -110,6 +110,23 @@ def netloc(self) -> str: return f"{self.host}:{self.port}" +class InstanceUnixLocationConfig(ConfigModel): + """The socket file to talk to an instance via HTTP replication.""" + + path: StrictStr + + def scheme(self) -> str: + """Hardcode a retrievable scheme""" + return "unix" + + def netloc(self) -> str: + """Nicely format the address location data""" + return f"{self.path}" + + +InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig] + + @attr.s class WriterLocations: """Specifies the instances that write various streams. @@ -270,9 +287,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: % MAIN_PROCESS_INSTANCE_MAP_NAME ) + # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) + ] = parse_and_validate_mapping( + instance_map, InstanceLocationConfig # type: ignore[arg-type] + ) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py index d6ba6f0e577d..3ba2f22dfdc1 100644 --- a/synapse/http/replicationagent.py +++ b/synapse/http/replicationagent.py @@ -18,7 +18,11 @@ from zope.interface import implementer from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.internet.endpoints import ( + HostnameEndpoint, + UNIXClientEndpoint, + wrapClientTLS, +) from twisted.internet.interfaces import IStreamClientEndpoint from twisted.python.failure import Failure from twisted.web.client import URI, HTTPConnectionPool, _AgentBase @@ -32,7 +36,11 @@ IResponse, ) -from synapse.config.workers import InstanceLocationConfig +from synapse.config.workers import ( + InstanceLocationConfig, + InstanceTcpLocationConfig, + InstanceUnixLocationConfig, +) from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -40,7 +48,7 @@ @implementer(IAgentEndpointFactory) class ReplicationEndpointFactory: - """Connect to a given TCP socket""" + """Connect to a given TCP or UNIX socket""" def __init__( self, @@ -64,24 +72,27 @@ def endpointForURI(self, uri: URI) -> IStreamClientEndpoint: # The given URI has a special scheme and includes the worker name. The # actual connection details are pulled from the instance map. worker_name = uri.netloc.decode("utf-8") - scheme = self.instance_map[worker_name].scheme() + location_config = self.instance_map[worker_name] + scheme = location_config.scheme() - if scheme in ("http", "https"): + if isinstance(location_config, InstanceTcpLocationConfig): endpoint = HostnameEndpoint( self.reactor, - self.instance_map[worker_name].host, - self.instance_map[worker_name].port, + location_config.host, + location_config.port, ) if scheme == "https": endpoint = wrapClientTLS( # The 'port' argument below isn't actually used by the function self.context_factory.creatorForNetloc( - self.instance_map[worker_name].host.encode("utf-8"), - self.instance_map[worker_name].port, + location_config.host.encode("utf-8"), + location_config.port, ), endpoint, ) return endpoint + elif isinstance(location_config, InstanceUnixLocationConfig): + return UNIXClientEndpoint(self.reactor, location_config.path) else: raise SchemeNotSupported(f"Unsupported scheme: {scheme}") @@ -138,13 +149,16 @@ def request( An existing connection from the connection pool may be used or a new one may be created. - Currently, HTTP and HTTPS schemes are supported in uri. + Currently, HTTP, HTTPS and UNIX schemes are supported in uri. This is copied from twisted.web.client.Agent, except: - * It uses a different pool key (combining the host & port). - * It does not call _ensureValidURI(...) since it breaks on some - UNIX paths. + * It uses a different pool key (combining the scheme with either host & port or + socket path). + * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not + required when using a worker's name as a 'hostname' for Synapse HTTP + Replication machinery. Specifically, this allows a range of ascii characters + such as '+' and '_' in hostnames/worker's names. See: twisted.web.iweb.IAgent.request """ @@ -154,9 +168,12 @@ def request( except SchemeNotSupported: return defer.fail(Failure()) + worker_name = parsedURI.netloc.decode("utf-8") + key_scheme = self._endpointFactory.instance_map[worker_name].scheme() + key_netloc = self._endpointFactory.instance_map[worker_name].netloc() # This sets the Pool key to be: - # (http(s), ) - key = (parsedURI.scheme, parsedURI.netloc) + # (http(s), ) or (unix, ) + key = (key_scheme, key_netloc) # _requestWithEndpoint comes from _AgentBase class return self._requestWithEndpoint( diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 75217e3f45bb..be910128aa4e 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -1070,7 +1070,7 @@ def trace_servlet( tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.HTTP_METHOD: request.get_method(), tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, + tags.PEER_HOST_IPV6: request.get_client_ip_if_available(), } request_name = request.request_metrics.name @@ -1091,9 +1091,11 @@ def trace_servlet( # with JsonResource). scope.span.set_operation_name(request.request_metrics.name) + # Mypy seems to think that start_context.tag below can be Optional[str], but + # that doesn't appear to be correct and works in practice. request_tags[ SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag + ] = request.request_metrics.start_context.tag # type: ignore[assignment] # set the tags *after* the servlet completes, in case it decided to # prioritise the span (tags will get dropped on unprioritised spans) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index eb9b1f1cd9be..39aadb9ed5c3 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -22,6 +22,7 @@ from twisted.web.resource import Resource from synapse.app.generic_worker import GenericWorkerServer +from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocationConfig from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource from synapse.replication.tcp.client import ReplicationDataHandler @@ -339,7 +340,7 @@ def make_worker_hs( # `_handle_http_replication_attempt` like we do with the master HS. instance_name = worker_hs.get_instance_name() instance_loc = worker_hs.config.worker.instance_map.get(instance_name) - if instance_loc: + if instance_loc and isinstance(instance_loc, InstanceTcpLocationConfig): # Ensure the host is one that has a fake DNS entry. if instance_loc.host not in self.reactor.lookups: raise Exception( @@ -360,6 +361,10 @@ def make_worker_hs( instance_loc.port, lambda: self._handle_http_replication_attempt(worker_hs, port), ) + elif instance_loc and isinstance(instance_loc, InstanceUnixLocationConfig): + raise Exception( + "Unix sockets are not supported for unit tests at this time." + ) store = worker_hs.get_datastores().main store.db_pool._db_pool = self.database_pool._db_pool diff --git a/tests/server.py b/tests/server.py index a12c3e3b9a09..c84a524e8ce7 100644 --- a/tests/server.py +++ b/tests/server.py @@ -53,6 +53,7 @@ IConnector, IConsumer, IHostnameResolver, + IListeningPort, IProducer, IProtocol, IPullProducer, @@ -62,7 +63,7 @@ IResolverSimple, ITransport, ) -from twisted.internet.protocol import ClientFactory, DatagramProtocol +from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory from twisted.python import threadpool from twisted.python.failure import Failure from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock @@ -523,6 +524,35 @@ def add_tcp_client_callback( """ self._tcp_callbacks[(host, port)] = callback + def connectUNIX( + self, + address: str, + factory: ClientFactory, + timeout: float = 30, + checkPID: int = 0, + ) -> IConnector: + """ + Unix sockets aren't supported for unit tests yet. Make it obvious to any + developer trying it out that they will need to do some work before being able + to use it in tests. + """ + raise Exception("Unix sockets are not implemented for tests yet, sorry.") + + def listenUNIX( + self, + address: str, + factory: Factory, + backlog: int = 50, + mode: int = 0o666, + wantPID: int = 0, + ) -> IListeningPort: + """ + Unix sockets aren't supported for unit tests yet. Make it obvious to any + developer trying it out that they will need to do some work before being able + to use it in tests. + """ + raise Exception("Unix sockets are not implemented for tests, sorry") + def connectTCP( self, host: str,