Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix state cache invalidation on workers #4715

Merged
merged 2 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4715.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve replication performance by reducing cache invalidation traffic.
7 changes: 1 addition & 6 deletions synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ def process_replication_rows(self, stream_name, token, rows):
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
try:
getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
Expand Down
40 changes: 34 additions & 6 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1342,15 +1342,43 @@ def _invalidate_state_caches(self, room_id, members_changed):
changed
"""
for member in members_changed:
self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (member,),
)

for host in set(get_domain_from_id(u) for u in members_changed):
self.is_host_joined.invalidate((room_id, host))
self.was_host_joined.invalidate((room_id, host))
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
)
self._attempt_to_invalidate_cache(
"was_host_joined", (room_id, host,),
)

self._attempt_to_invalidate_cache(
"get_users_in_room", (room_id,),
)
self._attempt_to_invalidate_cache(
"get_room_summary", (room_id,),
)
self._attempt_to_invalidate_cache(
"get_current_state_ids", (room_id,),
)

def _attempt_to_invalidate_cache(self, cache_name, key):
"""Attempts to invalidate the cache of the given name, ignoring if the
cache doesn't exist. Mainly used for invalidating caches on workers,
where they may not have the cache.

self.get_users_in_room.invalidate((room_id,))
self.get_room_summary.invalidate((room_id,))
self.get_current_state_ids.invalidate((room_id,))
Args:
cache_name (str)
key (tuple)
"""
try:
getattr(self, cache_name).invalidate(key)
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass

def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
Expand Down