From c2c594299d4b6b44198a51dfd8ddfaaca79c4f6d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Nov 2023 14:02:02 -0500 Subject: [PATCH 1/5] Move logic to download media to TransportLayerClient. --- synapse/federation/transport/client.py | 25 +++++++++++++++++++++++++ synapse/media/media_repository.py | 17 ++++------------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index fab480071716..9cdfb3330f09 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -18,6 +18,7 @@ from typing import ( TYPE_CHECKING, Any, + BinaryIO, Callable, Collection, Dict, @@ -804,6 +805,30 @@ async def get_account_status( destination=destination, path=path, data={"user_ids": user_ids} ) + async def download_media_r0( + self, + destination: str, + media_id: str, + output_stream: BinaryIO, + max_size: int, + max_timeout_ms: int, + ) -> Tuple[int, Dict[bytes, List[bytes]]]: + path = f"/_matrix/media/r0/download/{destination}/{media_id}" + + return await self.client.get_file( + destination, + path, + output_stream=output_stream, + max_size=max_size, + args={ + # tell the remote server to 404 if it doesn't + # recognise the server_name, to make sure we don't + # end up with a routing loop. + "allow_remote": "false", + "timeout_ms": str(max_timeout_ms), + }, + ) + def _create_path(federation_prefix: str, path: str, *args: str) -> str: """ diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index bf976b9e7c2c..f2471a76eaeb 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -77,7 +77,7 @@ class MediaRepository: def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() - self.client = hs.get_federation_http_client() + self.client = hs.get_federation_transport_client() self.clock = hs.get_clock() self.server_name = hs.hostname self.store = hs.get_datastores().main @@ -644,22 +644,13 @@ async def _download_remote_file( file_info = FileInfo(server_name=server_name, file_id=file_id) with self.media_storage.store_into_file(file_info) as (f, fname, finish): - request_path = "/".join( - ("/_matrix/media/r0/download", server_name, media_id) - ) try: - length, headers = await self.client.get_file( + length, headers = await self.client.download_media_r0( server_name, - request_path, + media_id, output_stream=f, max_size=self.max_upload_size, - args={ - # tell the remote server to 404 if it doesn't - # recognise the server_name, to make sure we don't - # end up with a routing loop. - "allow_remote": "false", - "timeout_ms": str(max_timeout_ms), - }, + max_timeout_ms=max_timeout_ms, ) except RequestSendFailed as e: logger.warning( From fa4f637e7b2f73b3bf0765213135451e3c395c6f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Nov 2023 14:07:51 -0500 Subject: [PATCH 2/5] Attempt both the v3 and r0 endpoints. --- synapse/federation/federation_client.py | 36 +++++++++++++++ synapse/federation/transport/client.py | 24 ++++++++++ synapse/media/media_repository.py | 4 +- tests/media/test_media_storage.py | 58 +++++++++++++++++++++++-- 4 files changed, 117 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1a7fa175ec63..2c3aa5dd4800 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -21,6 +21,7 @@ TYPE_CHECKING, AbstractSet, Awaitable, + BinaryIO, Callable, Collection, Container, @@ -1862,6 +1863,41 @@ def filter_user_id(user_id: str) -> bool: return filtered_statuses, filtered_failures + async def download_media( + self, + destination: str, + media_id: str, + output_stream: BinaryIO, + max_size: int, + max_timeout_ms: int, + ) -> Tuple[int, Dict[bytes, List[bytes]]]: + try: + return await self.transport_layer.download_media_v3( + destination, + media_id, + output_stream=output_stream, + max_size=max_size, + max_timeout_ms=max_timeout_ms, + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the r0 endpoint. Otherwise, consider it a legitimate error + # and raise. + if not is_unknown_endpoint(e): + raise + + logger.debug( + "Couldn't download media with the v3 API, falling back to the r0 API" + ) + + return await self.transport_layer.download_media_r0( + destination, + media_id, + output_stream=output_stream, + max_size=max_size, + max_timeout_ms=max_timeout_ms, + ) + @attr.s(frozen=True, slots=True, auto_attribs=True) class TimestampToEventResponse: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 9cdfb3330f09..0bb49057ec0a 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -829,6 +829,30 @@ async def download_media_r0( }, ) + async def download_media_v3( + self, + destination: str, + media_id: str, + output_stream: BinaryIO, + max_size: int, + max_timeout_ms: int, + ) -> Tuple[int, Dict[bytes, List[bytes]]]: + path = f"/_matrix/media/v3/download/{destination}/{media_id}" + + return await self.client.get_file( + destination, + path, + output_stream=output_stream, + max_size=max_size, + args={ + # tell the remote server to 404 if it doesn't + # recognise the server_name, to make sure we don't + # end up with a routing loop. + "allow_remote": "false", + "timeout_ms": str(max_timeout_ms), + }, + ) + def _create_path(federation_prefix: str, path: str, *args: str) -> str: """ diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index f2471a76eaeb..d62af22adb70 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -77,7 +77,7 @@ class MediaRepository: def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() - self.client = hs.get_federation_transport_client() + self.client = hs.get_federation_client() self.clock = hs.get_clock() self.server_name = hs.hostname self.store = hs.get_datastores().main @@ -645,7 +645,7 @@ async def _download_remote_file( with self.media_storage.store_into_file(file_info) as (f, fname, finish): try: - length, headers = await self.client.download_media_r0( + length, headers = await self.client.download_media( server_name, media_id, output_stream=f, diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index f262304c3daa..e30b43475f4e 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -27,10 +27,11 @@ from twisted.internet import defer from twisted.internet.defer import Deferred +from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactor from twisted.web.resource import Resource -from synapse.api.errors import Codes +from synapse.api.errors import Codes, HttpResponseException from synapse.events import EventBase from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable @@ -257,10 +258,15 @@ def write_to( output_stream.write(data) return response + def write_err(f: Failure) -> Failure: + f.trap(HttpResponseException) + output_stream.write(f.value.response) + return f + d: Deferred[Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]]]]] = Deferred() self.fetches.append((d, destination, path, args)) # Note that this callback changes the value held by d. - d_after_callback = d.addCallback(write_to) + d_after_callback = d.addCallbacks(write_to, write_err) return make_deferred_yieldable(d_after_callback) # Mock out the homeserver's MatrixFederationHttpClient @@ -316,7 +322,7 @@ def _req( self.assertEqual(len(self.fetches), 1) self.assertEqual(self.fetches[0][1], "example.com") self.assertEqual( - self.fetches[0][2], "/_matrix/media/r0/download/" + self.media_id + self.fetches[0][2], "/_matrix/media/v3/download/" + self.media_id ) self.assertEqual( self.fetches[0][3], {"allow_remote": "false", "timeout_ms": "20000"} @@ -671,6 +677,52 @@ def test_cross_origin_resource_policy_header(self) -> None: [b"cross-origin"], ) + def test_unknown_v3_endpoint(self) -> None: + """ + If the v3 endpoint fails, try the r0 one. + """ + channel = self.make_request( + "GET", + f"/_matrix/media/v3/download/{self.media_id}", + shorthand=False, + await_result=False, + ) + self.pump() + + # We've made one fetch, to example.com, using the media URL, and asking + # the other server not to do a remote fetch + self.assertEqual(len(self.fetches), 1) + self.assertEqual(self.fetches[0][1], "example.com") + self.assertEqual( + self.fetches[0][2], "/_matrix/media/v3/download/" + self.media_id + ) + + # The result which says the endpoint is unknown. + unknown_endpoint = b'{"errcode":"M_UNRECOGNIZED","error":"Unknown request"}' + self.fetches[0][0].errback( + HttpResponseException(404, "NOT FOUND", unknown_endpoint) + ) + + self.pump() + + # There should now be another request to the r0 URL. + self.assertEqual(len(self.fetches), 2) + self.assertEqual(self.fetches[1][1], "example.com") + self.assertEqual( + self.fetches[1][2], f"/_matrix/media/r0/download/{self.media_id}" + ) + + headers = { + b"Content-Length": [b"%d" % (len(self.test_image.data))], + } + + self.fetches[1][0].callback( + (self.test_image.data, (len(self.test_image.data), headers)) + ) + + self.pump() + self.assertEqual(channel.code, 200) + class TestSpamCheckerLegacy: """A spam checker module that rejects all media that includes the bytes From 8837c38d67525c681f3bfde35cc2a7deeb9be930 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 28 Nov 2023 15:25:31 -0500 Subject: [PATCH 3/5] Follow redirects when fetching from /download. --- changelog.d/16701.feature | 1 + synapse/federation/transport/client.py | 4 ++ synapse/http/matrixfederationclient.py | 77 ++++++++++++++++------ tests/media/test_media_storage.py | 4 +- tests/replication/test_multi_media_repo.py | 2 +- 5 files changed, 66 insertions(+), 22 deletions(-) create mode 100644 changelog.d/16701.feature diff --git a/changelog.d/16701.feature b/changelog.d/16701.feature new file mode 100644 index 000000000000..2a66fc932adc --- /dev/null +++ b/changelog.d/16701.feature @@ -0,0 +1 @@ +Follow redirects when downloading media over federation (per [MSC3860](https://github.com/matrix-org/matrix-spec-proposals/pull/3860)). diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 0bb49057ec0a..30c4c50a2fd3 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -850,7 +850,11 @@ async def download_media_v3( # end up with a routing loop. "allow_remote": "false", "timeout_ms": str(max_timeout_ms), + # Matix 1.7 allows for this to redirect to another URL, this should + # just be ignored for an old homeserver, so always provide it. + "allow_redirect": "true", }, + follow_redirects=True, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index d5013e8e97c1..cc1db763ae4e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -153,12 +153,18 @@ class MatrixFederationRequest: """Query arguments. """ - txn_id: Optional[str] = None - """Unique ID for this request (for logging) + txn_id: str = attr.ib(init=False) + """Unique ID for this request (for logging), this is autogenerated. """ - uri: bytes = attr.ib(init=False) - """The URI of this request + uri: bytes = b"" + """The URI of this request, usually generated from the above information. + """ + + _generate_uri: bool = True + """True to automatically generate the uri field based on the above information. + + Set to False if manually configuring the URI. """ def __attrs_post_init__(self) -> None: @@ -168,22 +174,23 @@ def __attrs_post_init__(self) -> None: object.__setattr__(self, "txn_id", txn_id) - destination_bytes = self.destination.encode("ascii") - path_bytes = self.path.encode("ascii") - query_bytes = encode_query_args(self.query) - - # The object is frozen so we can pre-compute this. - uri = urllib.parse.urlunparse( - ( - b"matrix-federation", - destination_bytes, - path_bytes, - None, - query_bytes, - b"", + if self._generate_uri: + destination_bytes = self.destination.encode("ascii") + path_bytes = self.path.encode("ascii") + query_bytes = encode_query_args(self.query) + + # The object is frozen so we can pre-compute this. + uri = urllib.parse.urlunparse( + ( + b"matrix-federation", + destination_bytes, + path_bytes, + None, + query_bytes, + b"", + ) ) - ) - object.__setattr__(self, "uri", uri) + object.__setattr__(self, "uri", uri) def get_json(self) -> Optional[JsonDict]: if self.json_callback: @@ -513,6 +520,7 @@ async def _send_request( ignore_backoff: bool = False, backoff_on_404: bool = False, backoff_on_all_error_codes: bool = False, + follow_redirects: bool = False, ) -> IResponse: """ Sends a request to the given server. @@ -555,6 +563,9 @@ async def _send_request( backoff_on_404: Back off if we get a 404 backoff_on_all_error_codes: Back off if we get any error response + follow_redirects: True to follow the Location header of 307/308 redirect + responses. This does not recurse. + Returns: Resolves with the HTTP response object on success. @@ -714,6 +725,26 @@ async def _send_request( response.code, response_phrase, ) + elif ( + response.code in (307, 308) + and follow_redirects + and response.headers.hasHeader("Location") + ): + # The Location header *might* be relative so resolve it. + location = response.headers.getRawHeaders(b"Location")[0] + new_uri = urllib.parse.urljoin(request.uri, location) + + return await self._send_request( + attr.evolve(request, uri=new_uri, generate_uri=False), + retry_on_dns_fail, + timeout, + long_retries, + ignore_backoff, + backoff_on_404, + backoff_on_all_error_codes, + # Do not continue following redirects. + follow_redirects=False, + ) else: logger.info( "{%s} [%s] Got response headers: %d %s", @@ -1383,6 +1414,7 @@ async def get_file( retry_on_dns_fail: bool = True, max_size: Optional[int] = None, ignore_backoff: bool = False, + follow_redirects: bool = False, ) -> Tuple[int, Dict[bytes, List[bytes]]]: """GETs a file from a given homeserver Args: @@ -1392,6 +1424,8 @@ async def get_file( args: Optional dictionary used to create the query string. ignore_backoff: true to ignore the historical backoff data and try the request anyway. + follow_redirects: True to follow the Location header of 307/308 redirect + responses. This does not recurse. Returns: Resolves with an (int,dict) tuple of @@ -1412,7 +1446,10 @@ async def get_file( ) response = await self._send_request( - request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff + request, + retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, + follow_redirects=follow_redirects, ) headers = dict(response.headers.getAllRawHeaders()) diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index e30b43475f4e..f981d1c0d8dd 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -248,6 +248,7 @@ def get_file( retry_on_dns_fail: bool = True, max_size: Optional[int] = None, ignore_backoff: bool = False, + follow_redirects: bool = False, ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]]]]": """A mock for MatrixFederationHttpClient.get_file.""" @@ -325,7 +326,8 @@ def _req( self.fetches[0][2], "/_matrix/media/v3/download/" + self.media_id ) self.assertEqual( - self.fetches[0][3], {"allow_remote": "false", "timeout_ms": "20000"} + self.fetches[0][3], + {"allow_remote": "false", "timeout_ms": "20000", "allow_redirect": "true"}, ) headers = { diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 1e9994cc0bc7..9a7b675f54cb 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -133,7 +133,7 @@ def _get_media_req( self.assertEqual(request.method, b"GET") self.assertEqual( request.path, - f"/_matrix/media/r0/download/{target}/{media_id}".encode(), + f"/_matrix/media/v3/download/{target}/{media_id}".encode(), ) self.assertEqual( request.requestHeaders.getRawHeaders(b"host"), [target.encode("utf-8")] From 12c6193f6a570c93d302bda21edd15c3d459eca6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Nov 2023 13:37:51 -0500 Subject: [PATCH 4/5] Fix typo. Co-authored-by: David Robertson --- synapse/federation/transport/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 30c4c50a2fd3..5e36638b0a65 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -850,7 +850,7 @@ async def download_media_v3( # end up with a routing loop. "allow_remote": "false", "timeout_ms": str(max_timeout_ms), - # Matix 1.7 allows for this to redirect to another URL, this should + # Matrix 1.7 allows for this to redirect to another URL, this should # just be ignored for an old homeserver, so always provide it. "allow_redirect": "true", }, From b209ee70adba8cf3112fbc4c32263810ecdee9b0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Nov 2023 13:40:07 -0500 Subject: [PATCH 5/5] Update logging. --- synapse/federation/federation_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2c3aa5dd4800..0ba03b0d0540 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1887,7 +1887,9 @@ async def download_media( raise logger.debug( - "Couldn't download media with the v3 API, falling back to the r0 API" + "Couldn't download media %s/%s with the v3 API, falling back to the r0 API", + destination, + media_id, ) return await self.transport_layer.download_media_r0(