From b9fa878899ce9f9ce704ed83a670cce85c9bb81e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 7 Jun 2021 14:52:29 +0100 Subject: [PATCH 1/6] Use attrs for `_EventPersistQueueItem` --- synapse/storage/persist_events.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 33dc752d8fd0..12e56d65ac88 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -16,9 +16,10 @@ import itertools import logging -from collections import deque, namedtuple +from collections import deque from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +import attr from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -89,15 +90,18 @@ ) +@attr.s(auto_attribs=True, frozen=True, slots=True) +class _EventPersistQueueItem: + events_and_contexts: List[Tuple[EventBase, EventContext]] + backfilled: bool + deferred: ObservableDeferred + + class _EventPeristenceQueue: """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. 
""" - _EventPersistQueueItem = namedtuple( - "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred") - ) - def __init__(self): self._event_persist_queues = {} self._currently_persisting_rooms = set() @@ -132,7 +136,7 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled): deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) queue.append( - self._EventPersistQueueItem( + _EventPersistQueueItem( events_and_contexts=events_and_contexts, backfilled=backfilled, deferred=deferred, From 386b7d799630a1b44f3ac7a7e276d406842d0e06 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 7 Jun 2021 15:45:55 +0100 Subject: [PATCH 2/6] Refactor _EventPeristenceQueue a bit have it start the queue handler itself, and simplify the callback interface --- synapse/storage/persist_events.py | 95 ++++++++++++++++++------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 12e56d65ac88..2a0ac7c8cd75 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -17,12 +17,26 @@ import itertools import logging from collections import deque -from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from typing import ( + Awaitable, + Callable, + Collection, + Deque, + Dict, + Generic, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, +) import attr from prometheus_client import Counter, Histogram from twisted.internet import defer +from twisted.internet.defer import Deferred from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -97,18 +111,36 @@ class _EventPersistQueueItem: deferred: ObservableDeferred -class _EventPeristenceQueue: +_PersistResult = TypeVar("_PersistResult") + + +class _EventPeristenceQueue(Generic[_PersistResult]): """Queues up events so that they can be persisted in bulk with only one concurrent transaction per room. 
""" - def __init__(self): - self._event_persist_queues = {} - self._currently_persisting_rooms = set() + def __init__( + self, + per_item_callback: Callable[ + [List[Tuple[EventBase, EventContext]], bool], + Awaitable[_PersistResult], + ], + ): + """Create a new event persistence queue + + The per_item_callback will be called for each item added via add_to_queue, + and its result will be returned via the Deferreds returned from add_to_queue. + """ + self._event_persist_queues: Dict[str, Deque[_EventPersistQueueItem]] = {} + self._currently_persisting_rooms: Set[str] = set() + self._per_item_callback = per_item_callback - def add_to_queue(self, room_id, events_and_contexts, backfilled): + def add_to_queue(self, room_id, events_and_contexts, backfilled) -> Deferred: """Add events to the queue, with the given persist_event options. + If we are not already processing events in this room, starts off a background + process to to so, calling the per_item_callback for each item. + NB: due to the normal usage pattern of this method, it does *not* follow the synapse logcontext rules, and leaves the logcontext in place whether or not the returned deferred is ready. @@ -120,9 +152,9 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled): Returns: defer.Deferred: a deferred which will resolve once the events are - persisted. Runs its callbacks *without* a logcontext. The result - is the same as that returned by the callback passed to - `handle_queue`. + persisted. Runs its callbacks in the sentinel logcontext. The result + is the same as that returned by the `_per_item_callback` passed to + `__init__`. 
""" queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: @@ -143,14 +175,16 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled): ) ) + self._handle_queue(room_id) + return deferred.observe() - def handle_queue(self, room_id, per_item_callback): + def _handle_queue(self, room_id): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with for each item in the queue, + The queue's callback will be invoked for each item in the queue, of type _EventPersistQueueItem. The per_item_callback will continuously - be called with new items, unless the queue becomnes empty. The return + be called with new items, unless the queue becomes empty. The return value of the function will be given to the deferreds waiting on the item, exceptions will be passed to the deferreds as well. @@ -160,7 +194,6 @@ def handle_queue(self, room_id, per_item_callback): If another callback is currently handling the queue then it will not be invoked. 
""" - if room_id in self._currently_persisting_rooms: return @@ -171,7 +204,9 @@ async def handle_queue_loop(): queue = self._get_drainining_queue(room_id) for item in queue: try: - ret = await per_item_callback(item) + ret = await self._per_item_callback( + item.events_and_contexts, item.backfilled + ) except Exception: with PreserveLoggingContext(): item.deferred.errback() @@ -218,7 +253,7 @@ def __init__(self, hs, stores: Databases): self._clock = hs.get_clock() self._instance_name = hs.get_instance_name() self.is_mine_id = hs.is_mine_id - self._event_persist_queue = _EventPeristenceQueue() + self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch) self._state_resolution_handler = hs.get_state_resolution_handler() async def persist_events( @@ -252,9 +287,6 @@ async def persist_events( ) deferreds.append(d) - for room_id in partitioned: - self._maybe_start_persisting(room_id) - # Each deferred returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events). @@ -264,7 +296,7 @@ async def persist_events( ret_vals = await make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) - replaced_events = {} + replaced_events: Dict[str, str] = {} for d in ret_vals: replaced_events.update(d) @@ -295,8 +327,6 @@ async def persist_event( event.room_id, [(event, context)], backfilled=backfilled ) - self._maybe_start_persisting(event.room_id) - # The deferred returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events.) 
@@ -312,29 +342,14 @@ async def persist_event( pos = PersistedEventPosition(self._instance_name, event_stream_id) return event, pos, self.main_store.get_room_max_token() - def _maybe_start_persisting(self, room_id: str): - """Pokes the `_event_persist_queue` to start handling new items in the - queue, if not already in progress. - - Causes the deferreds returned by `add_to_queue` to resolve with: a - dictionary of event ID to event ID we didn't persist as we already had - another event persisted with the same TXN ID. - """ - - async def persisting_queue(item): - with Measure(self._clock, "persist_events"): - return await self._persist_events( - item.events_and_contexts, backfilled=item.backfilled - ) - - self._event_persist_queue.handle_queue(room_id, persisting_queue) - - async def _persist_events( + async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> Dict[str, str]: - """Calculates the change to current state and forward extremities, and + """Callback for the _event_persist_queue + + Calculates the change to current state and forward extremities, and persists the given events and with those updates. Returns: From 19aca5c6ef92b4bea58831e459ea33aebdf5927d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 8 Jun 2021 18:24:08 +0100 Subject: [PATCH 3/6] simplify call paths in add_to_queue --- synapse/storage/persist_events.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 2a0ac7c8cd75..c751edac70e7 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -157,27 +157,25 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled) -> Deferred: `__init__`. 
""" queue = self._event_persist_queues.setdefault(room_id, deque()) - if queue: - # if the last item in the queue has the same `backfilled` setting, - # we can just add these new events to that item. - end_item = queue[-1] - if end_item.backfilled == backfilled: - end_item.events_and_contexts.extend(events_and_contexts) - return end_item.deferred.observe() - deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) + # if the last item in the queue has the same `backfilled` setting, + # we can just add these new events to that item. + if queue and queue[-1].backfilled == backfilled: + end_item = queue[-1] + else: + # need to make a new queue item + deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) - queue.append( - _EventPersistQueueItem( - events_and_contexts=events_and_contexts, + end_item = _EventPersistQueueItem( + events_and_contexts=[], backfilled=backfilled, deferred=deferred, ) - ) + queue.append(end_item) + end_item.events_and_contexts.extend(events_and_contexts) self._handle_queue(room_id) - - return deferred.observe() + return end_item.deferred.observe() def _handle_queue(self, room_id): """Attempts to handle the queue for a room if not already being handled. 
From 4ddffb49e752daac50c817d6f36d23c86228cc91 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 8 Jun 2021 18:26:25 +0100 Subject: [PATCH 4/6] make `_EventPeristenceQueue.add_to_queue` a regular async function --- synapse/storage/persist_events.py | 48 ++++++++++++++----------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index c751edac70e7..1e71662ce5d7 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -36,7 +36,6 @@ from prometheus_client import Counter, Histogram from twisted.internet import defer -from twisted.internet.defer import Deferred from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -52,7 +51,7 @@ StateMap, get_domain_from_id, ) -from synapse.util.async_helpers import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -135,25 +134,24 @@ def __init__( self._currently_persisting_rooms: Set[str] = set() self._per_item_callback = per_item_callback - def add_to_queue(self, room_id, events_and_contexts, backfilled) -> Deferred: + async def add_to_queue( + self, + room_id: str, + events_and_contexts: Iterable[Tuple[EventBase, EventContext]], + backfilled: bool, + ) -> _PersistResult: """Add events to the queue, with the given persist_event options. If we are not already processing events in this room, starts off a background process to to so, calling the per_item_callback for each item. - NB: due to the normal usage pattern of this method, it does *not* - follow the synapse logcontext rules, and leaves the logcontext in - place whether or not the returned deferred is ready. 
- Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): backfilled (bool): Returns: - defer.Deferred: a deferred which will resolve once the events are - persisted. Runs its callbacks in the sentinel logcontext. The result - is the same as that returned by the `_per_item_callback` passed to + the result returned by the `_per_item_callback` passed to `__init__`. """ queue = self._event_persist_queues.setdefault(room_id, deque()) @@ -175,7 +173,7 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled) -> Deferred: end_item.events_and_contexts.extend(events_and_contexts) self._handle_queue(room_id) - return end_item.deferred.observe() + return await make_deferred_yieldable(end_item.deferred.observe()) def _handle_queue(self, room_id): """Attempts to handle the queue for a room if not already being handled. @@ -278,22 +276,20 @@ async def persist_events( for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) - deferreds = [] - for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + async def enqueue(item): + room_id, evs_ctxs = item + return await self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled ) - deferreds.append(d) - # Each deferred returns a map from event ID to existing event ID if the - # event was deduplicated. (The dict may also include other entries if + ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) + + # Each call to add_to_queue returns a map from event ID to existing event ID if + # the event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events). # - # Since we use `defer.gatherResults` we need to merge the returned list + # Since we use `concurrently_execute` we need to merge the returned list # of dicts into one. 
- ret_vals = await make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True) - ) replaced_events: Dict[str, str] = {} for d in ret_vals: replaced_events.update(d) @@ -321,14 +317,12 @@ async def persist_event( event if it was deduplicated due to an existing event matching the transaction ID. """ - deferred = self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], backfilled=backfilled - ) - - # The deferred returns a map from event ID to existing event ID if the + # add_to_queue returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events.) - replaced_events = await make_deferred_yieldable(deferred) + replaced_events = await self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], backfilled=backfilled + ) replaced_event = replaced_events.get(event.event_id) if replaced_event: event = await self.main_store.get_event(replaced_event) From c650d4b454160e9115a19251772c10c06e40f833 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 8 Jun 2021 18:28:49 +0100 Subject: [PATCH 5/6] changelog --- changelog.d/10145.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10145.misc diff --git a/changelog.d/10145.misc b/changelog.d/10145.misc new file mode 100644 index 000000000000..2f0c643b08b6 --- /dev/null +++ b/changelog.d/10145.misc @@ -0,0 +1 @@ +Refactor EventPersistenceQueue. 
From 4eb644e5b9246d7293fd9b0e76f2fb99b2afa26f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 14 Jun 2021 10:24:40 +0100 Subject: [PATCH 6/6] review suggestion --- synapse/storage/persist_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 1e71662ce5d7..c11f6c5845b2 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -288,7 +288,7 @@ async def enqueue(item): # the event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events). # - # Since we use `concurrently_execute` we need to merge the returned list + # Since we use `yieldable_gather_results` we need to merge the returned list # of dicts into one. replaced_events: Dict[str, str] = {} for d in ret_vals: