diff --git a/changelog.d/16318.misc b/changelog.d/16318.misc new file mode 100644 index 000000000000..1433a2f24645 --- /dev/null +++ b/changelog.d/16318.misc @@ -0,0 +1 @@ +Speed up task to delete to-device messages. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0d3d5ebc86d7..86ad96d030d2 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -388,7 +388,8 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None: "Trying handling device list state for partial join: not supported on workers." ) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 1000 async def _delete_device_messages( self, @@ -400,19 +401,19 @@ async def _delete_device_messages( device_id = task.params["device_id"] up_to_stream_id = task.params["up_to_stream_id"] - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) + # Delete the messages in batches to avoid too much DB load. + while True: + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + + await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) class DeviceHandler(DeviceWorkerHandler):