Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Speed up deleting device messages
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Nov 16, 2023
1 parent 43d1aa7 commit bf93d9f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 4 deletions.
8 changes: 5 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,17 @@ async def _delete_device_messages(
up_to_stream_id = task.params["up_to_stream_id"]

# Delete the messages in batches to avoid too much DB load.
from_stream_id = None
while True:
res = await self.store.delete_messages_for_device(
from_stream_id = await self.store.delete_messages_for_device_between(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
if from_stream_id is None:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
Expand Down
66 changes: 66 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,72 @@ def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:

return count

@trace
async def delete_messages_for_device_between(
self,
user_id: str,
device_id: Optional[str],
from_stream_id: Optional[int],
to_stream_id: int,
limit: int,
) -> Optional[int]:
"""Delete N device messages between the stream IDs, returning the
highest stream ID deleted.
This is more efficient than `delete_messages_for_device` when calling in
a loop to batch delete messages.
"""

# Keeping track of a lower bound of stream ID where we've deleted
# everything below makes the queries much faster. Otherwise, every time
# we scan for rows to delete we'd re-scan across all the rows that have
# previously deleted (until the next table VACUUM).

if from_stream_id is None:
# Minimum device stream ID is 1.
from_stream_id = 0

def delete_messages_for_device_between_txn(
txn: LoggingTransaction,
) -> Optional[int]:
txn.execute(
"""
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
) AS d
""",
(user_id, device_id, from_stream_id, to_stream_id, limit),
)
row = txn.fetchone()
if row is None or row[0] is None:
return None

(max_stream_id,) = row

txn.execute(
"""
DELETE FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
""",
(user_id, device_id, from_stream_id, max_stream_id),
)

if txn.rowcount < limit:
return None

return max_stream_id

return await self.db_pool.runInteraction(
"delete_messages_for_device_between",
delete_messages_for_device_between_txn,
db_autocommit=True, # We don't need to run in a transaction
)

@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
Expand Down
2 changes: 1 addition & 1 deletion synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def update_task(
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update some task associated values. This is exposed publically so it can
"""Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
Expand Down

0 comments on commit bf93d9f

Please sign in to comment.