From d8b74cb534732e4a31b900aedd9ab81362a67a1f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jul 2023 16:41:26 -0500 Subject: [PATCH 1/5] Make the media `/upload` tracing less ambiguous Follow-up to https://github.com/matrix-org/synapse/pull/15850 Add tracing instrumentation to the media `/upload` code paths to help investigate https://github.com/matrix-org/synapse/issues/15841 --- synapse/media/media_storage.py | 12 +++++++----- synapse/media/storage_provider.py | 25 +++++++++++++------------ 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index eebcbc48e8c9..7b87f0af4242 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -38,7 +38,7 @@ from synapse.api.errors import NotFoundError from synapse.logging.context import defer_to_thread, make_deferred_yieldable -from synapse.logging.opentracing import trace +from synapse.logging.opentracing import trace, trace_with_opname from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer @@ -77,7 +77,7 @@ def __init__( self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker self.clock = hs.get_clock() - @trace + @trace_with_opname("MediaStorage.store_file") async def store_file(self, source: IO, file_info: FileInfo) -> str: """Write `source` to the on disk media store, and also any other configured storage providers @@ -91,18 +91,19 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str: """ with self.store_into_file(file_info) as (f, fname, finish_cb): - # Write to the main repository + # Write to the main media repository await self.write_to_file(source, f) + # Write to the other storage providers await finish_cb() return fname - @trace + @trace_with_opname("MediaStorage.write_to_file") async def write_to_file(self, source: IO, output: IO) -> None: """Asynchronously write the `source` to `output`.""" await defer_to_thread(self.reactor, 
_write_file_synchronously, source, output) - @trace + @trace_with_opname("MediaStorage.store_into_file") @contextlib.contextmanager def store_into_file( self, file_info: FileInfo @@ -142,6 +143,7 @@ def store_into_file( try: with open(fname, "wb") as f: + @trace_with_opname("MediaStorage.store_into_file.finish") async def finish() -> None: # Ensure that all writes have been flushed and close the # file. diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py index 0aea3a7a0d87..70a45cfd5b78 100644 --- a/synapse/media/storage_provider.py +++ b/synapse/media/storage_provider.py @@ -20,7 +20,7 @@ from synapse.config._base import Config from synapse.logging.context import defer_to_thread, run_in_background -from synapse.logging.opentracing import trace +from synapse.logging.opentracing import start_active_span, trace_with_opname from synapse.util.async_helpers import maybe_awaitable from ._base import FileInfo, Responder @@ -87,7 +87,7 @@ def __init__( def __str__(self) -> str: return "StorageProviderWrapper[%s]" % (self.backend,) - @trace + @trace_with_opname("StorageProviderWrapper.store_file") async def store_file(self, path: str, file_info: FileInfo) -> None: if not file_info.server_name and not self.store_local: return None @@ -116,7 +116,7 @@ async def store() -> None: run_in_background(store) - @trace + @trace_with_opname("StorageProviderWrapper.fetch") async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: if file_info.url_cache: # Files in the URL preview cache definitely aren't stored here, @@ -144,7 +144,7 @@ def __init__(self, hs: "HomeServer", config: str): def __str__(self) -> str: return "FileStorageProviderBackend[%s]" % (self.base_directory,) - @trace + @trace_with_opname("FileStorageProviderBackend.store_file") async def store_file(self, path: str, file_info: FileInfo) -> None: """See StorageProvider.store_file""" @@ -156,14 +156,15 @@ async def store_file(self, path: str, file_info: FileInfo) -> None: 
# mypy needs help inferring the type of the second parameter, which is generic shutil_copyfile: Callable[[str, str], str] = shutil.copyfile - await defer_to_thread( - self.hs.get_reactor(), - shutil_copyfile, - primary_fname, - backup_fname, - ) - - @trace + with start_active_span("shutil_copyfile"): + await defer_to_thread( + self.hs.get_reactor(), + shutil_copyfile, + primary_fname, + backup_fname, + ) + + @trace_with_opname("FileStorageProviderBackend.fetch") async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: """See StorageProvider.fetch""" From 33500619c5fba6fa40b9236e2f8f269a208ac143 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jul 2023 16:44:13 -0500 Subject: [PATCH 2/5] Add changelog --- changelog.d/15888.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15888.misc diff --git a/changelog.d/15888.misc b/changelog.d/15888.misc new file mode 100644 index 000000000000..0e49ab23fed3 --- /dev/null +++ b/changelog.d/15888.misc @@ -0,0 +1 @@ +Add tracing to media `/upload` code paths. 
From 4d458d800ffad43e64077d8bb970dc67f3517fa0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jul 2023 17:21:31 -0500 Subject: [PATCH 3/5] Label two phases --- synapse/media/media_storage.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index 7b87f0af4242..19bc0aeda9a3 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -38,7 +38,7 @@ from synapse.api.errors import NotFoundError from synapse.logging.context import defer_to_thread, make_deferred_yieldable -from synapse.logging.opentracing import trace, trace_with_opname +from synapse.logging.opentracing import start_active_span, trace, trace_with_opname from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer @@ -91,10 +91,12 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str: """ with self.store_into_file(file_info) as (f, fname, finish_cb): - # Write to the main media repository - await self.write_to_file(source, f) - # Write to the other storage providers - await finish_cb() + with start_active_span("writing to main media repo"): + # Write to the main media repository + await self.write_to_file(source, f) + with start_active_span("writing to other storage providers"): + # Write to the other storage providers + await finish_cb() return fname From f421bbe40f77df53fa53bd9f76c1bca9dfa037ac Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jul 2023 17:27:19 -0500 Subject: [PATCH 4/5] Label each backend provider --- synapse/media/media_storage.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index 19bc0aeda9a3..fc0defc58492 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -165,7 +165,8 @@ async def finish() -> None: raise SpamMediaException(errcode=spam_check[0]) for provider in self.storage_providers: 
- await provider.store_file(path, file_info) + with start_active_span(str(provider)): + await provider.store_file(path, file_info) finished_called[0] = True From dbb281172cd1d0525f81e4b7bc336e1d8ba5e1f6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jul 2023 17:51:08 -0500 Subject: [PATCH 5/5] Make the phase labels generic so they also apply to thumbnailing and URL preview --- synapse/media/media_storage.py | 79 ++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index fc0defc58492..a17ccb3d801c 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -91,12 +91,10 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str: """ with self.store_into_file(file_info) as (f, fname, finish_cb): - with start_active_span("writing to main media repo"): - # Write to the main media repository - await self.write_to_file(source, f) - with start_active_span("writing to other storage providers"): - # Write to the other storage providers - await finish_cb() + # Write to the main media repository + await self.write_to_file(source, f) + # Write to the other storage providers + await finish_cb() return fname @@ -120,9 +118,9 @@ def store_into_file( fname can be used to read the contents from after upload, e.g. to generate thumbnails. - finish_cb must be called and waited on after the file has been - successfully been written to. Should not be called if there was an - error. + finish_cb must be called and waited on after the file has been successfully + written to. Should not be called if there was an error. Checks for spam and + stores the file into the configured storage providers. 
Args: file_info: Info about the file to store @@ -142,37 +140,48 @@ def store_into_file( finished_called = [False] + main_media_repo_write_trace_scope = start_active_span( + "writing to main media repo" + ) + main_media_repo_write_trace_scope.__enter__() + try: with open(fname, "wb") as f: - @trace_with_opname("MediaStorage.store_into_file.finish") async def finish() -> None: - # Ensure that all writes have been flushed and close the - # file. - f.flush() - f.close() - - spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( - ReadableFileWrapper(self.clock, fname), file_info - ) - if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: - logger.info("Blocking media due to spam checker") - # Note that we'll delete the stored media, due to the - # try/except below. The media also won't be stored in - # the DB. - # We currently ignore any additional field returned by - # the spam-check API. - raise SpamMediaException(errcode=spam_check[0]) - - for provider in self.storage_providers: - with start_active_span(str(provider)): - await provider.store_file(path, file_info) - - finished_called[0] = True + # When someone calls finish, we assume they are done writing to the main media repo + main_media_repo_write_trace_scope.__exit__(None, None, None) + + with start_active_span("writing to other storage providers"): + # Ensure that all writes have been flushed and close the + # file. + f.flush() + f.close() + + spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( + ReadableFileWrapper(self.clock, fname), file_info + ) + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: + logger.info("Blocking media due to spam checker") + # Note that we'll delete the stored media, due to the + # try/except below. The media also won't be stored in + # the DB. + # We currently ignore any additional field returned by + # the spam-check API. 
+ raise SpamMediaException(errcode=spam_check[0]) + + for provider in self.storage_providers: + with start_active_span(str(provider)): + await provider.store_file(path, file_info) + + finished_called[0] = True yield f, fname, finish except Exception as e: try: + main_media_repo_write_trace_scope.__exit__( + type(e), None, e.__traceback__ + ) os.remove(fname) except Exception: pass @@ -180,7 +189,11 @@ async def finish() -> None: raise e from None if not finished_called: - raise Exception("Finished callback not called") + exc = Exception("Finished callback not called") + main_media_repo_write_trace_scope.__exit__( + type(exc), None, exc.__traceback__ + ) + raise exc async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: """Attempts to fetch media described by file_info from the local cache