
Remove _get_events_cache check optimisation from _have_seen_events_dict #14161

Merged 4 commits on Oct 18, 2022
1 change: 1 addition & 0 deletions changelog.d/14161.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
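
For context, the failure mode behind this changelog entry: the event cache is not invalidated when a room is purged, so a cache-based "have we seen this event?" fast-path can keep answering from stale entries after the purge. Below is a minimal, self-contained sketch of that interaction; the class and method names are hypothetical stand-ins, not Synapse's real storage layer.

```python
# Minimal sketch (hypothetical classes, not Synapse's real storage layer) of a
# "have we seen this event?" fast-path that trusts a cache which purging does
# not invalidate.
from typing import Dict, Set


class SketchEventStore:
    def __init__(self) -> None:
        self.db: Set[str] = set()           # events persisted in the database
        self.event_cache: Set[str] = set()  # stand-in for _get_event_cache

    def persist(self, event_id: str) -> None:
        self.db.add(event_id)
        self.event_cache.add(event_id)

    def purge_room(self) -> None:
        # The root cause: the database rows go away, but the event cache is
        # not invalidated (https://github.com/matrix-org/synapse/issues/13476).
        self.db.clear()

    def have_seen_events(self, event_ids: Set[str]) -> Dict[str, bool]:
        # The removed optimisation: consult the cache before the database.
        return {
            eid: (eid in self.event_cache or eid in self.db) for eid in event_ids
        }


store = SketchEventStore()
store.persist("$event1")
store.purge_room()

# The stale cache still answers True, so rejoining the purged room treats
# events that no longer exist as already seen -- a broken room.
print(store.have_seen_events({"$event1"}))  # {'$event1': True}; should be False
```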
31 changes: 13 additions & 18 deletions synapse/storage/databases/main/events_worker.py
@@ -1502,38 +1502,33 @@ async def _have_seen_events_dict(
         Returns:
             a dict {event_id -> bool}
         """
-        # if the event cache contains the event, obviously we've seen it.
-
-        cache_results = {
-            event_id
-            for event_id in event_ids
-            if await self._get_event_cache.contains((event_id,))
-        }
-        results = dict.fromkeys(cache_results, True)
-        remaining = [
-            event_id for event_id in event_ids if event_id not in cache_results
-        ]
-        if not remaining:
-            return results
+        # TODO: We used to query the _get_event_cache here as a fast-path before
+        # hitting the database. For if an event were in the cache, we've presumably
+        # seen it before.
+        #
+        # But this is currently an invalid assumption due to the _get_event_cache
+        # not being invalidated when purging events from a room. The optimisation can
+        # be re-added after https://github.com/matrix-org/synapse/issues/13476

-        def have_seen_events_txn(txn: LoggingTransaction) -> None:
+        def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
             # We therefore pull the events from the database into a set...

             sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", remaining
+                txn.database_engine, "e.event_id", event_ids
             )
             txn.execute(sql + clause, args)
             found_events = {eid for eid, in txn}

             # ... and then we can update the results for each key
-            results.update({eid: (eid in found_events) for eid in remaining})
+            return {eid: (eid in found_events) for eid in event_ids}

-        await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
-        return results
+        return await self.db_pool.runInteraction(
+            "have_seen_events", have_seen_events_txn
+        )

     @cached(max_entries=100000, tree=True)
     async def have_seen_event(self, room_id: str, event_id: str) -> bool:
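
As the in-diff comment notes, `room_id` is deliberately left out of the `WHERE` clause so the database can answer the query from the unique index on `event_id` alone, without touching the table rows. A rough, self-contained illustration of that query shape, using plain `sqlite3` as a stand-in for Synapse's `LoggingTransaction` and `make_in_list_sql_clause` helpers:

```python
# Rough sketch of the index-only lookup pattern, using plain sqlite3 rather
# than Synapse's database helpers.
import sqlite3
from typing import Dict, List

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, room_id TEXT)")
conn.execute("INSERT INTO events VALUES ('$seen', '!room:example.org')")


def have_seen_events(event_ids: List[str]) -> Dict[str, bool]:
    if not event_ids:
        return {}
    # Roughly what make_in_list_sql_clause produces on SQLite:
    # "e.event_id IN (?, ?, ...)" plus the matching argument list.
    placeholders = ", ".join("?" for _ in event_ids)
    sql = f"SELECT event_id FROM events AS e WHERE e.event_id IN ({placeholders})"
    found = {eid for (eid,) in conn.execute(sql, event_ids)}
    # Every requested id gets an entry; True only if the database has it.
    return {eid: (eid in found) for eid in event_ids}


print(have_seen_events(["$seen", "$unseen"]))  # {'$seen': True, '$unseen': False}
```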
12 changes: 0 additions & 12 deletions tests/storage/databases/main/test_events_worker.py
@@ -90,18 +90,6 @@ def test_simple(self):
             self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

-    def test_query_via_event_cache(self):
-        # fetch an event into the event cache
-        self.get_success(self.store.get_event(self.event_ids[0]))
-
-        # looking it up should now cause no db hits
-        with LoggingContext(name="test") as ctx:
-            res = self.get_success(
-                self.store.have_seen_events(self.room_id, [self.event_ids[0]])
-            )
-            self.assertEqual(res, {self.event_ids[0]})
-            self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
-
     def test_persisting_event_invalidates_cache(self):
         """
         Test to make sure that the `have_seen_event` cache
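
The removed test pinned the old fast-path behaviour: once an event was in the event cache, `have_seen_events` was expected to make zero database transactions. After this change the database is always consulted, so that assertion can no longer hold. A hypothetical replacement is sketched below; it reuses the helpers visible in the removed test, belongs in the same test case, and the exact transaction count is an assumption.

```python
def test_query_hits_database_even_when_cached(self):
    # Sketch: fetch an event into the event cache ...
    self.get_success(self.store.get_event(self.event_ids[0]))

    # ... but looking it up must now hit the database anyway, because the
    # cache may be stale after a purge.
    with LoggingContext(name="test") as ctx:
        res = self.get_success(
            self.store.have_seen_events(self.room_id, [self.event_ids[0]])
        )
        self.assertEqual(res, {self.event_ids[0]})
        # One transaction is an assumption; the point is it is non-zero.
        self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
```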