diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 93472d011758..4e513afc361e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -396,15 +396,17 @@ async def _delete_device_messages( up_to_stream_id = task.params["up_to_stream_id"] # Delete the messages in batches to avoid too much DB load. + from_stream_id = None while True: - res = await self.store.delete_messages_for_device( + from_stream_id = await self.store.delete_messages_for_device_between( user_id=user_id, device_id=device_id, - up_to_stream_id=up_to_stream_id, + from_stream_id=from_stream_id, + to_stream_id=up_to_stream_id, limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + if from_stream_id is None: return TaskStatus.COMPLETE, None, None await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3e7425d4a654..8ad61a98b055 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -514,6 +514,70 @@ def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: return count + @trace + async def delete_messages_for_device_between( + self, + user_id: str, + device_id: Optional[str], + from_stream_id: Optional[int], + to_stream_id: int, + limit: int, + ) -> Optional[int]: + """Delete N device messages between the stream IDs, returning the + highest stream ID deleted. + + This is more efficient than `delete_messages_for_device` when calling in + a loop to batch delete messages. + """ + + # Keeping track of a lower bound of stream ID where we've deleted + # everything below makes the queries much faster. Otherwise, every time + # we scan for rows to delete we'd re-scan across all the rows that have + # previously deleted (until the next table VACUUM). + + if from_stream_id is None: + # Minimum device stream ID is 1. + from_stream_id = 0 + + def delete_messages_for_device_between_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + txn.execute( + """ + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id + LIMIT ? + ) AS d + """, + (user_id, device_id, from_stream_id, to_stream_id, limit), + ) + (max_stream_id,) = txn.fetchone() + if max_stream_id is None: + return None + + txn.execute( + """ + DELETE FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + """, + (user_id, device_id, from_stream_id, max_stream_id), + ) + + if txn.rowcount < limit: + return None + + return max_stream_id + + return await self.db_pool.runInteraction( + "delete_messages_for_device_between", + delete_messages_for_device_between_txn, + db_autocommit=True, # We don't need to run in a transaction + ) + @trace async def get_new_device_msgs_for_remote( self, destination: str, last_stream_id: int, current_stream_id: int, limit: int diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index caf13b3474be..29c561e55528 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -193,7 +193,7 @@ async def update_task( result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse.