From 4b4fa312a5e75e49a68681a0ba314ccedf36a359 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2023 11:42:21 +0100 Subject: [PATCH 1/4] Fix perf of `wait_for_stream_positions` If we have lots of stuff waiting we ended up consuming lots of CPU. --- synapse/replication/tcp/client.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 139f57cf8668..53c5ffe51227 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,17 @@ """A replication client for use by synapse workers. """ import logging -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Dict, + Iterable, + MutableSequence, + Optional, + Set, + Tuple, +) + +from sortedcontainers import SortedList from twisted.internet import defer from twisted.internet.defer import Deferred @@ -84,7 +94,9 @@ def __init__(self, hs: "HomeServer"): # Map from stream and instance to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. - self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {} + self._streams_to_waiters: Dict[ + Tuple[str, str], SortedList[Tuple[int, Deferred]] + ] = {} async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list @@ -226,7 +238,9 @@ async def on_rdata( # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is # greater than the received row position. - waiting_list = self._streams_to_waiters.get((stream_name, instance_name), []) + waiting_list: MutableSequence[ + Tuple[int, Deferred] + ] = self._streams_to_waiters.get((stream_name, instance_name), []) # Index of first item with a position after the current token, i.e we # have called all deferreds before this index. If not overwritten by @@ -250,7 +264,7 @@ async def on_rdata( # Drop all entries in the waiting list that were called in the above # loop. (This maintains the order so no need to resort) - waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] + del waiting_list[:index_of_first_deferred_not_called] for deferred in deferreds_to_callback: try: @@ -310,11 +324,10 @@ async def wait_for_stream_position( ) waiting_list = self._streams_to_waiters.setdefault( - (stream_name, instance_name), [] + (stream_name, instance_name), SortedList(key=lambda t: t[0]) ) - waiting_list.append((position, deferred)) - waiting_list.sort(key=lambda t: t[0]) + waiting_list.add((position, deferred)) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): From da4bf03457ab79b06a36cea858055575f5fc35ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2023 11:44:09 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/16148.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16148.bugfix diff --git a/changelog.d/16148.bugfix b/changelog.d/16148.bugfix new file mode 100644 index 000000000000..fea316f8562b --- /dev/null +++ b/changelog.d/16148.bugfix @@ -0,0 +1 @@ +Fix performance degredation when there are a lot of in-flight replication requests. From 687ef9225a073c4dda7ea989d6c7e2c34797aed6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Aug 2023 11:55:04 +0100 Subject: [PATCH 3/4] Lint --- synapse/replication/tcp/client.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 53c5ffe51227..5960d88c2bef 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,15 +14,7 @@ """A replication client for use by synapse workers. """ import logging -from typing import ( - TYPE_CHECKING, - Dict, - Iterable, - MutableSequence, - Optional, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Iterable, MutableSequence, Optional, Set, Tuple from sortedcontainers import SortedList From 7ae963fe7dd4381a042610feae244997c9180a98 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Aug 2023 14:50:58 +0100 Subject: [PATCH 4/4] Return early instead --- synapse/replication/tcp/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 5960d88c2bef..078c8d707486 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,7 +14,7 @@ """A replication client for use by synapse workers. """ import logging -from typing import TYPE_CHECKING, Dict, Iterable, MutableSequence, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple from sortedcontainers import SortedList @@ -230,9 +230,9 @@ async def on_rdata( # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is # greater than the received row position. - waiting_list: MutableSequence[ - Tuple[int, Deferred] - ] = self._streams_to_waiters.get((stream_name, instance_name), []) + waiting_list = self._streams_to_waiters.get((stream_name, instance_name)) + if not waiting_list: + return # Index of first item with a position after the current token, i.e we # have called all deferreds before this index. If not overwritten by