Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix perf of wait_for_stream_positions #16148

Merged
merged 5 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16148.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance degradation when there are a lot of in-flight replication requests.
19 changes: 12 additions & 7 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"""A replication client for use by synapse workers.
"""
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, MutableSequence, Optional, Set, Tuple

from sortedcontainers import SortedList

from twisted.internet import defer
from twisted.internet.defer import Deferred
Expand Down Expand Up @@ -84,7 +86,9 @@ def __init__(self, hs: "HomeServer"):

# Map from stream and instance to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
self._streams_to_waiters: Dict[
Tuple[str, str], SortedList[Tuple[int, Deferred]]
] = {}

async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
Expand Down Expand Up @@ -226,7 +230,9 @@ async def on_rdata(
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
waiting_list: MutableSequence[
Tuple[int, Deferred]
] = self._streams_to_waiters.get((stream_name, instance_name), [])
clokep marked this conversation as resolved.
Show resolved Hide resolved

# Index of first item with a position after the current token, i.e. we
# have called all deferreds before this index. If not overwritten by
Expand All @@ -250,7 +256,7 @@ async def on_rdata(

# Drop all entries in the waiting list that were called in the above
# loop. (This maintains the order so no need to re-sort)
waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
del waiting_list[:index_of_first_deferred_not_called]

for deferred in deferreds_to_callback:
try:
Expand Down Expand Up @@ -310,11 +316,10 @@ async def wait_for_stream_position(
)

waiting_list = self._streams_to_waiters.setdefault(
(stream_name, instance_name), []
(stream_name, instance_name), SortedList(key=lambda t: t[0])
)

waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])
waiting_list.add((position, deferred))

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
Expand Down