From 3fb73c3d5a73ae36475f9199e25b7c0a39550302 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 12:21:14 +0100 Subject: [PATCH] Handle remote receipts better --- .../databases/main/event_push_actions.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3defb443698e..811de60423a4 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -865,6 +865,16 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: rows = txn.fetchall() if not rows: + # We always update `event_push_summary_last_receipt_stream_id` to + # ensure that we don't rescan the same receipts for remote users. + # + # This requires repeatable read to be safe. + txn.execute( + """ + UPDATE event_push_summary_last_receipt_stream_id + SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) + """ + ) return True # For each new read receipt we delete push actions from before it and @@ -908,13 +918,15 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: }, ) - last_stream_id = rows[-1][0] - - self.db_pool.simple_update_one_txn( - txn, - table="event_push_summary_last_receipt_stream_id", - keyvalues={}, - updatevalues={"stream_id": last_stream_id}, + # We always update `event_push_summary_last_receipt_stream_id` to + # ensure that we don't rescan the same receipts for remote users. + # + # This requires repeatable read to be safe. + txn.execute( + """ + UPDATE event_push_summary_last_receipt_stream_id + SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) + """ ) return len(rows) < limit