Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix clearing out old device list outbound pokes #1864

Merged
merged 3 commits into from
Jan 31, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,27 @@ def mark_as_sent_devices_by_remote(self, destination, stream_id):
)

def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
# First we DELETE all rows such that only the latest row for each
# (destination, user_id is left. We do this by selecting first and
# deleting.
sql = """
SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
WHERE destination = ? AND stream_id <= ?
GROUP BY user_id
HAVING count(*) > 1
"""
txn.execute(sql, (destination, stream_id,))
rows = txn.fetchall()

sql = """
DELETE FROM device_lists_outbound_pokes
WHERE destination = ? AND stream_id < (
SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
WHERE destination = ? AND stream_id <= ?
)
WHERE destination = ? AND user_id = ? AND stream_id < ?
"""
txn.execute(sql, (destination, destination, stream_id,))
txn.executemany(
sql, ((destination, row[0], row[1],) for row in rows)
)

# Mark everything that is left as sent
sql = """
UPDATE device_lists_outbound_pokes SET sent = ?
WHERE destination = ? AND stream_id <= ?
Expand Down Expand Up @@ -545,18 +557,22 @@ def _prune_old_outbound_device_pokes(self):
(destination, user_id) tuple to ensure that the prev_ids remain correct
if the server does come back.
"""
now = self._clock.time_msec()
yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000

def _prune_txn(txn):
select_sql = """
SELECT destination, user_id, max(stream_id) as stream_id
FROM device_lists_outbound_pokes
GROUP BY destination, user_id
HAVING min(ts) < ? AND count(*) > 1
"""

txn.execute(select_sql)
txn.execute(select_sql, (yesterday,))
rows = txn.fetchall()

if not rows:
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we chuck a min(ts) into the SELECT so that we can filter out destinations that don't have anything to delete in them? I'm slightly worried at the number of deletes that we are spamming here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expect min(ts) < ? to filter out many rows though, as most users won't update particularly frequently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you throw in a count then you could probably discard all the destination, user_id pairs that only have a single row in them.


delete_sql = """
DELETE FROM device_lists_outbound_pokes
WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
Expand All @@ -565,11 +581,13 @@ def _prune_txn(txn):
txn.executemany(
delete_sql,
(
(now, row["destination"], row["user_id"], row["stream_id"])
(yesterday, row[0], row[1], row[2])
for row in rows
)
)

logger.info("Pruned %d device list outbound pokes", txn.rowcount)

return self.runInteraction(
"_prune_old_outbound_device_pokes", _prune_txn
)