From 6406b70aebd61ecddf61e45196d7cb13428b509e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 May 2018 15:30:11 +0100 Subject: [PATCH] Use stream rather depth ordering for push actions This simplifies things as it is, but will also allow us to change the way we traverse topologically without having to update the way push actions work. --- synapse/storage/event_push_actions.py | 53 +++++++----------------- synapse/storage/receipts.py | 1 - tests/storage/test_event_push_actions.py | 4 +- 3 files changed, 18 insertions(+), 40 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index c22762eb5c8f..f084a5f54bad 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -18,8 +18,6 @@ from twisted.internet import defer from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.types import RoomStreamToken -from .stream import lower_bound import logging import simplejson as json @@ -99,7 +97,7 @@ def get_unread_event_push_actions_by_room_for_user( def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, last_read_event_id): sql = ( - "SELECT stream_ordering, topological_ordering" + "SELECT stream_ordering" " FROM events" " WHERE room_id = ? AND event_id = ?" ) @@ -111,17 +109,12 @@ def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, return {"notify_count": 0, "highlight_count": 0} stream_ordering = results[0][0] - topological_ordering = results[0][1] return self._get_unread_counts_by_pos_txn( - txn, room_id, user_id, topological_ordering, stream_ordering + txn, room_id, user_id, stream_ordering ) - def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering, - stream_ordering): - token = RoomStreamToken( - topological_ordering, stream_ordering - ) + def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): # First get number of notifications. # We don't need to put a notif=1 clause as all rows always have @@ -132,10 +125,10 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_order " WHERE" " user_id = ?" " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + " AND stream_ordering > ?" + ) - txn.execute(sql, (user_id, room_id)) + txn.execute(sql, (user_id, room_id, stream_ordering)) row = txn.fetchone() notify_count = row[0] if row else 0 @@ -155,10 +148,10 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_order " highlight = 1" " AND user_id = ?" " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + " AND stream_ordering > ?" + ) - txn.execute(sql, (user_id, room_id)) + txn.execute(sql, (user_id, room_id, stream_ordering)) row = txn.fetchone() highlight_count = row[0] if row else 0 @@ -209,7 +202,6 @@ def get_after_receipt(txn): " ep.highlight " " FROM (" " SELECT room_id," - " MAX(topological_ordering) as topological_ordering," " MAX(stream_ordering) as stream_ordering" " FROM events" " INNER JOIN receipts_linearized USING (room_id, event_id)" @@ -219,13 +211,7 @@ def get_after_receipt(txn): " event_push_actions AS ep" " WHERE" " ep.room_id = rl.room_id" - " AND (" - " ep.topological_ordering > rl.topological_ordering" - " OR (" - " ep.topological_ordering = rl.topological_ordering" - " AND ep.stream_ordering > rl.stream_ordering" - " )" - " )" + " AND ep.stream_ordering > rl.stream_ordering" " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" @@ -318,7 +304,6 @@ def get_after_receipt(txn): " ep.highlight, e.received_ts" " FROM (" " SELECT room_id," - " MAX(topological_ordering) as topological_ordering," " MAX(stream_ordering) as stream_ordering" " FROM events" " INNER JOIN receipts_linearized USING (room_id, event_id)" @@ -329,13 +314,7 @@ def get_after_receipt(txn): " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" " ep.room_id = rl.room_id" - " AND (" - " ep.topological_ordering > rl.topological_ordering" - " OR (" - " ep.topological_ordering = rl.topological_ordering" - " AND ep.stream_ordering > rl.stream_ordering" - " )" - " )" + " AND ep.stream_ordering > rl.stream_ordering" " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" @@ -762,10 +741,10 @@ def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): ) def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering, stream_ordering): + stream_ordering): """ Purges old push actions for a user and room before a given - topological_ordering. + stream_ordering. We however keep a months worth of highlighted notifications, so that users can still get a list of recent highlights. @@ -774,7 +753,7 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, txn: The transcation room_id: Room ID to delete from user_id: user ID to delete for - topological_ordering: The lowest topological ordering which will + stream_ordering: The lowest stream ordering which will not be deleted. """ txn.call_after( @@ -793,9 +772,9 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering <= ?" + " stream_ordering <= ?" " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", - (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + (user_id, room_id, stream_ordering, self.stream_ordering_month_ago) ) txn.execute(""" diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 63997ed4491c..2f95e7e82a27 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -407,7 +407,6 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, txn, room_id=room_id, user_id=user_id, - topological_ordering=topological_ordering, stream_ordering=stream_ordering, ) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 9962ce8a5d98..3cbf9a78b183 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -55,7 +55,7 @@ def test_count_aggregation(self): def _assert_counts(noitf_count, highlight_count): counts = yield self.store.runInteraction( "", self.store._get_unread_counts_by_pos_txn, - room_id, user_id, 0, 0 + room_id, user_id, 0 ) self.assertEquals( counts, @@ -86,7 +86,7 @@ def _rotate(stream): def _mark_read(stream, depth): return self.store.runInteraction( "", self.store._remove_old_push_actions_before_txn, - room_id, user_id, depth, stream + room_id, user_id, stream ) yield _assert_counts(0, 0)