From 447aed42d22d3ece245c69f397d348b3a5b7bfa8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 27 Jan 2018 15:40:41 +0000 Subject: [PATCH 1/5] Add event_map param to resolve_state_groups --- synapse/state.py | 34 ++++++++++++++++++++++++++++++---- synapse/storage/events.py | 1 + 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 273f9911cacf..6c2aaa5e7aee 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -308,7 +308,7 @@ def resolve_state_groups_for_events(self, room_id, event_ids): )) result = yield self._state_resolution_handler.resolve_state_groups( - room_id, state_groups_ids, self._state_map_factory, + room_id, state_groups_ids, None, self._state_map_factory, ) defer.returnValue(result) @@ -371,7 +371,9 @@ def start_caching(self): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory): + def resolve_state_groups( + self, room_id, state_groups_ids, event_map, state_map_factory, + ): """Resolves conflicts between a set of state groups Always generates a new state group (unless we hit the cache), so should @@ -383,6 +385,14 @@ def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory): map from state group id to the state in that state group (where 'state' is a map from state key to event id) + event_map(dict[str,FrozenEvent]|None): + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_map_factory. + Returns: Deferred[_StateCacheEntry]: resolved state """ @@ -423,6 +433,7 @@ def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory): with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_factory( state_groups_ids.values(), + event_map=event_map, state_map_factory=state_map_factory, ) else: @@ -555,11 +566,20 @@ def _seperate(state_sets): @defer.inlineCallbacks -def resolve_events_with_factory(state_sets, state_map_factory): +def resolve_events_with_factory(state_sets, event_map, state_map_factory): """ Args: state_sets(list): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. + + event_map(dict[str,FrozenEvent]|None): + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_map_factory. + state_map_factory(func): will be called with a list of event_ids that are needed, and should return with a Deferred of dict of event_id to event. @@ -580,12 +600,16 @@ def resolve_events_with_factory(state_sets, state_map_factory): for event_ids in conflicted_state.itervalues() for event_id in event_ids ) + if event_map is not None: + needed_events -= set(event_map.iterkeys()) logger.info("Asking for %d conflicted events", len(needed_events)) # dict[str, FrozenEvent]: a map from state event id to event. Only includes - # the state events which are in conflict. + # the state events which are in conflict (and those in event_map) state_map = yield state_map_factory(needed_events) + if event_map is not None: + state_map.update(event_map) # get the ids of the auth events which allow us to authenticate the # conflicted state, picking only from the unconflicting state. @@ -597,6 +621,8 @@ def resolve_events_with_factory(state_sets, state_map_factory): new_needed_events = set(auth_events.itervalues()) new_needed_events -= needed_events + if event_map is not None: + new_needed_events -= set(event_map.iterkeys()) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2fead9eb0f9a..7b912ad413e3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -586,6 +586,7 @@ def get_events(ev_ids): current_state = yield resolve_events_with_factory( state_sets, + event_map={}, state_map_factory=get_events, ) defer.returnValue(current_state) From 9fcbbe8e7d7557fef7fe03533166b376d6fa82ef Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 27 Jan 2018 09:49:15 +0000 Subject: [PATCH 2/5] Check that events being persisted have state_group --- synapse/storage/events.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7b912ad413e3..9bceded7ba85 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -515,16 +515,21 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): if ctx.current_state_ids is None: raise Exception("Unknown current state") + if ctx.state_group is None: + # I don't think this can happen, but let's double-check + raise Exception( + "Context for new extremity event %s has no state " + "group" % event_id, + ) + # If we've already seen the state group don't bother adding # it to the state sets again if ctx.state_group not in state_groups: state_sets.append(ctx.current_state_ids) if ctx.delta_ids or hasattr(ev, "state_key"): was_updated = True - if ctx.state_group: - # Add this as a seen state group (if it has a state - # group) - state_groups.add(ctx.state_group) + # Add this as a seen state group + state_groups.add(ctx.state_group) break else: # If we couldn't find it, then we'll need to pull From 225dc3b4cb8875fff52180d2f3b1e386dec26f4d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 30 Jan 2018 10:17:55 +0000 Subject: [PATCH 3/5] Flatten _get_new_state_after_events rejig the if statements to simplify the logic and reduce indentation --- synapse/storage/events.py | 90 ++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9bceded7ba85..1b5dffe1c7bf 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -503,6 +503,10 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): None if there are no changes to the room state, or a dict of (type, state_key) -> event_id]. """ + + if not new_latest_event_ids: + defer.returnValue({}) + state_sets = [] state_groups = set() missing_event_ids = [] @@ -537,6 +541,9 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): was_updated = True missing_event_ids.append(event_id) + if not was_updated: + return + if missing_event_ids: # Now pull out the state for any missing events from DB event_to_groups = yield self._get_state_group_for_events( @@ -549,54 +556,49 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): group_to_state = yield self._get_state_for_groups(groups) state_sets.extend(group_to_state.itervalues()) - if not new_latest_event_ids: - defer.returnValue({}) - elif was_updated: - if len(state_sets) == 1: - # If there is only one state set, then we know what the current - # state is. - defer.returnValue(state_sets[0]) - else: - # We work out the current state by passing the state sets to the - # state resolution algorithm. It may ask for some events, including - # the events we have yet to persist, so we need a slightly more - # complicated event lookup function than simply looking the events - # up in the db. - - logger.info( - "Resolving state with %i state sets", len(state_sets), - ) + if len(state_sets) == 1: + # If there is only one state set, then we know what the current + # state is. + defer.returnValue(state_sets[0]) - events_map = {ev.event_id: ev for ev, _ in events_context} - - @defer.inlineCallbacks - def get_events(ev_ids): - # We get the events by first looking at the list of events we - # are trying to persist, and then fetching the rest from the DB. - db = [] - to_return = {} - for ev_id in ev_ids: - ev = events_map.get(ev_id, None) - if ev: - to_return[ev_id] = ev - else: - db.append(ev_id) + # We work out the current state by passing the state sets to the + # state resolution algorithm. It may ask for some events, including + # the events we have yet to persist, so we need a slightly more + # complicated event lookup function than simply looking the events + # up in the db. - if db: - evs = yield self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - to_return.update(evs) - defer.returnValue(to_return) + logger.info( + "Resolving state with %i state sets", len(state_sets), + ) - current_state = yield resolve_events_with_factory( - state_sets, - event_map={}, - state_map_factory=get_events, + events_map = {ev.event_id: ev for ev, _ in events_context} + + @defer.inlineCallbacks + def get_events(ev_ids): + # We get the events by first looking at the list of events we + # are trying to persist, and then fetching the rest from the DB. + db = [] + to_return = {} + for ev_id in ev_ids: + ev = events_map.get(ev_id, None) + if ev: + to_return[ev_id] = ev + else: + db.append(ev_id) + + if db: + evs = yield self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, ) - defer.returnValue(current_state) - else: - return + to_return.update(evs) + defer.returnValue(to_return) + + current_state = yield resolve_events_with_factory( + state_sets, + event_map={}, + state_map_factory=get_events, + ) + defer.returnValue(current_state) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): From ebfe64e3d69d0047ee9902a05beaf0249f11e072 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 30 Jan 2018 11:06:15 +0000 Subject: [PATCH 4/5] Use StateResolutionHandler to resolve state in persist events ... and thus benefit (hopefully) from its cache. --- synapse/storage/events.py | 72 +++++++++++++-------------------------- 1 file changed, 24 insertions(+), 48 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1b5dffe1c7bf..ca1d4a39864a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,7 +27,6 @@ from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -from synapse.state import resolve_events_with_factory from synapse.util.caches.descriptors import cached from synapse.types import get_domain_from_id @@ -237,6 +236,8 @@ def __init__(self, db_conn, hs): self._event_persist_queue = _EventPeristenceQueue() + self._state_resolution_handler = hs.get_state_resolution_handler() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database @@ -402,6 +403,7 @@ def _persist_events(self, events_and_contexts, backfilled=False, "Calculating state delta for room %s", room_id, ) current_state = yield self._get_new_state_after_events( + room_id, ev_ctx_rm, new_latest_event_ids, ) if current_state is not None: @@ -487,11 +489,14 @@ def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): defer.returnValue(new_latest_event_ids) @defer.inlineCallbacks - def _get_new_state_after_events(self, events_context, new_latest_event_ids): + def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): """Calculate the current state dict after adding some new events to a room Args: + room_id (str): + room to which the events are being added. Used for logging etc + events_context (list[(EventBase, EventContext)]): events and contexts which are being added to the room @@ -507,8 +512,8 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): if not new_latest_event_ids: defer.returnValue({}) - state_sets = [] - state_groups = set() + # map from state_group to ((type, key) -> event_id) state map + state_groups = {} missing_event_ids = [] was_updated = False for event_id in new_latest_event_ids: @@ -529,11 +534,9 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): # If we've already seen the state group don't bother adding # it to the state sets again if ctx.state_group not in state_groups: - state_sets.append(ctx.current_state_ids) + state_groups[ctx.state_group] = ctx.current_state_ids if ctx.delta_ids or hasattr(ev, "state_key"): was_updated = True - # Add this as a seen state group - state_groups.add(ctx.state_group) break else: # If we couldn't find it, then we'll need to pull @@ -550,55 +553,28 @@ def _get_new_state_after_events(self, events_context, new_latest_event_ids): missing_event_ids, ) - groups = set(event_to_groups.itervalues()) - state_groups + groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) if groups: group_to_state = yield self._get_state_for_groups(groups) - state_sets.extend(group_to_state.itervalues()) + state_groups.update(group_to_state) - if len(state_sets) == 1: - # If there is only one state set, then we know what the current + if len(state_groups) == 1: + # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_sets[0]) - - # We work out the current state by passing the state sets to the - # state resolution algorithm. It may ask for some events, including - # the events we have yet to persist, so we need a slightly more - # complicated event lookup function than simply looking the events - # up in the db. - - logger.info( - "Resolving state with %i state sets", len(state_sets), - ) - - events_map = {ev.event_id: ev for ev, _ in events_context} + defer.returnValue(state_groups.values()[0]) - @defer.inlineCallbacks def get_events(ev_ids): - # We get the events by first looking at the list of events we - # are trying to persist, and then fetching the rest from the DB. - db = [] - to_return = {} - for ev_id in ev_ids: - ev = events_map.get(ev_id, None) - if ev: - to_return[ev_id] = ev - else: - db.append(ev_id) - - if db: - evs = yield self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - to_return.update(evs) - defer.returnValue(to_return) - - current_state = yield resolve_events_with_factory( - state_sets, - event_map={}, - state_map_factory=get_events, + return self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + events_map = {ev.event_id: ev for ev, _ in events_context} + logger.debug("calling resolve_state_groups from preserve_events") + res = yield self._state_resolution_handler.resolve_state_groups( + room_id, state_groups, events_map, get_events ) - defer.returnValue(current_state) + + defer.returnValue(res.state) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): From 630caf8a703250e0f568000958faee42f9336b72 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 13 Feb 2018 14:29:22 +0000 Subject: [PATCH 5/5] style nit --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ca1d4a39864a..3d5eb9bc026a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -528,7 +528,7 @@ def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ # I don't think this can happen, but let's double-check raise Exception( "Context for new extremity event %s has no state " - "group" % event_id, + "group" % (event_id, ), ) # If we've already seen the state group don't bother adding