Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix serialization errors when rotating notifications #13118

Merged
merged 14 commits into from
Jun 28, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/13118.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
201 changes: 135 additions & 66 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,30 @@ def _get_unread_counts_by_pos_txn(

counts = NotifCounts()

# First we pull the counts from the summary table
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream
# ordering given. If it doesn't match then a new read receipt has arrived and
# we haven't yet updated the counts in `event_push_summary` to reflect
# that; in that case we simply ignore `event_push_summary` counts
# and do a manual count of all of the rows in the `event_push_actions` table
# for this user/room.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some edge case where this will go wrong if the previous RR was over 24 hours ago (so we've started removing things from event_push_actions)? Or do we not really care?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we don't care. The previous behaviour is that if you send a read receipt that is over 24 hours old then the updated counts will only include unread messages less than 24 hours old. I think this keeps that same behaviour.

#
# If `last_receipt_stream_ordering` is null then that means it's up to
# date (as the row was written by an older version of Synapse that
# updated `event_push_summary` synchronously when persisting a new read
# receipt).
txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
WHERE room_id = ? AND user_id = ?
AND (
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
OR last_receipt_stream_ordering = ?
)
""",
(room_id, user_id, stream_ordering),
(room_id, user_id, stream_ordering, stream_ordering),
)
row = txn.fetchone()

Expand All @@ -263,9 +279,9 @@ def _get_unread_counts_by_pos_txn(
if row:
counts.highlight_count += row[0]

# Finally we need to count push actions that haven't been summarized
# yet.
# We only want to pull out push actions that we haven't summarized yet.
# Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been
# summarized yet, or the summary is empty due to a recent read receipt.
stream_ordering = max(stream_ordering, summary_stream_ordering)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering
Expand Down Expand Up @@ -800,6 +816,19 @@ async def _rotate_notifs(self) -> None:
self._doing_notif_rotation = True

try:
# First we recalculate push summaries and delete stale push actions
# for rooms/users with new receipts.
while True:
logger.debug("Handling new receipts")

caught_up = await self.db_pool.runInteraction(
"_handle_new_receipts_for_notifs_txn",
self._handle_new_receipts_for_notifs_txn,
)
if caught_up:
break

# Then we update the event push summaries for any new events
while True:
logger.info("Rotating notifications")

Expand All @@ -810,10 +839,110 @@ async def _rotate_notifs(self) -> None:
break
await self.hs.get_clock().sleep(self._rotate_delay)

# Finally we clear out old event push actions.
await self._remove_old_push_actions_that_have_rotated()
finally:
self._doing_notif_rotation = False

def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Check for new read receipts and delete from event push actions.

Any push actions which predate the user's most recent read receipt are
now redundant, so we can remove them from `event_push_actions` and
update `event_push_summary`.
"""

limit = 100

min_stream_id = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
keyvalues={},
retcol="stream_id",
)

sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE r.stream_id > ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
"""

# We only want local users, so we add a dodgy filter to the above query
# and recheck it below.
user_filter = "%:" + self.hs.hostname

txn.execute(
sql,
(
min_stream_id,
user_filter,
limit,
),
)
rows = txn.fetchall()

# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
# Only handle our own read receipts.
if not self.hs.is_mine_id(user_id):
continue

txn.execute(
"""
DELETE FROM event_push_actions
WHERE room_id = ?
AND user_id = ?
AND stream_ordering <= ?
AND highlight = 0
""",
(room_id, user_id, stream_ordering),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)

# We always update `event_push_summary_last_receipt_stream_id` to
# ensure that we don't rescan the same receipts for remote users.
#
# This requires repeatable read to be safe, as we need the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be quite nice to do an assertion that we have the right isolation level, but that looks a bit fiddly and probably a thing for another time.

# `MAX(stream_id)` to not include any new rows that have been committed
# since the start of the transaction (since those rows won't have been
# returned by the query above). Alternatively we could query the max
# stream ID at the start of the transaction and bound everything by
# that.
txn.execute(
"""
UPDATE event_push_summary_last_receipt_stream_id
SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized)
"""
)

return len(rows) < limit

def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
Expand Down Expand Up @@ -1033,66 +1162,6 @@ def remove_old_push_actions_that_have_rotated_txn(
if done:
break

def _remove_old_push_actions_before_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
) -> None:
"""
Purges old push actions for a user and room before a given
stream_ordering.

We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.

Args:
txn: The transaction
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
},
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
Expand Down
13 changes: 1 addition & 12 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
cast,
)

from synapse.api.constants import EduTypes, ReceiptTypes
from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -682,17 +682,6 @@ def _insert_linearized_receipt_txn(
lock=False,
)

# When updating a local users read receipt, remove any push actions
# which resulted from the receipt's event and all earlier events.
if (
self.hs.is_mine_id(user_id)
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and stream_ordering is not None
):
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
)

return rx_ts

def _graph_to_linear(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a column that records the position of the read receipt for the user at
-- the time we summarised the push actions. This is used to check if the counts
-- are up to date after a new read receipt has been sent.
--
-- Null means that we can skip that check, as the row was written by an older
-- version of Synapse that updated `event_push_summary` synchronously when
-- persisting a new read receipt
ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT;


-- Tracks which new receipts we've handled
CREATE TABLE event_push_summary_last_receipt_stream_id (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO event_push_summary_last_receipt_stream_id (stream_id)
SELECT COALESCE(MAX(stream_id), 0)
FROM receipts_linearized;
35 changes: 30 additions & 5 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_get_unread_push_actions_for_user_in_range_for_email(self) -> None:

def test_count_aggregation(self) -> None:
room_id = "!foo:example.com"
user_id = "@user1235:example.com"
user_id = "@user1235:test"

last_read_stream_ordering = [0]

Expand All @@ -81,11 +81,26 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None:
def _inject_actions(stream: int, action: list) -> None:
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.event_id = f"$test{stream}:example.com"
event.internal_metadata.stream_ordering = stream
event.internal_metadata.is_outlier.return_value = False
event.depth = stream

self.get_success(
self.store.db_pool.simple_insert(
table="events",
values={
"stream_ordering": stream,
"topological_ordering": stream,
"type": "m.room.message",
"room_id": room_id,
"processed": True,
"outlier": False,
"event_id": event.event_id,
},
)
)

self.get_success(
self.store.add_push_actions_to_staging(
event.event_id,
Expand All @@ -105,18 +120,28 @@ def _inject_actions(stream: int, action: list) -> None:
def _rotate(stream: int) -> None:
self.get_success(
self.store.db_pool.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
"rotate-receipts", self.store._handle_new_receipts_for_notifs_txn
)
)

self.get_success(
self.store.db_pool.runInteraction(
"rotate-notifs", self.store._rotate_notifs_before_txn, stream
)
)

def _mark_read(stream: int, depth: int) -> None:
last_read_stream_ordering[0] = stream

self.get_success(
self.store.db_pool.runInteraction(
"",
self.store._remove_old_push_actions_before_txn,
self.store._insert_linearized_receipt_txn,
room_id,
"m.read",
user_id,
f"$test{stream}:example.com",
{},
stream,
)
)
Expand Down Expand Up @@ -150,7 +175,7 @@ def _mark_read(stream: int, depth: int) -> None:

_assert_counts(1, 0)

_mark_read(7, 7)
_mark_read(6, 6)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a change in behavior then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is just making sure we're inject a read receipt at an actual event (which we have to), rather than at some arbitrary place.

(Side note: we need to rewrite this test to not poke things directly into the DB, and instead create real rooms, events and receipts)

_assert_counts(0, 0)

_inject_actions(8, HIGHLIGHT)
Expand Down