From 10081061d783621f6bd02ed176605a67a388a21e Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 09:26:46 +0100 Subject: [PATCH 01/15] Reimplement `get_rooms_for_user` and `get_rooms_for_users` This avoids the join on `events` to get stream ordering that is mostly unused. --- synapse/storage/databases/main/cache.py | 3 + synapse/storage/databases/main/events.py | 4 + synapse/storage/databases/main/roommember.py | 79 ++++++++++++++++++-- 3 files changed, 79 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 12e9a423826a..7db2071fdaa2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -205,6 +205,9 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self.get_rooms_for_user_with_stream_ordering.invalidate( (data.state_key,) ) + self.get_rooms_for_user.invalidate( + (data.state_key,) + ) else: raise Exception("Unknown events stream row type %s" % (row.type,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..7cce6f279707 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1178,6 +1178,10 @@ def _update_current_state_txn( self.store.get_rooms_for_user_with_stream_ordering.invalidate, (member,), ) + txn.call_after( + self.store.get_rooms_for_user.invalidate, + (member,), + ) self.store._invalidate_state_caches_and_stream( txn, room_id, members_changed diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index fdb4684e128e..260042c741c7 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -15,7 +15,6 @@ import logging from typing import ( TYPE_CHECKING, - Callable, Collection, Dict, FrozenSet, @@ -52,7 +51,6 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList -from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -670,19 +668,86 @@ def _get_users_server_still_shares_room_with_txn( _get_users_server_still_shares_room_with_txn, ) - @cancellable + @cached(max_entries=500000, iterable=True) async def get_rooms_for_user( - self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None + self, user_id: str ) -> FrozenSet[str]: """Returns a set of room_ids the user is currently joined to. If a remote user only returns rooms this server is currently participating in. """ - rooms = await self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate + rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id) + if rooms: + return frozenset(r.room_id for r in rooms) + + return await self.db_pool.runInteraction( + "get_rooms_for_user", + self._get_rooms_for_user_txn, + user_id, ) - return frozenset(r.room_id for r in rooms) + + def _get_rooms_for_user_txn( + self, txn: LoggingTransaction, user_id: str + ) -> FrozenSet[str]: + sql = """ + SELECT room_id + FROM current_state_events AS c + WHERE + c.type = 'm.room.member' + AND c.state_key = ? + AND c.membership = ? + """ + + txn.execute(sql, (user_id, Membership.JOIN)) + return frozenset(txn) + + @cachedList( + cached_method_name="get_rooms_for_user", + list_name="user_ids", + ) + async def get_rooms_for_users( + self, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + """A batched version of `get_rooms_for_user`. + + Returns: + Map from user_id to set of rooms that is currently in. + """ + return await self.db_pool.runInteraction( + "get_rooms_for_users", + self._get_rooms_for_users_txn, + user_ids, + ) + + def _get_rooms_for_users_txn( + self, txn: LoggingTransaction, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[str]]: + + clause, args = make_in_list_sql_clause( + self.database_engine, + "c.state_key", + user_ids, + ) + + sql = f""" + SELECT c.state_key, room_id + FROM current_state_events AS c + WHERE + c.type = 'm.room.member' + AND c.membership = ? + AND {clause} + """ + + txn.execute(sql, [Membership.JOIN] + args) + + result: Dict[str, Set[str]] = { + user_id: set() for user_id in user_ids + } + for user_id, room_id in txn: + result[user_id].add(room_id) + + return {user_id: frozenset(v) for user_id, v in result.items()} @cached(max_entries=10000) async def does_pair_of_users_share_a_room( From c98bbfc2c6f2dfee98a3d51743e3945ee1ea8a48 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 09:27:51 +0100 Subject: [PATCH 02/15] Replace calls to `get_rooms_for_users_with_stream_ordering` None of the calls to this function ever use the stream ordering component returned, so can just use the simplified `get_rooms_for_users`. --- synapse/handlers/device.py | 4 ++-- synapse/handlers/sync.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c5ac169644ac..fc54ef814f3c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -271,11 +271,11 @@ async def get_user_ids_changed( possibly_left = possibly_changed | possibly_left # Double check if we still share rooms with the given user. - users_rooms = await self.store.get_rooms_for_users_with_stream_ordering( + users_rooms = await self.store.get_rooms_for_users( possibly_left ) for changed_user_id, entries in users_rooms.items(): - if any(e.room_id in room_ids for e in entries): + if any(room_id in room_ids for room_id in entries): possibly_left.discard(changed_user_id) else: possibly_joined.discard(changed_user_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5293fa4d0e01..80acd6930350 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1474,7 +1474,7 @@ async def _generate_sync_entry_for_device_list( since_token.device_list_key ) if changed_users is not None: - result = await self.store.get_rooms_for_users_with_stream_ordering( + result = await self.store.get_rooms_for_users( changed_users ) @@ -1483,7 +1483,7 @@ async def _generate_sync_entry_for_device_list( # or if the changed user is the syncing user (as we always # want to include device list updates of their own devices). if user_id == changed_user_id or any( - e.room_id in joined_rooms for e in entries + rid in joined_rooms for rid in entries ): users_that_have_changed.add(changed_user_id) else: @@ -1518,12 +1518,12 @@ async def _generate_sync_entry_for_device_list( # Remove any users that we still share a room with. left_users_rooms = ( - await self.store.get_rooms_for_users_with_stream_ordering( + await self.store.get_rooms_for_users( newly_left_users ) ) for user_id, entries in left_users_rooms.items(): - if any(e.room_id in joined_rooms for e in entries): + if any(rid in joined_rooms for rid in entries): newly_left_users.discard(user_id) return DeviceListUpdates( From 7210fa12948afe5a9d073f1ac3e196535f0b7922 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 09:28:03 +0100 Subject: [PATCH 03/15] Drop unused `get_rooms_for_users_with_stream_ordering` method --- synapse/storage/databases/main/roommember.py | 52 -------------------- 1 file changed, 52 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 260042c741c7..7b14ada866a3 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -581,58 +581,6 @@ def _get_rooms_for_user_with_stream_ordering_txn( for room_id, instance, stream_id in txn ) - @cachedList( - cached_method_name="get_rooms_for_user_with_stream_ordering", - list_name="user_ids", - ) - async def get_rooms_for_users_with_stream_ordering( - self, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: - """A batched version of `get_rooms_for_user_with_stream_ordering`. - - Returns: - Map from user_id to set of rooms that is currently in. - """ - return await self.db_pool.runInteraction( - "get_rooms_for_users_with_stream_ordering", - self._get_rooms_for_users_with_stream_ordering_txn, - user_ids, - ) - - def _get_rooms_for_users_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: - - clause, args = make_in_list_sql_clause( - self.database_engine, - "c.state_key", - user_ids, - ) - - sql = f""" - SELECT c.state_key, room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.membership = ? - AND {clause} - """ - - txn.execute(sql, [Membership.JOIN] + args) - - result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = { - user_id: set() for user_id in user_ids - } - for user_id, room_id, instance, stream_id in txn: - result[user_id].add( - GetRoomsForUserWithStreamOrdering( - room_id, PersistedEventPosition(instance, stream_id) - ) - ) - - return {user_id: frozenset(v) for user_id, v in result.items()} - async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] ) -> Set[str]: From 7719d9ce873015760cced1acea9f72f9ca259735 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 09:28:22 +0100 Subject: [PATCH 04/15] Drop simplified cache in sync tests --- tests/handlers/test_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index e3f38fbcc5ce..ab5c101eb708 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -159,6 +159,7 @@ def test_unknown_room_version(self): # Blow away caches (supported room versions can only change due to a restart). self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() + self.store.get_rooms_for_user.invalidate_all() self.get_success(self.store._get_event_cache.clear()) self.store._event_ref.clear() From b745518c475060f223769665b787932642eae07b Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 10:03:57 +0100 Subject: [PATCH 05/15] Linting fixes --- synapse/handlers/device.py | 4 +--- synapse/handlers/sync.py | 10 ++-------- synapse/storage/databases/main/cache.py | 4 +--- synapse/storage/databases/main/roommember.py | 16 +++++++--------- 4 files changed, 11 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index fc54ef814f3c..8b2e932d28bf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -271,9 +271,7 @@ async def get_user_ids_changed( possibly_left = possibly_changed | possibly_left # Double check if we still share rooms with the given user. - users_rooms = await self.store.get_rooms_for_users( - possibly_left - ) + users_rooms = await self.store.get_rooms_for_users(possibly_left) for changed_user_id, entries in users_rooms.items(): if any(room_id in room_ids for room_id in entries): possibly_left.discard(changed_user_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 80acd6930350..4e59596aa680 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1474,9 +1474,7 @@ async def _generate_sync_entry_for_device_list( since_token.device_list_key ) if changed_users is not None: - result = await self.store.get_rooms_for_users( - changed_users - ) + result = await self.store.get_rooms_for_users(changed_users) for changed_user_id, entries in result.items(): # Check if the changed user shares any rooms with the user, @@ -1517,11 +1515,7 @@ async def _generate_sync_entry_for_device_list( newly_left_users.update(left_users) # Remove any users that we still share a room with. - left_users_rooms = ( - await self.store.get_rooms_for_users( - newly_left_users - ) - ) + left_users_rooms = await self.store.get_rooms_for_users(newly_left_users) for user_id, entries in left_users_rooms.items(): if any(rid in joined_rooms for rid in entries): newly_left_users.discard(user_id) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7db2071fdaa2..7c9b72260c27 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -205,9 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self.get_rooms_for_user_with_stream_ordering.invalidate( (data.state_key,) ) - self.get_rooms_for_user.invalidate( - (data.state_key,) - ) + self.get_rooms_for_user.invalidate((data.state_key,)) else: raise Exception("Unknown events stream row type %s" % (row.type,)) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 7b14ada866a3..829543358178 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -617,15 +617,15 @@ def _get_users_server_still_shares_room_with_txn( ) @cached(max_entries=500000, iterable=True) - async def get_rooms_for_user( - self, user_id: str - ) -> FrozenSet[str]: + async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: """Returns a set of room_ids the user is currently joined to. If a remote user only returns rooms this server is currently participating in. """ - rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id) + rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( + user_id + ) if rooms: return frozenset(r.room_id for r in rooms) @@ -648,7 +648,7 @@ def _get_rooms_for_user_txn( """ txn.execute(sql, (user_id, Membership.JOIN)) - return frozenset(txn) + return frozenset(row[0] for row in txn) @cachedList( cached_method_name="get_rooms_for_user", @@ -656,7 +656,7 @@ def _get_rooms_for_user_txn( ) async def get_rooms_for_users( self, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + ) -> Dict[str, FrozenSet[str]]: """A batched version of `get_rooms_for_user`. Returns: @@ -689,9 +689,7 @@ def _get_rooms_for_users_txn( txn.execute(sql, [Membership.JOIN] + args) - result: Dict[str, Set[str]] = { - user_id: set() for user_id in user_ids - } + result: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} for user_id, room_id in txn: result[user_id].add(room_id) From 661e7cf7f27d577869ac1ca14ae8f68610f31239 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 10:08:07 +0100 Subject: [PATCH 06/15] Fix call to `get_immediate` --- synapse/storage/databases/main/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 829543358178..9f51115cd30d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -624,7 +624,7 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: participating in. """ rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( - user_id + (user_id,), None, update_metrics=False, ) if rooms: return frozenset(r.room_id for r in rooms) From 3df457992c2ddc909c8f42e7130a3f40bf16cf19 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 10:26:17 +0100 Subject: [PATCH 07/15] Add placeholder changelog file --- changelog.d/13787.misc | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changelog.d/13787.misc diff --git a/changelog.d/13787.misc b/changelog.d/13787.misc new file mode 100644 index 000000000000..e14972c0696b --- /dev/null +++ b/changelog.d/13787.misc @@ -0,0 +1,2 @@ +Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar). + From 0497004a922dfd2ba1bccd2129628c9993d978af Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 10:26:47 +0100 Subject: [PATCH 08/15] Fix formatting --- synapse/storage/databases/main/roommember.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 9f51115cd30d..5cdf69913c62 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -624,7 +624,9 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: participating in. """ rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( - (user_id,), None, update_metrics=False, + (user_id,), + None, + update_metrics=False, ) if rooms: return frozenset(r.room_id for r in rooms) From 7e127f850586d61e34c9eb30c77c70c9e19b0987 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 11:40:10 +0100 Subject: [PATCH 09/15] Don't overwrite variable --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 8b2e932d28bf..9658135f9a47 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -273,7 +273,7 @@ async def get_user_ids_changed( # Double check if we still share rooms with the given user. users_rooms = await self.store.get_rooms_for_users(possibly_left) for changed_user_id, entries in users_rooms.items(): - if any(room_id in room_ids for room_id in entries): + if any(rid in room_ids for rid in entries): possibly_left.discard(changed_user_id) else: possibly_joined.discard(changed_user_id) From 7512017371b825e0b66bd7f63101930af201d18c Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 28 Sep 2022 08:45:02 +0100 Subject: [PATCH 10/15] Remove blank changelog line Co-authored-by: Erik Johnston --- changelog.d/13787.misc | 1 - 1 file changed, 1 deletion(-) diff --git a/changelog.d/13787.misc b/changelog.d/13787.misc index e14972c0696b..a9b93717f059 100644 --- a/changelog.d/13787.misc +++ b/changelog.d/13787.misc @@ -1,2 +1 @@ Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar). - From 21c8e8d69636c370df474226a81d8c85dc72d13d Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 28 Sep 2022 09:04:19 +0100 Subject: [PATCH 11/15] Use `simple_select_many_batch` in `get_rooms_for_users` --- synapse/storage/databases/main/roommember.py | 46 ++++++++------------ 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 29de84aeaa80..b1c2f1997227 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from collections import defaultdict from typing import ( TYPE_CHECKING, Collection, @@ -681,38 +682,27 @@ async def get_rooms_for_users( Returns: Map from user_id to set of rooms that is currently in. """ - return await self.db_pool.runInteraction( - "get_rooms_for_users", - self._get_rooms_for_users_txn, - user_ids, - ) - - def _get_rooms_for_users_txn( - self, txn: LoggingTransaction, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[str]]: - clause, args = make_in_list_sql_clause( - self.database_engine, - "c.state_key", - user_ids, + rows = await self.db_pool.simple_select_many_batch( + table="current_state_events", + column="state_key", + iterable=user_ids, + retcols=( + "user_id", + "room_id", + ), + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + desc="get_rooms_for_users", ) - sql = f""" - SELECT c.state_key, room_id - FROM current_state_events AS c - WHERE - c.type = 'm.room.member' - AND c.membership = ? - AND {clause} - """ - - txn.execute(sql, [Membership.JOIN] + args) - - result: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} - for user_id, room_id in txn: - result[user_id].add(room_id) + user_rooms: Dict[str, Set[str]] = defaultdict(set) + for row in rows: + user_rooms[row["user_id"]].add(row["room_id"]) - return {user_id: frozenset(v) for user_id, v in result.items()} + return {key: frozenset(rooms) for key, rooms in user_rooms.items()} @cached(max_entries=10000) async def does_pair_of_users_share_a_room( From 9907aaac02a99cbffd4df6521d1fa47448e98153 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 28 Sep 2022 08:52:48 +0100 Subject: [PATCH 12/15] Use `simple_select_onecol` in `get_rooms_for_user` --- synapse/storage/databases/main/roommember.py | 28 +++++++------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index b1c2f1997227..839c2a033a1e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -649,26 +649,18 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: if rooms: return frozenset(r.room_id for r in rooms) - return await self.db_pool.runInteraction( - "get_rooms_for_user", - self._get_rooms_for_user_txn, - user_id, + room_ids = await self.db_pool.simple_select_onecol( + table="current_state_events", + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + "state_key": user_id, + }, + retcol="room_id", + desc="get_rooms_for_user", ) - def _get_rooms_for_user_txn( - self, txn: LoggingTransaction, user_id: str - ) -> FrozenSet[str]: - sql = """ - SELECT room_id - FROM current_state_events AS c - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND c.membership = ? - """ - - txn.execute(sql, (user_id, Membership.JOIN)) - return frozenset(row[0] for row in txn) + return frozenset(room_ids) @cachedList( cached_method_name="get_rooms_for_user", From 4da7073169a33eff46fd52b3e231fb200da9f8a6 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 28 Sep 2022 14:07:02 +0100 Subject: [PATCH 13/15] Fixed merge formatting --- synapse/storage/_base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b8b5b51d72a4..bf42aeb8d187 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -94,9 +94,7 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache( "get_rooms_for_user_with_stream_ordering", (user_id,) ) - self._attempt_to_invalidate_cache( - "get_rooms_for_user", (user_id,) - ) + self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,)) # Purge other caches based on room state. self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) From bd70af9db19a6c63b0ee75bd3895834ef3e94b91 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 28 Sep 2022 14:50:19 +0100 Subject: [PATCH 14/15] Fix `current_state_events` column name --- synapse/storage/databases/main/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 48b4c7205e31..0965f8ca5f08 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -686,7 +686,7 @@ async def get_rooms_for_users( column="state_key", iterable=user_ids, retcols=( - "user_id", + "state_key", "room_id", ), keyvalues={ @@ -698,7 +698,7 @@ async def get_rooms_for_users( user_rooms: Dict[str, Set[str]] = defaultdict(set) for row in rows: - user_rooms[row["user_id"]].add(row["room_id"]) + user_rooms[row["state_key"]].add(row["room_id"]) return {key: frozenset(rooms) for key, rooms in user_rooms.items()} From 7b0e31146cc7200eb22efbd73e4b7f8ad93e6ad9 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 28 Sep 2022 15:46:09 +0100 Subject: [PATCH 15/15] Always ensure we return a value for every user in `get_rooms_for_users` The cached list (sensibly) requires that every input value has a corresponding output. --- synapse/storage/databases/main/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 0965f8ca5f08..982e1f08e30b 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from collections import defaultdict from typing import ( TYPE_CHECKING, Collection, @@ -696,7 +695,8 @@ async def get_rooms_for_users( desc="get_rooms_for_users", ) - user_rooms: Dict[str, Set[str]] = defaultdict(set) + user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} + for row in rows: user_rooms[row["state_key"]].add(row["room_id"])