Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Speed up deleting device messages (#16643)
Browse files Browse the repository at this point in the history
Keeping track of a lower bound of stream ID, below which we've already deleted everything, makes the queries much faster. Otherwise, every time we scan for rows to delete we'd re-scan across all the rows that have previously been deleted (until the next table VACUUM).
  • Loading branch information
erikjohnston authored Nov 16, 2023
1 parent 1b238e8 commit 3e8531d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 29 deletions.
1 change: 1 addition & 0 deletions changelog.d/16643.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up deleting of device messages when deleting a device.
8 changes: 5 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,17 @@ async def _delete_device_messages(
up_to_stream_id = task.params["up_to_stream_id"]

# Delete the messages in batches to avoid too much DB load.
from_stream_id = None
while True:
res = await self.store.delete_messages_for_device(
from_stream_id, _ = await self.store.delete_messages_for_device_between(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
if from_stream_id is None:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
Expand Down
106 changes: 81 additions & 25 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,12 @@ async def delete_messages_for_device(
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
limit: Optional[int] = None,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
limit: maximum number of messages to delete
Returns:
The number of messages deleted.
Expand All @@ -478,32 +476,22 @@ async def delete_messages_for_device(
log_kv({"message": "No changes in cache since last check"})
return 0

def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
limit_statement = "" if limit is None else f"LIMIT {limit}"
sql = f"""
DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
ORDER BY stream_id
{limit_statement}
) AS q1
)
"""
txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
return txn.rowcount

count = await self.db_pool.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
)
from_stream_id = None
count = 0
while True:
from_stream_id, loop_count = await self.delete_messages_for_device_between(
user_id,
device_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
limit=1000,
)
count += loop_count
if from_stream_id is None:
break

log_kv({"message": f"deleted {count} messages for device", "count": count})

# In this case we don't know if we hit the limit or the delete is complete
# so let's not update the cache.
if count == limit:
return count

# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
Expand All @@ -514,6 +502,74 @@ def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:

return count

@trace
async def delete_messages_for_device_between(
    self,
    user_id: str,
    device_id: Optional[str],
    from_stream_id: Optional[int],
    to_stream_id: int,
    limit: int,
) -> Tuple[Optional[int], int]:
    """Delete one batch of a device's inbox messages within a stream ID range.

    Intended to be called in a loop, feeding the returned stream ID back in
    as `from_stream_id`; used that way it is more efficient than
    `delete_messages_for_device` for batched deletion.

    Args:
        user_id: The recipient user_id.
        device_id: The recipient device_id.
        from_stream_id: Exclusive lower bound of the range. `None` is
            treated as 0.
        to_stream_id: Inclusive upper bound of the range.
        limit: Number of rows used to pick where this batch ends.

    Returns:
        A tuple of `(stream_id, num_deleted)`. `stream_id` is the highest
        stream ID deleted by this call, or `None` when the range has been
        fully cleared (no rows matched, or fewer than `limit` rows were
        deleted). `num_deleted` is the row count reported for the DELETE
        (0 when nothing matched).
    """

    # Carrying a lower bound below which everything is already gone keeps
    # each query cheap: without it, every scan for rows to delete would walk
    # over all the previously deleted rows again (until the next table
    # VACUUM).

    # Minimum device stream ID is 1, so 0 is a safe "from the start" bound.
    lower_bound = 0 if from_stream_id is None else from_stream_id

    # Finds the stream ID at which this batch ends: the largest stream ID
    # among the first `limit` matching rows in stream order.
    find_batch_end_sql = (
        "\n"
        "SELECT MAX(stream_id) FROM (\n"
        "SELECT stream_id FROM device_inbox\n"
        "WHERE user_id = ? AND device_id = ?\n"
        "AND ? < stream_id AND stream_id <= ?\n"
        "ORDER BY stream_id\n"
        "LIMIT ?\n"
        ") AS d\n"
    )

    # Removes every matching row from the lower bound up to the batch end.
    delete_batch_sql = (
        "\n"
        "DELETE FROM device_inbox\n"
        "WHERE user_id = ? AND device_id = ?\n"
        "AND ? < stream_id AND stream_id <= ?\n"
    )

    def delete_messages_for_device_between_txn(
        txn: LoggingTransaction,
    ) -> Tuple[Optional[int], int]:
        txn.execute(
            find_batch_end_sql,
            (user_id, device_id, lower_bound, to_stream_id, limit),
        )
        fetched = txn.fetchone()
        batch_end = fetched[0] if fetched is not None else None
        if batch_end is None:
            # No rows left in the requested range.
            return None, 0

        txn.execute(
            delete_batch_sql,
            (user_id, device_id, lower_bound, batch_end),
        )
        deleted = txn.rowcount

        # Deleting fewer rows than `limit` means the range is now empty, so
        # report `None` to tell the caller to stop looping.
        resume_from = batch_end if deleted >= limit else None
        return resume_from, deleted

    return await self.db_pool.runInteraction(
        "delete_messages_for_device_between",
        delete_messages_for_device_between_txn,
        db_autocommit=True,  # No need to wrap this in a transaction
    )

@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
Expand Down
2 changes: 1 addition & 1 deletion synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def update_task(
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update some task associated values. This is exposed publically so it can
"""Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
Expand Down

0 comments on commit 3e8531d

Please sign in to comment.