This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Route key requests to federation senders #15121

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions changelog.d/15121.misc
@@ -0,0 +1 @@
+Route remote key requests via federation senders.
Member

(As I've left this comment on #15133 and #15134 I should just copy it here...)

I think we also need to update:

Contributor Author

The upgrade notes will need amending too; I think federation senders will need to be declared in the `instance_map`.

8 changes: 7 additions & 1 deletion docker/configure_workers_and_start.py
@@ -100,7 +100,7 @@
     },
     "federation_sender": {
         "app": "synapse.app.generic_worker",
-        "listener_resources": [],
+        "listener_resources": ["replication"],
         "endpoint_patterns": [],
         "shared_extra_conf": {},
         "worker_extra_conf": "",
@@ -345,7 +345,13 @@ def add_worker_roles_to_shared_config(
         shared_config.setdefault("pusher_instances", []).append(worker_name)

     elif worker_type == "federation_sender":
+        # Some outbound federation requests can be routed via federation senders,
+        # so federation senders need to be accessible by other workers.
         shared_config.setdefault("federation_sender_instances", []).append(worker_name)
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }

     elif worker_type == "event_persister":
         # Event persisters write to the events stream, so we need to update
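The effect of this hunk on the generated shared config can be sketched in isolation. This is a minimal illustration rather than the script itself: `add_federation_sender` is a hypothetical helper, the dict-based `shared_config` is a simplification, and the port numbers are arbitrary.

```python
from typing import Any, Dict


def add_federation_sender(
    shared_config: Dict[str, Any], worker_name: str, worker_port: int
) -> None:
    """Hypothetical stand-in for the federation_sender branch in the hunk above."""
    # List the worker as a federation sender.
    shared_config.setdefault("federation_sender_instances", []).append(worker_name)
    # Make the worker reachable by other workers: the instance map is keyed
    # by worker name and stores the host and port of its replication listener.
    shared_config.setdefault("instance_map", {})[worker_name] = {
        "host": "localhost",
        "port": worker_port,
    }


shared: Dict[str, Any] = {}
add_federation_sender(shared, "federation_sender1", 18009)  # arbitrary port
add_federation_sender(shared, "federation_sender2", 18010)  # arbitrary port
print(shared)
```

Each federation sender ends up both in `federation_sender_instances` and in `instance_map`, keyed by its worker name.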
23 changes: 23 additions & 0 deletions docs/upgrade.md
@@ -87,6 +87,29 @@ process, for example:
 wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 ```
+# Upgrading to v1.79.0
+
+## Changes to federation sender config
+
+_This notice only applies to deployments using multiple workers. Deployments
+not using workers are not affected._
+
+From Synapse 1.79, only [federation senders](
+https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#federation_sender_instances
+) will make outgoing key requests to homeservers and [trusted key servers](
+https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#trusted_key_servers
+). This will make it easier for server operators to reason about how Synapse
+communicates with the wider federation. As a consequence, all other workers now
+ask federation senders to fetch keys on their behalf.
+
+To facilitate this,
+
+- federation senders must now be present in the [instance map](
+https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#instance_map
+), and
+- federation senders must now run an [http listener](
+https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#listeners
+) which includes the `replication` resource.

 # Upgrading to v1.78.0

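An operator could check the first requirement in the upgrade note by comparing the two config sections. The sketch below assumes the shared config has already been parsed into a plain dict; `missing_from_instance_map` is a hypothetical helper and not part of Synapse, and the example values are illustrative.

```python
from typing import Any, Dict, List


def missing_from_instance_map(config: Dict[str, Any]) -> List[str]:
    """Return federation senders that have no entry in the instance map."""
    senders = config.get("federation_sender_instances") or []
    instance_map = config.get("instance_map") or {}
    return [name for name in senders if name not in instance_map]


# Illustrative config: the second sender was not added to the instance map.
config = {
    "federation_sender_instances": ["federation_sender1", "federation_sender2"],
    "instance_map": {
        "federation_sender1": {"host": "localhost", "port": 1001},
    },
}
print(missing_from_instance_map(config))
```

An empty result means every listed federation sender can be addressed by other workers; the second requirement (a listener with the `replication` resource) lives in each worker's own config and is not covered by this check.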
10 changes: 9 additions & 1 deletion docs/usage/configuration/config_documentation.md
@@ -3811,7 +3811,7 @@ send_federation: false
 ### `federation_sender_instances`

 It is possible to scale the processes that handle sending outbound federation requests
-by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
+by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to
 a `federation_sender_instances` map. Doing so will remove handling of this function from
 the main process. Multiple workers can be added to this map, in which case the work is
 balanced across them.
@@ -3821,6 +3821,10 @@ sending, and if changed all federation sender workers must be stopped at the sam
 and then started, to ensure that all instances are running with the same config (otherwise
 events may be dropped).

+Federation senders should have a replication [`http` listener](#listeners)
+configured, and should be present in the [`instance_map`](#instance_map)
+so that other workers can make internal http requests to the federation senders.
+
 Example configuration for a single worker:
 ```yaml
 federation_sender_instances:
@@ -3832,6 +3836,10 @@ federation_sender_instances:
 - federation_sender1
 - federation_sender2
 ```
+
+_Changed in Synapse 1.79: Federation senders should now have an http listener
+listening for `replication`, and should be present in the `instance_map`._
+
 ---
 ### `instance_map`

20 changes: 19 additions & 1 deletion docs/workers.md
@@ -590,10 +590,18 @@ It is likely this option will be deprecated in the future and not recommended fo
 new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances).

 Handles sending federation traffic to other servers. Doesn't handle any
-REST endpoints itself, but you should set
+client-facing REST endpoints itself, but you should set
 [`send_federation: false`](usage/configuration/config_documentation.md#send_federation)
 in the shared configuration file to stop the main synapse sending this traffic.

+Federation senders should have a replication [`http` listener](
+usage/configuration/config_documentation.md#listeners
+) configured, and
+should be present in the [`instance_map`](
+usage/configuration/config_documentation.md#instance_map
+) so that other workers can make internal
+http requests to the federation senders.
+
 If running multiple federation senders then you must list each
 instance in the
 [`federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances)
@@ -607,6 +615,13 @@ send_federation: false
 federation_sender_instances:
 - federation_sender1
 - federation_sender2
+instance_map:
+- federation_sender1:
+- host: localhost
+- port: 1001
+- federation_sender2:
+- host: localhost
+- port: 1002
Comment on lines +618 to +624
Contributor

Suggested change
-instance_map:
-- federation_sender1:
-- host: localhost
-- port: 1001
-- federation_sender2:
-- host: localhost
-- port: 1002
+instance_map:
+- federation_sender1:
+- host: localhost
+- port: 1001
+- federation_sender2:
+- host: localhost
+- port: 1002

 ```

 An example for a federation sender instance:
@@ -615,6 +630,9 @@ An example for a federation sender instance:
 {{#include systemd-with-workers/workers/federation_sender.yaml}}
 ```

+_Changed in Synapse 1.79: Federation senders should now have an http listener
+listening for `replication`, and should be present in the `instance_map`._
+
 ### `synapse.app.media_repository`

 Handles the media repository. It can handle all endpoints starting with:
5 changes: 4 additions & 1 deletion synapse/config/workers.py
@@ -174,7 +174,10 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
             "synapse.app.federation_sender",
             "federation_sender_instances",
         )
-        self.send_federation = self.instance_name in federation_sender_instances
+        self.send_federation = (self.instance_name in federation_sender_instances) or (
+            not federation_sender_instances and self.instance_name == "master"
+        )
+
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
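The new condition can be read as a small truth table. The standalone sketch below copies the boolean expression from the hunk above into a hypothetical function, `should_send_federation`; the worker names in the calls are illustrative.

```python
from typing import List


def should_send_federation(
    instance_name: str, federation_sender_instances: List[str]
) -> bool:
    """Mirror of the expression assigned to `self.send_federation` in the hunk."""
    return (instance_name in federation_sender_instances) or (
        # With no federation senders listed, the main process takes the role.
        not federation_sender_instances and instance_name == "master"
    )


# A listed federation sender sends federation traffic.
print(should_send_federation("federation_sender1", ["federation_sender1"]))
# The main process does not, once a sender is listed.
print(should_send_federation("master", ["federation_sender1"]))
# The main process does, if the list is empty.
print(should_send_federation("master", []))
# Any other worker never does.
print(should_send_federation("client_reader", []))
```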
114 changes: 81 additions & 33 deletions synapse/crypto/keyring.py
@@ -14,6 +14,7 @@

 import abc
 import logging
+import random
 from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple

 import attr
@@ -40,10 +41,16 @@
     RequestSendFailed,
     SynapseError,
 )
+from synapse.config import ConfigError
 from synapse.config.key import TrustedKeyServer
+from synapse.crypto.types import _FetchKeyRequest
 from synapse.events import EventBase
 from synapse.events.utils import prune_event_dict
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.replication.http.keys import (
+    ReplicationFetchKeysEndpoint,
+    deserialise_fetch_key_result,
+)
 from synapse.storage.keys import FetchKeyResult
 from synapse.types import JsonDict
 from synapse.util import unwrapFirstError
@@ -123,25 +130,6 @@ class KeyLookupError(ValueError):
     pass


-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _FetchKeyRequest:
-    """A request for keys for a given server.
-
-    We will continue to try and fetch until we have all the keys listed under
-    `key_ids` (with an appropriate `valid_until_ts` property) or we run out of
-    places to fetch keys from.
-
-    Attributes:
-        server_name: The name of the server that owns the keys.
-        minimum_valid_until_ts: The timestamp which the keys must be valid until.
-        key_ids: The IDs of the keys to attempt to fetch
-    """
-
-    server_name: str
-    minimum_valid_until_ts: int
-    key_ids: List[str]
-
-
 class Keyring:
     """Handles verifying signed JSON objects and fetching the keys needed to do
     so.
@@ -153,14 +141,22 @@ def __init__(
         self.clock = hs.get_clock()

         if key_fetchers is None:
-            key_fetchers = (
-                # Fetch keys from the database.
-                StoreKeyFetcher(hs),
-                # Fetch keys from a configured Perspectives server.
-                PerspectivesKeyFetcher(hs),
-                # Fetch keys from the origin server directly.
-                ServerKeyFetcher(hs),
-            )
+            if hs.config.worker.send_federation:
+                key_fetchers = (
+                    # Fetch keys from the database.
+                    StoreKeyFetcher(hs),
+                    # Fetch keys from a configured Perspectives server.
+                    PerspectivesKeyFetcher(hs),
+                    # Fetch keys from the origin server directly.
+                    ServerKeyFetcher(hs),
+                )
+            else:
+                key_fetchers = (
+                    # Fetch keys from the database.
+                    StoreKeyFetcher(hs),
+                    # Ask a federation sender to fetch the keys for us.
+                    InternalWorkerRequestKeyFetcher(hs),
+                )
         self._key_fetchers = key_fetchers

         self._fetch_keys_queue: BatchingQueue[
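The order of the tuple matters: per the `_FetchKeyRequest` docstring, fetching continues until all wanted keys are found or there are no more places to ask. A minimal sketch of that fall-through follows, with plain functions standing in for the fetcher classes. All names here are hypothetical, and validity timestamps are deliberately ignored.

```python
from typing import Callable, Dict, List, Sequence

# A fetcher takes the key IDs still missing and returns whichever it can find.
Fetcher = Callable[[List[str]], Dict[str, str]]


def fetch_with_fallback(
    fetchers: Sequence[Fetcher], key_ids: List[str]
) -> Dict[str, str]:
    """Try each fetcher in order, asking only for keys not yet found."""
    found: Dict[str, str] = {}
    for fetcher in fetchers:
        missing = [key_id for key_id in key_ids if key_id not in found]
        if not missing:
            break  # Everything was found; later fetchers are never called.
        found.update(fetcher(missing))
    return found


def store_fetcher(key_ids: List[str]) -> Dict[str, str]:
    """Stands in for StoreKeyFetcher: only knows previously stored keys."""
    cache = {"ed25519:a": "cached-key"}
    return {key_id: cache[key_id] for key_id in key_ids if key_id in cache}


def worker_request_fetcher(key_ids: List[str]) -> Dict[str, str]:
    """Stands in for InternalWorkerRequestKeyFetcher on a non-sender worker."""
    return {key_id: "key-from-federation-sender" for key_id in key_ids}


result = fetch_with_fallback(
    [store_fetcher, worker_request_fetcher], ["ed25519:a", "ed25519:b"]
)
print(result)
```

In this sketch the cached key is served locally and only the missing one is requested from the second fetcher, which is the behaviour the non-sender branch of the hunk relies on.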
@@ -291,9 +287,7 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None:
             minimum_valid_until_ts=verify_request.minimum_valid_until_ts,
             key_ids=list(key_ids_to_find),
         )
-        found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
-            key_request, key=verify_request.server_name
-        )
+        found_keys_by_server = await self.fetch_keys(key_request)

         # Since we batch up requests the returned set of keys may contain keys
         # from other servers, so we pull out only the ones we care about.
@@ -320,6 +314,15 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None:
                 Codes.UNAUTHORIZED,
             )

+    async def fetch_keys(
+        self, key_request: _FetchKeyRequest
+    ) -> Dict[str, Dict[str, FetchKeyResult]]:
+        """Returns: {server name: {key id: fetch key result}}"""
+        found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
+            key_request, key=key_request.server_name
+        )
+        return found_keys_by_server
+
     async def _process_json(
         self, verify_key: VerifyKey, verify_request: VerifyJsonRequest
     ) -> None:
@@ -469,6 +472,8 @@ async def _inner_fetch_key_request(


 class KeyFetcher(metaclass=abc.ABCMeta):
+    """Abstract gadget for fetching keys to validate other homeservers' signatures."""
+
     def __init__(self, hs: "HomeServer"):
         self._queue = BatchingQueue(
             self.__class__.__name__, hs.get_clock(), self._fetch_keys
@@ -490,11 +495,15 @@ async def get_keys(
     async def _fetch_keys(
         self, keys_to_fetch: List[_FetchKeyRequest]
     ) -> Dict[str, Dict[str, FetchKeyResult]]:
+        """
+        Returns:
+            Map from server_name -> key_id -> FetchKeyResult
+        """
         pass


 class StoreKeyFetcher(KeyFetcher):
-    """KeyFetcher impl which fetches keys from our data store"""
+    """Try to retrieve a previously-fetched key from the DB."""

     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
@@ -518,6 +527,8 @@ async def _fetch_keys(


 class BaseV2KeyFetcher(KeyFetcher):
+    """Abstract helper. Fetch keys by requesting it from some server."""
+
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)

@@ -620,7 +631,10 @@ async def process_v2_response(


 class PerspectivesKeyFetcher(BaseV2KeyFetcher):
-    """KeyFetcher impl which fetches keys from the "perspectives" servers"""
+    """Fetch keys for some homeserver X by requesting them from a trusted key server Y.
+
+    These trusted key servers were seemingly once known as "perspectives" servers.
+    """

     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
@@ -803,7 +817,7 @@ def _validate_perspectives_response(


 class ServerKeyFetcher(BaseV2KeyFetcher):
-    """KeyFetcher impl which fetches keys from the origin servers"""
+    """Fetch keys for some homeserver X by requesting them directly from X."""

     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
@@ -903,3 +917,37 @@ async def get_server_verify_keys_v2_direct(
             response_json=response,
             time_added_ms=time_now_ms,
         )
+
+
+class InternalWorkerRequestKeyFetcher(KeyFetcher):
+    """Ask a federation_sender worker to request keys for some homeserver X.
+
+    It may choose to do so via a notary or directly from X itself; we don't care.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+        self._federation_shard_config = hs.config.worker.federation_shard_config
+        if not self._federation_shard_config.instances:
+            raise ConfigError("No federation senders configured")
+        self._client = ReplicationFetchKeysEndpoint.make_client(hs)
+
+    async def _fetch_keys(
+        self, keys_to_fetch: List[_FetchKeyRequest]
+    ) -> Dict[str, Dict[str, FetchKeyResult]]:
+        # For simplicity's sake, pick a random federation sender
Member

I think we would want to choose the same federation sender that's talking to that instance? 🤷

Contributor Author

That's what I thought, but `federation_shard_config` is a `ShardedWorkerHandlingConfig` rather than a `RoutableShardedWorkerHandlingConfig`; only the latter defines `get_instance` publicly.

Personally I find this all incomprehensible.

Contributor

My reading of it is that if all workers have the same `federation_sender_instances` config, then `federation_shard_config` can be upgraded to a `RoutableShardedWorkerHandlingConfig`, since all workers will route requests the same way.

+        instance_name = random.choice(self._federation_shard_config.instances)
+        response = await self._client(
+            keys_to_fetch=keys_to_fetch,
+            instance_name=instance_name,
+        )
+
+        parsed_response: Dict[str, Dict[str, FetchKeyResult]] = {}
+        for server_name, keys in response["server_keys"].items():
+            deserialised_keys = {
+                key_id: deserialise_fetch_key_result(key_id, verify_key)
+                for key_id, verify_key in keys.items()
+            }
+            parsed_response.setdefault(server_name, {}).update(deserialised_keys)
+
+        return parsed_response
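The review thread above asks whether a request should always go to the same federation sender instead of a random one. One common way to get a stable choice is to hash a routing key and use the result to index into the instance list. The sketch below only illustrates that general idea; `pick_instance` is a hypothetical function, and it is not claimed to match how `get_instance` is implemented in Synapse.

```python
import hashlib
from typing import List


def pick_instance(instances: List[str], key: str) -> str:
    """Deterministically map a routing key (e.g. a server name) to an instance."""
    if not instances:
        raise ValueError("No federation senders configured")
    # Hash the key so that the choice is stable across processes and restarts.
    digest = hashlib.sha256(key.encode("utf8")).digest()
    index = int.from_bytes(digest, byteorder="little") % len(instances)
    return instances[index]


senders = ["federation_sender1", "federation_sender2"]
first = pick_instance(senders, "example.org")
second = pick_instance(senders, "example.org")
print(first == second)  # The same key always maps to the same sender.
```

As the last comment in the thread points out, this only routes consistently if every worker sees the same list of instances in the same order.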
35 changes: 35 additions & 0 deletions synapse/crypto/types.py
@@ -0,0 +1,35 @@
+# Copyright 2023- The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import List
+
+import attr
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _FetchKeyRequest:
+    """A request for keys for a given server.
+
+    We will continue to try and fetch until we have all the keys listed under
+    `key_ids` (with an appropriate `valid_until_ts` property) or we run out of
+    places to fetch keys from.
+
+    Attributes:
+        server_name: The name of the server that owns the keys.
+        minimum_valid_until_ts: The timestamp which the keys must be valid until.
+        key_ids: The IDs of the keys to attempt to fetch
+    """
+
+    server_name: str
+    minimum_valid_until_ts: int
+    key_ids: List[str]
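For readers unfamiliar with frozen attrs classes, the behaviour of `_FetchKeyRequest` can be approximated with the standard library. The sketch below swaps in a frozen `dataclass` for `attr.s(frozen=True)` so that it runs without third-party packages; `FetchKeyRequestSketch` is a hypothetical name and the field values are arbitrary examples.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import List


@dataclass(frozen=True)
class FetchKeyRequestSketch:
    """Stdlib stand-in with the same three fields as _FetchKeyRequest."""

    server_name: str
    minimum_valid_until_ts: int
    key_ids: List[str]


request = FetchKeyRequestSketch(
    server_name="example.org",
    minimum_valid_until_ts=1_700_000_000_000,  # arbitrary example timestamp
    key_ids=["ed25519:auto"],
)

# Frozen instances reject attribute assignment after construction.
try:
    request.server_name = "other.example"  # type: ignore[misc]
except FrozenInstanceError:
    frozen = True
else:
    frozen = False
print(frozen)
```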
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
@@ -19,6 +19,7 @@
     account_data,
     devices,
     federation,
+    keys,
     login,
     membership,
     presence,
@@ -52,6 +53,7 @@ def register_servlets(self, hs: "HomeServer") -> None:
         account_data.register_servlets(hs, self)
         push.register_servlets(hs, self)
         state.register_servlets(hs, self)
+        keys.register_servlets(hs, self)

         # The following can't currently be instantiated on workers.
         if hs.config.worker.worker_app is None: