Skip to content

Commit

Permalink
Update Beeper preview query to correctly handle edit events (matrix-o…
Browse files Browse the repository at this point in the history
…rg#56)

* Update Beeper preview query to correctly handle edit events

We first want to ignore edit events from potential preview events as
these often replace events further back in history but come mots recent
on order.

We then do a second join for edit events that replace the selected (non
edit) preview event, and use where present, meaning edits on the latest
preview event show correctly.

* Rewrite preview query edit handling

* Fix join events w/relations

* Add preview edits test case

* Expand preview edit test to cover multiple edits

* Only generate previews if we have timeline events

* Remove cache on preview generation

Points towards there still being cache invalidation races somewhere...

Co-authored-by: Brad Murray <[email protected]>
  • Loading branch information
Fizzadar and bradtgmurray authored Jan 24, 2023
1 parent 3500b6f commit 1720967
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 40 deletions.
3 changes: 2 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2470,7 +2470,8 @@ async def _generate_room_entry(
preview={},
)

if sync_config.beeper_previews:
# Only generate previews if we have new events that would change it
if batch.events and sync_config.beeper_previews:
preview = (
await self.store.beeper_preview_event_for_room_id_and_user_id(
room_id=room_id, user_id=user_id, to_key=now_token.room_key
Expand Down
82 changes: 48 additions & 34 deletions synapse/storage/databases/main/beeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
LoggingTransaction,
)
from synapse.types import RoomStreamToken
from synapse.util.caches.descriptors import cached

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -41,45 +40,60 @@ def __init__(
)
self.is_aggregating_notification_counts = False

@cached(max_entries=50000, num_args=2, tree=True)
async def beeper_preview_event_for_room_id_and_user_id(
self, room_id: str, user_id: str, to_key: RoomStreamToken
) -> Optional[Tuple[str, int]]:
def beeper_preview_txn(txn: LoggingTransaction) -> Optional[Tuple[str, int]]:
sql = """
SELECT e.event_id, COALESCE(re.origin_server_ts, e.origin_server_ts) as origin_server_ts
FROM events AS e
LEFT JOIN redactions as r
ON e.event_id = r.redacts
-- Join to relations to find replacements
LEFT JOIN event_relations as er
ON e.event_id = er.event_id AND er.relation_type = 'm.replace'
-- Join the original event that was replaced
LEFT JOIN events as re
ON re.event_id = er.relates_to_id
WHERE
e.stream_ordering <= ?
AND e.room_id = ?
AND r.redacts IS NULL
AND (
e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
OR e.type = 'm.reaction'
)
AND CASE
-- Only find non-redacted reactions to our own messages
WHEN (e.type = 'm.reaction') THEN (
SELECT ? = ee.sender AND ee.event_id NOT IN (
SELECT redacts FROM redactions WHERE redacts = ee.event_id
) FROM events as ee
WHERE ee.event_id = (
SELECT eer.relates_to_id FROM event_relations AS eer
WHERE eer.event_id = e.event_id
)
WITH latest_event AS (
SELECT e.event_id, e.origin_server_ts
FROM events AS e
LEFT JOIN redactions as r
ON e.event_id = r.redacts
-- Look to see if this event itself is an edit, as we don't want to
-- use edits ever as the "latest event"
LEFT JOIN event_relations as is_edit
ON e.event_id = is_edit.event_id AND is_edit.relation_type = 'm.replace'
WHERE
e.stream_ordering <= ?
AND e.room_id = ?
AND is_edit.event_id IS NULL
AND r.redacts IS NULL
AND (
e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
OR e.type = 'm.reaction'
)
ELSE (true) END
ORDER BY e.stream_ordering DESC
LIMIT 1
AND CASE
-- Only find non-redacted reactions to our own messages
WHEN (e.type = 'm.reaction') THEN (
SELECT ? = ee.sender AND ee.event_id NOT IN (
SELECT redacts FROM redactions WHERE redacts = ee.event_id
) FROM events as ee
WHERE ee.event_id = (
SELECT eer.relates_to_id FROM event_relations AS eer
WHERE eer.event_id = e.event_id
)
)
ELSE (true) END
ORDER BY e.stream_ordering DESC
LIMIT 1
),
latest_edit_for_latest_event AS (
SELECT e.event_id, e_replacement.event_id as replacement_event_id
FROM latest_event e
-- Find any events that edit this event, as we'll want to use the new content from
-- the edit as the preview
LEFT JOIN event_relations as er
ON e.event_id = er.relates_to_id AND er.relation_type = 'm.replace'
LEFT JOIN events as e_replacement
ON er.event_id = e_replacement.event_id
ORDER BY e_replacement.origin_server_ts DESC
LIMIT 1
)
SELECT COALESCE(lefle.replacement_event_id, le.event_id), le.origin_server_ts
FROM latest_event le
LEFT JOIN latest_edit_for_latest_event lefle ON le.event_id = lefle.event_id
"""

txn.execute(
Expand Down
4 changes: 0 additions & 4 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,6 @@ def _invalidate_caches_for_event(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)

self._attempt_to_invalidate_cache(
"beeper_preview_event_for_room_id_and_user_id", (room_id,)
)

# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
Expand Down
80 changes: 79 additions & 1 deletion tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ def _check_unread_count(self, expected_count: int) -> None:
self.next_batch = channel.json_body["next_batch"]


class RoomPreviewTestCase(unittest.HomeserverTestCase):
class BeeperRoomPreviewTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
Expand Down Expand Up @@ -1036,6 +1036,84 @@ def test_room_preview(self) -> None:
auth_token=self.tok2, expected={self.room_id: enc_1_body["event_id"]}
)

def test_room_preview_edits(self) -> None:
"""Tests that /sync returns a room preview with the latest message for room."""

# One user says hello.
# Check that a message we send returns a preview in the room (i.e. have multiple clients?)
send_body = self.helper.send(self.room_id, "hello", tok=self.tok)
self._check_preview_event_ids(
auth_token=self.tok, expected={self.room_id: send_body["event_id"]}
)

# Join new user. Should not show updated preview.
self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2)
self._check_preview_event_ids(
auth_token=self.tok, expected={self.room_id: send_body["event_id"]}
)

# Second user says hello
# Check that the new user sending a message updates our preview
send_2_body = self.helper.send(self.room_id, "hello again!", tok=self.tok2)
self._check_preview_event_ids(self.tok, {self.room_id: send_2_body["event_id"]})

# First user edits their old message
# Check that this doesn't alter the preview
self.helper.send_event(
room_id=self.room_id,
type=EventTypes.Message,
content={
"body": "hello edit",
"msgtype": "m.text",
"m.relates_to": {
"rel_type": RelationTypes.REPLACE,
"event_id": send_body["event_id"],
},
},
tok=self.tok,
)
self._check_preview_event_ids(self.tok, {self.room_id: send_2_body["event_id"]})

# Now second user edits their (currently preview) message
# Check that this does become the preview
send_3_body = self.helper.send_event(
room_id=self.room_id,
type=EventTypes.Message,
content={
"body": "hello edit",
"msgtype": "m.text",
"m.relates_to": {
"rel_type": RelationTypes.REPLACE,
"event_id": send_2_body["event_id"],
},
},
tok=self.tok2,
)
self._check_preview_event_ids(self.tok, {self.room_id: send_3_body["event_id"]})

# Now second user edits their (currently preview) message again
# Check that this does become the preview, over the previous edit
send_4_body = self.helper.send_event(
room_id=self.room_id,
type=EventTypes.Message,
content={
"body": "hello edit 2",
"msgtype": "m.text",
"m.relates_to": {
"rel_type": RelationTypes.REPLACE,
"event_id": send_2_body["event_id"],
},
},
tok=self.tok2,
)
self._check_preview_event_ids(self.tok, {self.room_id: send_4_body["event_id"]})

# Finally, first user sends a message and this should become the preview
send_5_body = self.helper.send(self.room_id, "hello", tok=self.tok)
self._check_preview_event_ids(
auth_token=self.tok, expected={self.room_id: send_5_body["event_id"]}
)


class SyncCacheTestCase(unittest.HomeserverTestCase):
servlets = [
Expand Down

0 comments on commit 1720967

Please sign in to comment.