From 6350bf67450f749e600e60ab243eaf9cec917db8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 12:34:57 +0000 Subject: [PATCH 1/2] Speed up persisting large number of outliers --- synapse/handlers/federation_event.py | 18 +++---- synapse/util/iterutils.py | 51 +++++++++++++++++++ tests/util/test_itertools.py | 76 +++++++++++++++++++++++++++- 3 files changed, 133 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ba6b94a8b756..f4c17894aa7a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, partition +from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1669,14 +1669,13 @@ async def _auth_and_persist_outliers( # XXX: it might be possible to kick this process off in parallel with fetching # the events. - while event_map: - # build a list of events whose auth events are not in the queue. - roots = tuple( - ev - for ev in event_map.values() - if not any(aid in event_map for aid in ev.auth_event_ids()) - ) + # We need to persist an event's auth events before the event. + auth_graph = { + ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map] + for ev in event_map.values() + } + for roots in sorted_topologically_batched(event_map.values(), auth_graph): if not roots: # if *none* of the remaining events are ready, that means # we have a loop. This either means a bug in our logic, or that @@ -1698,9 +1697,6 @@ async def _auth_and_persist_outliers( await self._auth_and_persist_outliers_inner(room_id, roots) - for ev in roots: - del event_map[ev.event_id] - async def _auth_and_persist_outliers_inner( self, room_id: str, fetched_events: Collection[EventBase] ) -> None: diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index a0efb96d3b46..f4c0194af02f 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -135,3 +135,54 @@ def sorted_topologically( degree_map[edge] -= 1 if degree_map[edge] == 0: heapq.heappush(zero_degree, edge) + + +def sorted_topologically_batched( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> Generator[Collection[T], None, None]: + r"""Walk the graph topologically, returning batches of nodes where all nodes + that references it have been previously returned. + + For example, given the following graph: + + A + / \ + B C + \ / + D + + This function will return: `[[A], [B, C], [D]]`. + + This function is useful for e.g. batch persisting events in an auth chain, + where we can only persist an event if all its auth events have already been + persisted. + """ + + degree_map = {node: 0 for node in nodes} + reverse_graph: Dict[T, Set[T]] = {} + + for node, edges in graph.items(): + if node not in degree_map: + continue + + for edge in set(edges): + if edge in degree_map: + degree_map[node] += 1 + + reverse_graph.setdefault(edge, set()).add(node) + reverse_graph.setdefault(node, set()) + + zero_degree = [node for node, degree in degree_map.items() if degree == 0] + + while zero_degree: + new_zero_degree = [] + for node in zero_degree: + for edge in reverse_graph.get(node, []): + if edge in degree_map: + degree_map[edge] -= 1 + if degree_map[edge] == 0: + new_zero_degree.append(edge) + + yield zero_degree + zero_degree = new_zero_degree diff --git a/tests/util/test_itertools.py b/tests/util/test_itertools.py index 406c16cdcf30..fabb05c7e415 100644 --- a/tests/util/test_itertools.py +++ b/tests/util/test_itertools.py @@ -13,7 +13,11 @@ # limitations under the License. from typing import Dict, Iterable, List, Sequence -from synapse.util.iterutils import chunk_seq, sorted_topologically +from synapse.util.iterutils import ( + chunk_seq, + sorted_topologically, + sorted_topologically_batched, +) from tests.unittest import TestCase @@ -107,3 +111,73 @@ def test_multiple_paths(self) -> None: graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]} self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4]) + + +class SortTopologicallyBatched(TestCase): + "Test cases for `sorted_topologically_batched`" + + def test_empty(self) -> None: + "Test that an empty graph works correctly" + + graph: Dict[int, List[int]] = {} + self.assertEqual(list(sorted_topologically_batched([], graph)), []) + + def test_handle_empty_graph(self) -> None: + "Test that a graph where a node doesn't have an entry is treated as empty" + + graph: Dict[int, List[int]] = {} + + # For disconnected nodes the output is simply sorted. + self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]]) + + def test_disconnected(self) -> None: + "Test that a graph with no edges work" + + graph: Dict[int, List[int]] = {1: [], 2: []} + + # For disconnected nodes the output is simply sorted. + self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]]) + + def test_linear(self) -> None: + "Test that a simple `4 -> 3 -> 2 -> 1` graph works" + + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) + + def test_subset(self) -> None: + "Test that only sorting a subset of the graph works" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]} + + self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]]) + + def test_fork(self) -> None: + "Test that a forked graph works" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]} + + # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should + # always get the same one. + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]] + ) + + def test_duplicates(self) -> None: + "Test that a graph with duplicate edges work" + graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) + + def test_multiple_paths(self) -> None: + "Test that a graph with multiple paths between two nodes work" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) From 2441b378172df649a48da0c82b6e1244cda52242 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 12:46:16 +0000 Subject: [PATCH 2/2] Newsfile --- changelog.d/16649.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16649.misc diff --git a/changelog.d/16649.misc b/changelog.d/16649.misc new file mode 100644 index 000000000000..cebd6aaee5c9 --- /dev/null +++ b/changelog.d/16649.misc @@ -0,0 +1 @@ +Speed up persisting large number of outliers.