From 05b9f48ee577f1cbdd5c5837f22c0d9cbe4c44cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jan 2017 10:08:55 +0000 Subject: [PATCH 1/3] Fix clearing out old device list outbound pokes --- synapse/storage/devices.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f0353929dad9..e6fe67ee25c8 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -545,7 +545,7 @@ def _prune_old_outbound_device_pokes(self): (destination, user_id) tuple to ensure that the prev_ids remain correct if the server does come back. """ - now = self._clock.time_msec() + yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 def _prune_txn(txn): select_sql = """ @@ -557,6 +557,9 @@ def _prune_txn(txn): txn.execute(select_sql) rows = txn.fetchall() + if not rows: + return + delete_sql = """ DELETE FROM device_lists_outbound_pokes WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? @@ -565,11 +568,13 @@ def _prune_txn(txn): txn.executemany( delete_sql, ( - (now, row["destination"], row["user_id"], row["stream_id"]) + (yesterday, row[0], row[1], row[2]) for row in rows ) ) + logger.info("Pruned %d device list outbound pokes", txn.rowcount) + return self.runInteraction( "_prune_old_outbound_device_pokes", _prune_txn ) From d3169e8d28a4b7238256ff4d3151e3cc8feef0e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jan 2017 11:20:03 +0000 Subject: [PATCH 2/3] Only fetch with row ts and count > 1 --- synapse/storage/devices.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index e6fe67ee25c8..cccefdd3d2ac 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -552,9 +552,10 @@ def _prune_txn(txn): SELECT destination, user_id, max(stream_id) as stream_id FROM device_lists_outbound_pokes GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 """ - txn.execute(select_sql) + txn.execute(select_sql, (yesterday,)) rows = txn.fetchall() if not rows: From ab55794b6f57988204605f3b1e7245a66e91dcec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jan 2017 13:22:41 +0000 Subject: [PATCH 3/3] Fix deletion of old sent devices correctly --- synapse/storage/devices.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cccefdd3d2ac..8e1780036419 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -436,15 +436,27 @@ def mark_as_sent_devices_by_remote(self, destination, stream_id): ) def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): + # First we DELETE all rows such that only the latest row for each + # (destination, user_id is left. We do this by selecting first and + # deleting. + sql = """ + SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id <= ? + GROUP BY user_id + HAVING count(*) > 1 + """ + txn.execute(sql, (destination, stream_id,)) + rows = txn.fetchall() + sql = """ DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id < ( - SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? - ) + WHERE destination = ? AND user_id = ? AND stream_id < ? """ - txn.execute(sql, (destination, destination, stream_id,)) + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows) + ) + # Mark everything that is left as sent sql = """ UPDATE device_lists_outbound_pokes SET sent = ? WHERE destination = ? AND stream_id <= ?