From bf93d9f2d9e2d0d07e0a10678a1120d575ac9026 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 10:35:51 +0000 Subject: [PATCH 1/7] Speed up deleting device messages --- synapse/handlers/device.py | 8 ++- synapse/storage/databases/main/deviceinbox.py | 66 +++++++++++++++++++ synapse/util/task_scheduler.py | 2 +- 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 93472d011758..4e513afc361e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -396,15 +396,17 @@ async def _delete_device_messages( up_to_stream_id = task.params["up_to_stream_id"] # Delete the messages in batches to avoid too much DB load. + from_stream_id = None while True: - res = await self.store.delete_messages_for_device( + from_stream_id = await self.store.delete_messages_for_device_between( user_id=user_id, device_id=device_id, - up_to_stream_id=up_to_stream_id, + from_stream_id=from_stream_id, + to_stream_id=up_to_stream_id, limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + if from_stream_id is None: return TaskStatus.COMPLETE, None, None await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3e7425d4a654..c25e8c1353c9 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -514,6 +514,72 @@ def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: return count + @trace + async def delete_messages_for_device_between( + self, + user_id: str, + device_id: Optional[str], + from_stream_id: Optional[int], + to_stream_id: int, + limit: int, + ) -> Optional[int]: + """Delete N device messages between the stream IDs, returning the + highest stream ID deleted. 
+ + This is more efficient than `delete_messages_for_device` when calling in + a loop to batch delete messages. + """ + + # Keeping track of a lower bound of stream ID where we've deleted + # everything below makes the queries much faster. Otherwise, every time + # we scan for rows to delete we'd re-scan across all the rows that have + # previously been deleted (until the next table VACUUM). + + if from_stream_id is None: + # Minimum device stream ID is 1. + from_stream_id = 0 + + def delete_messages_for_device_between_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + txn.execute( + """ + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id + LIMIT ? + ) AS d + """, + (user_id, device_id, from_stream_id, to_stream_id, limit), + ) + row = txn.fetchone() + if row is None or row[0] is None: + return None + + (max_stream_id,) = row + + txn.execute( + """ + DELETE FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + """, + (user_id, device_id, from_stream_id, max_stream_id), + ) + + if txn.rowcount < limit: + return None + + return max_stream_id + + return await self.db_pool.runInteraction( + "delete_messages_for_device_between", + delete_messages_for_device_between_txn, + db_autocommit=True, # We don't need to run in a transaction + ) + @trace async def get_new_device_msgs_for_remote( self, destination: str, last_stream_id: int, current_stream_id: int, limit: int diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index caf13b3474be..29c561e55528 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -193,7 +193,7 @@ async def update_task( result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. 
This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse. From 1293012353dcfb423e8c61081497d19ce2ff85f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 10:37:00 +0000 Subject: [PATCH 2/7] Newsfile --- changelog.d/16643.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16643.misc diff --git a/changelog.d/16643.misc b/changelog.d/16643.misc new file mode 100644 index 000000000000..cc0cf0901f2d --- /dev/null +++ b/changelog.d/16643.misc @@ -0,0 +1 @@ +Speed up deleting of device messages when deleting a device. From 4d975c0051ec30d8971d490d4f96b6e683b0a543 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 12:32:31 +0000 Subject: [PATCH 3/7] Update synapse/storage/databases/main/deviceinbox.py Co-authored-by: reivilibre --- synapse/storage/databases/main/deviceinbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index c25e8c1353c9..5546d50789ef 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -524,7 +524,7 @@ async def delete_messages_for_device_between( limit: int, ) -> Optional[int]: """Delete N device messages between the stream IDs, returning the - highest stream ID deleted. + highest stream ID deleted, or None if nothing was deletable. This is more efficient than `delete_messages_for_device` when calling in a loop to batch delete messages. 
From c94b4a0e0965fb4c462a8dbf34b88504081a2e0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 12:53:23 +0000 Subject: [PATCH 4/7] Review comments --- synapse/handlers/device.py | 2 +- synapse/storage/databases/main/deviceinbox.py | 47 +++++++++---------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4e513afc361e..1af6d7754575 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -398,7 +398,7 @@ async def _delete_device_messages( # Delete the messages in batches to avoid too much DB load. from_stream_id = None while True: - from_stream_id = await self.store.delete_messages_for_device_between( + from_stream_id, _ = await self.store.delete_messages_for_device_between( user_id=user_id, device_id=device_id, from_stream_id=from_stream_id, diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 5546d50789ef..7beca87c5f89 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -478,24 +478,19 @@ async def delete_messages_for_device( log_kv({"message": "No changes in cache since last check"}) return 0 - def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - limit_statement = "" if limit is None else f"LIMIT {limit}" - sql = f""" - DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ( - SELECT MAX(stream_id) FROM ( - SELECT stream_id FROM device_inbox - WHERE user_id = ? AND device_id = ? AND stream_id <= ? 
- ORDER BY stream_id - {limit_statement} - ) AS q1 - ) - """ - txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id)) - return txn.rowcount - - count = await self.db_pool.runInteraction( - "delete_messages_for_device", delete_messages_for_device_txn - ) + from_stream_id = None + count = 0 + while True: + from_stream_id, loop_count = await self.delete_messages_for_device_between( + user_id, + device_id, + from_stream_id=None, + to_stream_id=up_to_stream_id, + limit=1000, + ) + count += loop_count + if from_stream_id is None: + break log_kv({"message": f"deleted {count} messages for device", "count": count}) @@ -522,9 +517,10 @@ async def delete_messages_for_device_between( from_stream_id: Optional[int], to_stream_id: int, limit: int, - ) -> Optional[int]: + ) -> Tuple[Optional[int], int]: """Delete N device messages between the stream IDs, returning the - highest stream ID deleted, or None if nothing was deletable. + highest stream ID deleted (or None if all messages in the range have + been deleted) and the number of messages deleted. This is more efficient than `delete_messages_for_device` when calling in a loop to batch delete messages. 
@@ -541,7 +537,7 @@ async def delete_messages_for_device_between( def delete_messages_for_device_between_txn( txn: LoggingTransaction, - ) -> Optional[int]: + ) -> Tuple[Optional[int], int]: txn.execute( """ SELECT MAX(stream_id) FROM ( @@ -556,7 +552,7 @@ def delete_messages_for_device_between_txn( ) row = txn.fetchone() if row is None or row[0] is None: - return None + return None, 0 (max_stream_id,) = row @@ -569,10 +565,11 @@ def delete_messages_for_device_between_txn( (user_id, device_id, from_stream_id, max_stream_id), ) - if txn.rowcount < limit: - return None + num_deleted = txn.rowcount + if num_deleted < limit: + return None, num_deleted - return max_stream_id + return max_stream_id, num_deleted return await self.db_pool.runInteraction( "delete_messages_for_device_between", From 90cc7eb9eaa1ec8801fc66acb921e51f0e2b248b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 14:26:28 +0000 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/deviceinbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 7beca87c5f89..6809c77feb70 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -486,7 +486,7 @@ async def delete_messages_for_device( device_id, from_stream_id=None, to_stream_id=up_to_stream_id, - limit=1000, + limit=limit, ) count += loop_count if from_stream_id is None: From ab978022c1f8b77ed62e1ae707aefca7c7cc76c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 14:27:14 +0000 Subject: [PATCH 6/7] Pass in 'from_stream_id' --- synapse/storage/databases/main/deviceinbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 6809c77feb70..ed45edb73f80 100644 --- 
a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -484,7 +484,7 @@ async def delete_messages_for_device( from_stream_id, loop_count = await self.delete_messages_for_device_between( user_id, device_id, - from_stream_id=None, + from_stream_id=from_stream_id, to_stream_id=up_to_stream_id, limit=limit, ) From ae48832fe721b0faedb37e400fff1ad3e77ee3a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 14:33:13 +0000 Subject: [PATCH 7/7] Remove limit --- synapse/storage/databases/main/deviceinbox.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index ed45edb73f80..02dddd1da418 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -450,14 +450,12 @@ async def delete_messages_for_device( user_id: str, device_id: Optional[str], up_to_stream_id: int, - limit: Optional[int] = None, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. - limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -486,7 +484,7 @@ async def delete_messages_for_device( device_id, from_stream_id=from_stream_id, to_stream_id=up_to_stream_id, - limit=limit, + limit=1000, ) count += loop_count if from_stream_id is None: @@ -494,11 +492,6 @@ async def delete_messages_for_device( log_kv({"message": f"deleted {count} messages for device", "count": count}) - # In this case we don't know if we hit the limit or the delete is complete - # so let's not update the cache. - if count == limit: - return count - # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0