From 9a7f925a848322a91b524cbe3d7e83a2a4303000 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 23 Feb 2023 23:55:53 +0000 Subject: [PATCH 01/12] Pull out _FetchKeyRequest This will help to break an import cycle --- synapse/crypto/keyring.py | 20 +------------------- synapse/crypto/types.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 19 deletions(-) create mode 100644 synapse/crypto/types.py diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 86cd4af9bd5a..8e8dbaaf55a1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -41,6 +41,7 @@ SynapseError, ) from synapse.config.key import TrustedKeyServer +from synapse.crypto.types import _FetchKeyRequest from synapse.events import EventBase from synapse.events.utils import prune_event_dict from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -123,25 +124,6 @@ class KeyLookupError(ValueError): pass -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _FetchKeyRequest: - """A request for keys for a given server. - - We will continue to try and fetch until we have all the keys listed under - `key_ids` (with an appropriate `valid_until_ts` property) or we run out of - places to fetch keys from. - - Attributes: - server_name: The name of the server that owns the keys. - minimum_valid_until_ts: The timestamp which the keys must be valid until. - key_ids: The IDs of the keys to attempt to fetch - """ - - server_name: str - minimum_valid_until_ts: int - key_ids: List[str] - - class Keyring: """Handles verifying signed JSON objects and fetching the keys needed to do so. diff --git a/synapse/crypto/types.py b/synapse/crypto/types.py new file mode 100644 index 000000000000..2342a8a125c8 --- /dev/null +++ b/synapse/crypto/types.py @@ -0,0 +1,35 @@ +# Copyright 2023- The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List + +import attr + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _FetchKeyRequest: + """A request for keys for a given server. + + We will continue to try and fetch until we have all the keys listed under + `key_ids` (with an appropriate `valid_until_ts` property) or we run out of + places to fetch keys from. + + Attributes: + server_name: The name of the server that owns the keys. + minimum_valid_until_ts: The timestamp which the keys must be valid until. + key_ids: The IDs of the keys to attempt to fetch + """ + + server_name: str + minimum_valid_until_ts: int + key_ids: List[str] From e0841c5d3f01e7796d833baf1df7a1ce0dd27a34 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 23 Feb 2023 23:57:24 +0000 Subject: [PATCH 02/12] Keyring: isolate the keyfetching mechanism So I can call it from federation senders --- synapse/crypto/keyring.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 8e8dbaaf55a1..b5b57378ef02 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -273,9 +273,7 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: minimum_valid_until_ts=verify_request.minimum_valid_until_ts, key_ids=list(key_ids_to_find), ) - found_keys_by_server = await self._fetch_keys_queue.add_to_queue( - key_request, key=verify_request.server_name - ) + found_keys_by_server = await self.fetch_keys(key_request) # Since we batch up requests the returned set of keys may contain keys # from other servers, so we pull out only the ones we care about. @@ -302,6 +300,15 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: Codes.UNAUTHORIZED, ) + async def fetch_keys( + self, key_request: _FetchKeyRequest + ) -> Dict[str, Dict[str, FetchKeyResult]]: + """Returns: {server name: {key id: fetch key result}}""" + found_keys_by_server = await self._fetch_keys_queue.add_to_queue( + key_request, key=key_request.server_name + ) + return found_keys_by_server + async def _process_json( self, verify_key: VerifyKey, verify_request: VerifyJsonRequest ) -> None: From 0919513c3ade8cc79b714733540079d8b85926bf Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 23 Feb 2023 23:58:20 +0000 Subject: [PATCH 03/12] Define a new replication endpoint --- synapse/replication/http/__init__.py | 2 + synapse/replication/http/keys.py | 123 +++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 synapse/replication/http/keys.py diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index ac9a92240af2..393e19b01bcb 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -19,6 +19,7 @@ account_data, devices, federation, + keys, login, membership, presence, @@ -52,6 +53,7 @@ def register_servlets(self, hs: "HomeServer") -> None: account_data.register_servlets(hs, self) push.register_servlets(hs, self) state.register_servlets(hs, self) + keys.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/keys.py b/synapse/replication/http/keys.py new file mode 100644 index 000000000000..3a053e5acff5 --- /dev/null +++ b/synapse/replication/http/keys.py @@ -0,0 +1,123 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import TYPE_CHECKING, Dict, List, Tuple + +import attr +from signedjson.key import decode_verify_key_bytes, encode_verify_key_base64 +from unpaddedbase64 import decode_base64 + +from twisted.web.server import Request + +from synapse.crypto.types import _FetchKeyRequest +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.storage.keys import FetchKeyResult +from synapse.types import JsonDict +from synapse.util.async_helpers import yieldable_gather_results + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__file__) + + +class ReplicationFetchKeysEndpoint(ReplicationEndpoint): + """Another worker is asking us to fetch keys for a homeserver X. + + The request looks like: + + POST /_synapse/replication/fetch_keys + { + keys_to_fetch: [ + { + "server_name": "example.com", + "minimum_valid_until_ts": 123456, + "key_ids": ["ABC", "DEF"] + } + ] + } + + We would normally return a group of FetchKeyResponse structs like the + normal code path does, but FetchKeyResponse holds a nacl.signing.VerifyKey + which is not JSON-serialisable. Instead, for each requested key we respond + with a boolean: `true` meaning we fetched this key, and `false` meaning we + didn't. + + The response takes the form: + + 200 OK + { + "fetched_count": 1 + } + """ + + NAME = "fetch_keys" + PATH_ARGS = () + METHOD = "POST" + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._keyring = hs.get_keyring() + + async def _handle_request( # type: ignore[override] + self, + request: Request, + content: JsonDict, + ) -> Tuple[int, JsonDict]: + parsed_requests = [ + _FetchKeyRequest(**entry) for entry in content["keys_to_fetch"] + ] + + results: List[ + Dict[str, Dict[str, FetchKeyResult]] + ] = await yieldable_gather_results( + self._keyring.fetch_keys, + parsed_requests, + ) + + union_of_keys: Dict[str, Dict[str, JsonDict]] = {} + for result in results: + for server_name, keys in result.items(): + serialised_keys = { + key_id: _serialise_fetch_key_result(verify_key) + for key_id, verify_key in keys.items() + } + union_of_keys.setdefault(server_name, {}).update(serialised_keys) + + return 200, {"server_keys": union_of_keys} + + @staticmethod + async def _serialize_payload(*, keys_to_fetch: List[_FetchKeyRequest]) -> JsonDict: # type: ignore[override] + return {"keys_to_fetch": [attr.asdict(key) for key in keys_to_fetch]} + + +def _serialise_fetch_key_result(result: FetchKeyResult) -> JsonDict: + return { + "verify_key": encode_verify_key_base64(result.verify_key), + "valid_until_ts": result.valid_until_ts, + } + + +def deserialise_fetch_key_result(key_id: str, data: JsonDict) -> FetchKeyResult: + return FetchKeyResult( + verify_key=decode_verify_key_bytes(key_id, decode_base64(data["verify_key"])), + valid_until_ts=data["valid_until_ts"], + ) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + if hs.config.worker.send_federation: + ReplicationFetchKeysEndpoint(hs).register(http_server) From cdec54468a4fd3c1abbbc32548e4d8ae465d316d Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 23 Feb 2023 23:59:12 +0000 Subject: [PATCH 04/12] Add new KeyFetcher impl --- synapse/crypto/keyring.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index b5b57378ef02..278afb591fb4 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -14,6 +14,7 @@ import abc import logging +import random from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple import attr @@ -45,6 +46,10 @@ from synapse.events import EventBase from synapse.events.utils import prune_event_dict from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.replication.http.keys import ( + ReplicationFetchKeysEndpoint, + deserialise_fetch_key_result, +) from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict from synapse.util import unwrapFirstError @@ -892,3 +897,35 @@ async def get_server_verify_keys_v2_direct( response_json=response, time_added_ms=time_now_ms, ) + + +class InternalWorkerRequestKeyFetcher(KeyFetcher): + """Ask a federation_sender worker to request keys for some homeserver X. + + It may choose to do so via a notary or directly from X itself; we don't care. + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self._federation_shard_config = hs.config.worker.federation_shard_config + self._client = ReplicationFetchKeysEndpoint.make_client(hs) + + async def _fetch_keys( + self, keys_to_fetch: List[_FetchKeyRequest] + ) -> Dict[str, Dict[str, FetchKeyResult]]: + # For simplicity's sake, pick a random federation sender + instance_name = random.choice(self._federation_shard_config.instances) + response = await self._client( + keys_to_fetch=keys_to_fetch, + instance_name=instance_name, + ) + + parsed_response: Dict[str, Dict[str, FetchKeyResult]] = {} + for server_name, keys in response["server_keys"].items(): + deserialised_keys = { + key_id: deserialise_fetch_key_result(key_id, verify_key) + for key_id, verify_key in keys.items() + } + parsed_response.setdefault(server_name, {}).update(deserialised_keys) + + return parsed_response From 8c5067609cb68f2f00a4b1d9011ea278608adcd1 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 00:00:11 +0000 Subject: [PATCH 05/12] Use new KeyFetcher --- synapse/crypto/keyring.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 278afb591fb4..de9a256bf993 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -140,14 +140,22 @@ def __init__( self.clock = hs.get_clock() if key_fetchers is None: - key_fetchers = ( - # Fetch keys from the database. - StoreKeyFetcher(hs), - # Fetch keys from a configured Perspectives server. - PerspectivesKeyFetcher(hs), - # Fetch keys from the origin server directly. - ServerKeyFetcher(hs), - ) + if hs.config.worker.send_federation: + key_fetchers = ( + # Fetch keys from the database. + StoreKeyFetcher(hs), + # Fetch keys from a configured Perspectives server. + PerspectivesKeyFetcher(hs), + # Fetch keys from the origin server directly. + ServerKeyFetcher(hs), + ) + else: + key_fetchers = ( + # Fetch keys from the database. + StoreKeyFetcher(hs), + # Ask a federation sender to fetch the keys for us. + InternalWorkerRequestKeyFetcher(hs), + ) self._key_fetchers = key_fetchers self._fetch_keys_queue: BatchingQueue[ From bab3b58f7a79c030970c84ac25a410a66c806152 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 00:02:45 +0000 Subject: [PATCH 06/12] Comments --- synapse/crypto/keyring.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index de9a256bf993..a5c9d720853a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -471,6 +471,8 @@ async def _inner_fetch_key_request( class KeyFetcher(metaclass=abc.ABCMeta): + """Abstract gadget for fetching keys to validate other homeservers' signatures.""" + def __init__(self, hs: "HomeServer"): self._queue = BatchingQueue( self.__class__.__name__, hs.get_clock(), self._fetch_keys @@ -492,11 +494,15 @@ async def get_keys( async def _fetch_keys( self, keys_to_fetch: List[_FetchKeyRequest] ) -> Dict[str, Dict[str, FetchKeyResult]]: + """ + Returns: + Map from server_name -> key_id -> FetchKeyResult + """ pass class StoreKeyFetcher(KeyFetcher): - """KeyFetcher impl which fetches keys from our data store""" + """Try to retrieve a previously-fetched key from the DB.""" def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -520,6 +526,8 @@ async def _fetch_keys( class BaseV2KeyFetcher(KeyFetcher): + """Abstract helper. Fetch keys by requesting it from some server.""" + def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -622,7 +630,10 @@ async def process_v2_response( class PerspectivesKeyFetcher(BaseV2KeyFetcher): - """KeyFetcher impl which fetches keys from the "perspectives" servers""" + """Fetch keys for some homeserver X by requesting them from a trusted key server Y. + + These trusted key servers were seemingly once known as "perspectives" servers. + """ def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -805,7 +816,7 @@ def _validate_perspectives_response( class ServerKeyFetcher(BaseV2KeyFetcher): - """KeyFetcher impl which fetches keys from the origin servers""" + """Fetch keys for some homeserver X by requesting them directly from X.""" def __init__(self, hs: "HomeServer"): super().__init__(hs) From 3a9aa533d15d97a23e159a869b29274fb9de889d Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 00:02:54 +0000 Subject: [PATCH 07/12] Test case --- tests/crypto/test_keyring.py | 84 +++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 1b9696748fdc..f414418adb1d 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -25,10 +25,12 @@ from twisted.internet import defer from twisted.internet.defer import Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import NoResource, Resource -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, SynapseError from synapse.crypto import keyring from synapse.crypto.keyring import ( + InternalWorkerRequestKeyFetcher, PerspectivesKeyFetcher, ServerKeyFetcher, StoreKeyFetcher, @@ -39,12 +41,15 @@ current_context, make_deferred_yieldable, ) +from synapse.rest.key.v2 import KeyResource from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict from synapse.util import Clock +from synapse.util.httpresourcetree import create_resource_tree from tests import unittest +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.test_utils import make_awaitable from tests.unittest import logcontext_clean, override_config @@ -757,6 +762,83 @@ def get_key_from_perspectives(response: JsonDict) -> Dict[str, FetchKeyResult]: self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig") +class InternalWorkerRequestKeyFetcherTestCase(BaseMultiWorkerStreamTestCase): + def create_test_resource(self) -> Resource: # type: ignore[override] + return create_resource_tree( + {"/_matrix/key/v2": KeyResource(self.hs)}, root_resource=NoResource() + ) + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + config.update( + federation_sender_instances=["federation_sender1"], + instance_map={ + "federation_sender1": {"host": "testserv", "port": 1001}, + }, + ) + return config + + def test_key_fetching_works_across_workers(self) -> None: + """Test that a non-fed-sender worker requests keys via a fed-sender.""" + mock_http_client = Mock() + + # 1. Mock out the response from the notary server. + async def mock_post_json(*args: Any, **kwargs: Any) -> JsonDict: + """Mock the request to the notary server.""" + if kwargs.get("path") != "/_matrix/key/v2/query": + raise HttpResponseException(500, "ruh", b"roh") + return {"server_keys": []} + + mock_http_client.post_json = mock_post_json + + # 2. Build a valid response to /_matrix/key/v2/server for the server being + # queried. + SERVER_NAME = "server2" + testkey = signedjson.key.generate_signing_key("ver1") + testverifykey = signedjson.key.get_verify_key(testkey) + testverifykey_id = "ed25519:ver1" + VALID_UNTIL_TS = 200 * 1000 + response = { + "server_name": SERVER_NAME, + "old_verify_keys": {}, + "valid_until_ts": VALID_UNTIL_TS, + "verify_keys": { + testverifykey_id: { + "key": signedjson.key.encode_verify_key_base64(testverifykey) + } + }, + } + signedjson.sign.sign_json(response, SERVER_NAME, testkey) + + async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict: + if kwargs.get("path") != "/_matrix/key/v2/server": + raise HttpResponseException(500, "ruh", b"roh") + return response + + mock_http_client.get_json = mock_get_json + + # 3. Make a federation homeserver to actually make the request. + self.make_worker_hs( + "synapse.app.generic_worker", + { + "worker_name": "federation_sender1", + "federation_sender_instances": ["federation_sender1"], + }, + federation_http_client=mock_http_client, + ) + + # 4. Use the via-fed-sender fetcher to get keys. + fetcher = InternalWorkerRequestKeyFetcher(self.hs) + keys = self.get_success( + fetcher.get_keys(SERVER_NAME, [testverifykey_id], 0), by=0.1 + ) + k = keys[testverifykey_id] + self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS) + self.assertEqual(k.verify_key, testverifykey) + self.assertEqual(k.verify_key.alg, "ed25519") + self.assertEqual(k.verify_key.version, "ver1") + + def get_key_id(key: SigningKey) -> str: """Get the matrix ID tag for a given SigningKey or VerifyKey""" return "%s:%s" % (key.alg, key.version) From 4e3d2e8b694bce89c4787b4e03c11a78e1dcce1a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 21 Feb 2023 11:47:38 +0000 Subject: [PATCH 08/12] Changelog --- changelog.d/15121.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15121.misc diff --git a/changelog.d/15121.misc b/changelog.d/15121.misc new file mode 100644 index 000000000000..b2cb638960b2 --- /dev/null +++ b/changelog.d/15121.misc @@ -0,0 +1 @@ +Route remote key requests via federation senders. From 15b357e6cda2f468ec306bfb985b21e8e7943895 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 11:39:06 +0000 Subject: [PATCH 09/12] Make new replication endpoint accessible in complement --- docker/configure_workers_and_start.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 58c62f2231f3..3525290ed555 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -100,7 +100,7 @@ }, "federation_sender": { "app": "synapse.app.generic_worker", - "listener_resources": [], + "listener_resources": ["replication"], "endpoint_patterns": [], "shared_extra_conf": {}, "worker_extra_conf": "", @@ -345,7 +345,13 @@ def add_worker_roles_to_shared_config( shared_config.setdefault("pusher_instances", []).append(worker_name) elif worker_type == "federation_sender": + # Some outbound federation requests can be routed via federation senders, + # so federation senders need to be accessible by other workers. shared_config.setdefault("federation_sender_instances", []).append(worker_name) + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } elif worker_type == "event_persister": # Event persisters write to the events stream, so we need to update From 5fc4155d2d60e2688a1b20936fe19ee8b073b453 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 17:00:17 +0000 Subject: [PATCH 10/12] Upgrade notes and docs --- docs/upgrade.md | 23 +++++++++++++++++++ .../configuration/config_documentation.md | 10 +++++++- docs/workers.md | 20 +++++++++++++++- 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/docs/upgrade.md b/docs/upgrade.md index 15167b8c5825..274a4082a7b1 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -87,6 +87,29 @@ process, for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.79.0 + +## Changes to federation sender config + +_This notice only applies to deployments using multiple workers. Deployments +not using workers are not affected._ + +From Synapse 1.79, only [federation senders]( +https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#federation_sender_instances +) will make outgoing key requests to homeservers and [trusted key servers]( +https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#trusted_key_servers +). This will make it easier for server operators to reason about how Synapse +communicates with the wider federation. As a consequence, all other workers now +ask federation senders to fetch keys on their behalf. + +To facilitate this, + +- federation senders must now be present in the [instance map]( + https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#instance_map + ), and +- federation senders must now run an [http listener]( + https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#listeners + ) which includes the `replication` resource. # Upgrading to v1.78.0 diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 58c695568972..ca52f4a58b5c 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3811,7 +3811,7 @@ send_federation: false ### `federation_sender_instances` It is possible to scale the processes that handle sending outbound federation requests -by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to +by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to a `federation_sender_instances` map. Doing so will remove handling of this function from the main process. Multiple workers can be added to this map, in which case the work is balanced across them. @@ -3821,6 +3821,10 @@ sending, and if changed all federation sender workers must be stopped at the sam and then started, to ensure that all instances are running with the same config (otherwise events may be dropped). +Federation senders should have a replication [`http` listener](#listeners) +configured, and should be present in the [`instance_map`](#instance_map) +so that other workers can make internal http requests to the federation senders. + Example configuration for a single worker: ```yaml federation_sender_instances: @@ -3832,6 +3836,10 @@ federation_sender_instances: - federation_sender1 - federation_sender2 ``` + +_Changed in Synapse 1.79: Federation senders should now have an http listener +listening for `replication`, and should be present in the `instance_map`._ + --- ### `instance_map` diff --git a/docs/workers.md b/docs/workers.md index 2eb970ffa6a0..5f8cdfb27e1e 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -590,10 +590,18 @@ It is likely this option will be deprecated in the future and not recommended fo new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances). Handles sending federation traffic to other servers. Doesn't handle any -REST endpoints itself, but you should set +client-facing REST endpoints itself, but you should set [`send_federation: false`](usage/configuration/config_documentation.md#send_federation) in the shared configuration file to stop the main synapse sending this traffic. +Federation senders should have a replication [`http` listener]( +usage/configuration/config_documentation.md#listeners +) configured, and +should be present in the [`instance_map`]( +usage/configuration/config_documentation.md#instance_map +) so that other workers can make internal +http requests to the federation senders. + If running multiple federation senders then you must list each instance in the [`federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances) @@ -607,6 +615,13 @@ send_federation: false federation_sender_instances: - federation_sender1 - federation_sender2 +instance_map: + - federation_sender1: + - host: localhost + - port: 1001 + - federation_sender2: + - host: localhost + - port: 1002 ``` An example for a federation sender instance: @@ -615,6 +630,9 @@ An example for a federation sender instance: {{#include systemd-with-workers/workers/federation_sender.yaml}} ``` +_Changed in Synapse 1.79: Federation senders should now have an http listener +listening for `replication`, and should be present in the `instance_map`._ + ### `synapse.app.media_repository` Handles the media repository. It can handle all endpoints starting with: From 3452c2a23d549251839d578e6d0eecb9d59fc4e8 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 17:12:02 +0000 Subject: [PATCH 11/12] Complain at startup if we can't find fed senders --- synapse/crypto/keyring.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index a5c9d720853a..7de3e8781fa6 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -41,6 +41,7 @@ RequestSendFailed, SynapseError, ) +from synapse.config import ConfigError from synapse.config.key import TrustedKeyServer from synapse.crypto.types import _FetchKeyRequest from synapse.events import EventBase @@ -927,6 +928,8 @@ class InternalWorkerRequestKeyFetcher(KeyFetcher): def __init__(self, hs: "HomeServer"): super().__init__(hs) self._federation_shard_config = hs.config.worker.federation_shard_config + if not self._federation_shard_config.instances: + raise ConfigError("No federation senders configured") self._client = ReplicationFetchKeysEndpoint.make_client(hs) async def _fetch_keys( From b4517d78f687aaf900c41862fb6b4c571aeb24dd Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 24 Feb 2023 17:44:42 +0000 Subject: [PATCH 12/12] Master is a fed sender if none are configured --- synapse/config/workers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 2580660b6c27..0b03ad223eed 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -174,7 +174,10 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "synapse.app.federation_sender", "federation_sender_instances", ) - self.send_federation = self.instance_name in federation_sender_instances + self.send_federation = (self.instance_name in federation_sender_instances) or ( + not federation_sender_instances and self.instance_name == "master" + ) + self.federation_shard_config = ShardedWorkerHandlingConfig( federation_sender_instances )