Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix perf of wait_for_stream_positions #16148

Merged
merged 5 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16148.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance degredation when there are a lot of in-flight replication requests.
19 changes: 12 additions & 7 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"""A replication client for use by synapse workers.
"""
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple

from sortedcontainers import SortedList

from twisted.internet import defer
from twisted.internet.defer import Deferred
Expand Down Expand Up @@ -84,7 +86,9 @@ def __init__(self, hs: "HomeServer"):

# Map from stream and instance to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
self._streams_to_waiters: Dict[
Tuple[str, str], SortedList[Tuple[int, Deferred]]
] = {}

async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
Expand Down Expand Up @@ -226,7 +230,9 @@ async def on_rdata(
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
waiting_list = self._streams_to_waiters.get((stream_name, instance_name))
if not waiting_list:
return

# Index of first item with a position after the current token, i.e we
# have called all deferreds before this index. If not overwritten by
Expand All @@ -250,7 +256,7 @@ async def on_rdata(

# Drop all entries in the waiting list that were called in the above
# loop. (This maintains the order so no need to resort)
waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
del waiting_list[:index_of_first_deferred_not_called]

for deferred in deferreds_to_callback:
try:
Expand Down Expand Up @@ -310,11 +316,10 @@ async def wait_for_stream_position(
)

waiting_list = self._streams_to_waiters.setdefault(
(stream_name, instance_name), []
(stream_name, instance_name), SortedList(key=lambda t: t[0])
)

waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])
waiting_list.add((position, deferred))

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
Expand Down