diff --git a/changelog.d/16613.feature b/changelog.d/16613.feature index 254d04a90e84..419c56fb83cd 100644 --- a/changelog.d/16613.feature +++ b/changelog.d/16613.feature @@ -1 +1 @@ -Improve the performance of claiming encryption keys in multi-worker deployments. +Improve the performance of some operations in multi-worker deployments. diff --git a/changelog.d/16616.feature b/changelog.d/16616.feature new file mode 100644 index 000000000000..419c56fb83cd --- /dev/null +++ b/changelog.d/16616.feature @@ -0,0 +1 @@ +Improve the performance of some operations in multi-worker deployments. diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index d7482a1f4e35..07f9b65af31c 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -747,8 +747,16 @@ def _add_account_data_for_user( ) # Invalidate the cache for any ignored users which were added or removed. - for ignored_user_id in previously_ignored_users ^ currently_ignored_users: - self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) + self._invalidate_cache_and_stream_bulk( + txn, + self.ignored_by, + [ + (ignored_user_id,) + for ignored_user_id in ( + previously_ignored_users ^ currently_ignored_users + ) + ], + ) self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) async def remove_account_data_for_user( @@ -824,10 +832,14 @@ def _remove_account_data_for_user_txn( ) # Invalidate the cache for ignored users which were removed. - for ignored_user_id in previously_ignored_users: - self._invalidate_cache_and_stream( - txn, self.ignored_by, (ignored_user_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self.ignored_by, + [ + (ignored_user_id,) + for ignored_user_id in previously_ignored_users + ], + ) # Invalidate for this user the cache tracking ignored users. self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 00618051507e..9c46c5d7bd62 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1222,14 +1222,13 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int: ) # Iterate the parent IDs and invalidate caches. - for parent_id in {r[1] for r in relations_to_insert}: - cache_tuple = (parent_id,) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined] - ) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] - ) + cache_tuples = {(r[1],) for r in relations_to_insert} + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined] + ) + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined] + ) if results: latest_event_id = results[-1][0] diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index ce88772f9e9e..c700872fdc59 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -107,13 +107,16 @@ def store_server_keys_response_txn(txn: LoggingTransaction) -> None: # invalidate takes a tuple corresponding to the params of # _get_server_keys_json. _get_server_keys_json only takes one # param, which is itself the 2-tuple (server_name, key_id). - for key_id in verify_keys: - self._invalidate_cache_and_stream( - txn, self._get_server_keys_json, ((server_name, key_id),) - ) - self._invalidate_cache_and_stream( - txn, self.get_server_key_json_for_remote, (server_name, key_id) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_server_keys_json, + [((server_name, key_id),) for key_id in verify_keys], + ) + self._invalidate_cache_and_stream_bulk( + txn, + self.get_server_key_json_for_remote, + [(server_name, key_id) for key_id in verify_keys], + ) await self.db_pool.runInteraction( "store_server_keys_response", store_server_keys_response_txn diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 3b444d2d07f1..0198bb09d230 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -363,10 +363,11 @@ def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None: # for their user ID. value_values=[(presence_stream_id,) for _ in user_ids], ) - for user_id in user_ids: - self._invalidate_cache_and_stream( - txn, self._get_full_presence_stream_token_for_user, (user_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_full_presence_stream_token_for_user, + [(user_id,) for user_id in user_ids], + ) return await self.db_pool.runInteraction( "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 1e11bf2706dd..c3b3e2baafe2 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -295,19 +295,28 @@ def _purge_history_txn( # so make sure to keep this actually last. txn.execute("DROP TABLE events_to_purge") - for event_id, should_delete in event_rows: - self._invalidate_cache_and_stream( - txn, self._get_state_group_for_event, (event_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_state_group_for_event, + [(event_id,) for event_id, _ in event_rows], + ) - # XXX: This is racy, since have_seen_events could be called between the - # transaction completing and the invalidation running. On the other hand, - # that's no different to calling `have_seen_events` just before the - # event is deleted from the database. + # XXX: This is racy, since have_seen_events could be called between the + # transaction completing and the invalidation running. On the other hand, + # that's no different to calling `have_seen_events` just before the + # event is deleted from the database. + self._invalidate_cache_and_stream_bulk( + txn, + self.have_seen_event, + [ + (room_id, event_id) + for event_id, should_delete in event_rows + if should_delete + ], + ) + + for event_id, should_delete in event_rows: if should_delete: - self._invalidate_cache_and_stream( - txn, self.have_seen_event, (room_id, event_id) - ) self.invalidate_get_event_cache_after_txn(txn, event_id) logger.info("[purge] done") diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index dec985857595..2c3f30e2eba2 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -561,16 +561,15 @@ def set_shadow_banned_txn(txn: LoggingTransaction) -> None: updatevalues={"shadow_banned": shadow_banned}, ) # In order for this to apply immediately, clear the cache for this user. - tokens = self.db_pool.simple_select_onecol_txn( + tokens = self.db_pool.simple_select_list_txn( txn, table="access_tokens", keyvalues={"user_id": user_id}, - retcol="token", + retcols=("token",), + ) + self._invalidate_cache_and_stream_bulk( + txn, self.get_user_by_access_token, tokens ) - for token in tokens: - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (token,) - ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn) @@ -2683,10 +2682,11 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, int, Optional[str]]]: ) tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] - for token, _, _ in tokens_and_devices: - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (token,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self.get_user_by_access_token, + [(token,) for token, _, _ in tokens_and_devices], + ) txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)