From 7b7fd273e312b62f4a39aa8b38cfd87c716e1ade Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 4 Jun 2018 01:04:20 +0300 Subject: [PATCH 01/31] untested attempt at deduplicating lazy-loaded members as per the proposal; we can deduplicate redundant lazy-loaded members which are sent in the same sync sequence. we do this heuristically rather than requiring the client to somehow tell us which members it has chosen to cache, by instead caching the last N members sent to a client, and not sending them again. For now we hardcode N to 100. Each cache for a given (user,device) tuple is in turn cached for up to X minutes (to avoid the caches building up). For now we hardcode X to 30. --- synapse/handlers/sync.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index aaf2a406df92..7c50f8f7022c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -19,6 +19,8 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.lrucache import LruCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client from synapse.types import RoomStreamToken @@ -33,6 +35,14 @@ logger = logging.getLogger(__name__) +# Store the cache that tracks which lazy-loaded members have been sent to a given +# client for no more than 30 minutes. +LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 + +# Remember the last 100 members we sent to a client for the purposes of +# avoiding redundantly sending the same lazy-loaded members to the client +LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 + SyncConfig = collections.namedtuple("SyncConfig", [ "user", @@ -182,6 +192,12 @@ def __init__(self, hs): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() + # ExpiringCache((User, Device)) -> LruCache(member mxid string) + self.lazy_loaded_members_cache = ExpiringCache( + "lazy_loaded_members_cache", self.clock, + max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE + ) + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -599,10 +615,32 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - # TODO: filter out redundant members based on their mxids (not their + # we can filter out redundant members based on their mxids (not their # event_ids) at this point. We know we can do it based on mxid as this # is an non-gappy incremental sync. + cache_key = (sync_config.user, sync_config.device) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + self.lazy_loaded_members_cache[cache_key] = cache + + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + cache.clear() + else: + # only send members which aren't in our LruCache (either because + # they're new to this client or have been pushed out of the cache) + types = [ + t for t in types if not cache.get(t[1]) + ] + + # add any types we are about to send into our LruCache + for t in types: + cache.put(t[1], True) + # strip off the (None, None) and filter to just room members types = types[:-1] if types: From f7bd5dad09bf2d44622682fa006f7759ba016c06 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 10 Jun 2018 14:39:29 +0300 Subject: [PATCH 02/31] add include_redundant_members filter option & make it work --- synapse/api/filtering.py | 9 +++++ synapse/handlers/sync.py | 79 ++++++++++++++++++++++++++-------------- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 1278f8c07c49..00955fef6e27 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -116,6 +116,9 @@ "lazy_load_members": { "type": "boolean" }, + "include_redundant_members": { + "type": "boolean" + }, } } @@ -266,6 +269,9 @@ def ephemeral_limit(self): def lazy_load_members(self): return self._room_state_filter.lazy_load_members() + def include_redundant_members(self): + return self._room_state_filter.include_redundant_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -425,6 +431,9 @@ def limit(self): def lazy_load_members(self): return self.filter_json.get("lazy_load_members", False) + def include_redundant_members(self): + return self.filter_json.get("include_redundant_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7c50f8f7022c..966aa24c813b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -515,6 +515,9 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke types = None member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() + include_redundant_members = ( + sync_config.filter_collection.include_redundant_members() + ) if lazy_load_members: # We only request state for the members needed to display the @@ -528,10 +531,26 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke ) ] - # We can't remove redundant member types at this stage as it has + # We can't remove redundant member types at this stage as it sometimes has # to be done based on event_id, and we don't have the member # event ids until we've pulled them out of the DB. + # however, we can create the cache if needed: + if not include_redundant_members: + # we can filter out redundant members based on their mxids (not + # their event_ids) at this point. We know we can do it based on + # mxid as this is an non-gappy incremental sync. + + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + + if not types: # an optimisation to stop needlessly trying to calculate # member_state_ids @@ -565,6 +584,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke for t in state_ids if t[0] == EventTypes.Member } + if not include_redundant_members: + # add any types we are about to send into our LruCache + for t in types[:-1]: + cache.set(t[1], True) + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -600,6 +624,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke for t in state_at_timeline_start if t[0] == EventTypes.Member } + if not include_redundant_members: + # add any types we are about to send into our LruCache + for t in types[:-1]: + cache.set(t[1], True) + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -615,34 +644,30 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - # we can filter out redundant members based on their mxids (not their - # event_ids) at this point. We know we can do it based on mxid as this - # is an non-gappy incremental sync. - - cache_key = (sync_config.user, sync_config.device) - cache = self.lazy_loaded_members_cache.get(cache_key) - if cache is None: - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) - self.lazy_loaded_members_cache[cache_key] = cache - - # if it's a new sync sequence, then assume the client has had - # amnesia and doesn't want any recent lazy-loaded members - # de-duplicated. - if since_token is None: - cache.clear() - else: - # only send members which aren't in our LruCache (either because - # they're new to this client or have been pushed out of the cache) - types = [ - t for t in types if not cache.get(t[1]) - ] - - # add any types we are about to send into our LruCache - for t in types: - cache.put(t[1], True) - # strip off the (None, None) and filter to just room members types = types[:-1] + + if not include_redundant_members: + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering types from %r...", types) + types = [ + t for t in types if not cache.get(t[1]) + ] + logger.debug("...to %r", types) + + # add any types we are about to send into our LruCache + for t in types: + cache.set(t[1], True) + if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types From a08b37b296bd9b37e0ba9b9224e265a4e145d1e4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 00:20:13 +0100 Subject: [PATCH 03/31] fix bad merge --- synapse/handlers/sync.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0dad3626fbff..39cfb7a0080d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -585,13 +585,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member } - else: - member_state_ids = {} if not include_redundant_members: # add any types we are about to send into our LruCache for t in types: cache.set(t[1], True) + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -638,13 +638,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member } - else: - member_state_ids = {} if not include_redundant_members: # add any types we are about to send into our LruCache for t in types: cache.set(t[1], True) + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id From 7362e6c6679414388484cbd3093dbd1b028cb8be Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 02:53:18 +0100 Subject: [PATCH 04/31] make /context lazyload & filter aware --- synapse/handlers/room.py | 24 +++++++++++++++++++++--- synapse/handlers/search.py | 2 +- synapse/rest/client/v1/room.py | 9 +++++++++ synapse/storage/stream.py | 14 +++++++++++--- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f67512078ba9..73237a8eeacb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -397,7 +397,7 @@ def send(etype, content, **kwargs): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit): + def get_event_context(self, user, room_id, event_id, limit, event_filter): """Retrieves events, pagination tokens and state around a given event in a room. @@ -407,6 +407,8 @@ def get_event_context(self, user, room_id, event_id, limit): event_id (str) limit (int): The maximum number of events to return in total (excluding state). + event_filter (Filter): the filter to apply to the events returned + (excluding the target event_id) Returns: dict, or None if the event isn't found @@ -441,7 +443,7 @@ def filter_evts(events): ) results = yield self.store.get_events_around( - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter ) results["events_before"] = yield filter_evts(results["events_before"]) @@ -453,8 +455,24 @@ def filter_evts(events): else: last_event_id = event_id + types = None + filtered_types = None + if event_filter and event_filter.lazy_load_members(): + members = {} + for ev in ( + results["events_before"] + + results["event"] + + results["events_after"] + ): + members[ev.sender] = True + filtered_types = [EventTypes.Member] + types = [(EventTypes.Member, member) for member in members.keys()] + + # XXX: why do we return the state as of the last event rather than the + # first? Shouldn't we be consistent with /sync? + state = yield self.store.get_state_for_events( - [last_event_id], None + [last_event_id], types, filtered_types=filtered_types ) results["state"] = list(state[last_event_id].values()) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 69ae9731d585..c464adbd0b9c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -287,7 +287,7 @@ def search(self, user, content, batch=None): contexts = {} for event in allowed_events: res = yield self.store.get_events_around( - event.room_id, event.event_id, before_limit, after_limit + event.room_id, event.event_id, before_limit, after_limit, ) logger.info( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b9512a2b61bc..c8f48220eb33 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -533,11 +533,20 @@ def on_GET(self, request, room_id, event_id): limit = parse_integer(request, "limit", default=10) + # for symmetry with /messages for now + filter_bytes = parse_string(request, "filter") + if filter_bytes: + filter_json = urlparse.unquote(filter_bytes).decode("UTF-8") + event_filter = Filter(json.loads(filter_json)) + else: + event_filter = None + results = yield self.handlers.room_context_handler.get_event_context( requester.user, room_id, event_id, limit, + event_filter, ) if not results: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66856342f0f9..aa12c76af542 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -527,7 +527,9 @@ def _set_before_and_after(events, rows, topo_order=True): ) @defer.inlineCallbacks - def get_events_around(self, room_id, event_id, before_limit, after_limit): + def get_events_around( + self, room_id, event_id, before_limit, after_limit, event_filter=None, + ): """Retrieve events and pagination tokens around a given event in a room. @@ -536,6 +538,7 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -543,7 +546,7 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): results = yield self.runInteraction( "get_events_around", self._get_events_around_txn, - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter ) events_before = yield self._get_events( @@ -563,7 +566,9 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): "end": results["after"]["token"], }) - def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit): + def _get_events_around_txn( + self, txn, room_id, event_id, before_limit, after_limit, event_filter + ): """Retrieves event_ids and pagination tokens around a given event in a room. @@ -572,6 +577,7 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -601,11 +607,13 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, + event_filter=event_filter, ) events_before = [r.event_id for r in rows] rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, + event_filter=event_filter, ) events_after = [r.event_id for r in rows] From cd28d2fc2f8f89a43431a6666d997d1120af27ba Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 03:46:56 +0100 Subject: [PATCH 05/31] speed up /members and add at= and membership params --- synapse/handlers/message.py | 22 +++++++++-- synapse/rest/client/v1/room.py | 21 ++++++++++- synapse/storage/state.py | 67 ++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852cebfc..32e7337053e4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -329,7 +329,10 @@ def _check_in_room_or_world_readable(self, room_id, user_id): ) @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): + def get_state_events( + self, user_id, room_id, types=None, filter_types=None, + at_event=None, is_guest=False + ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has left the room return the state events from when they left. @@ -337,6 +340,12 @@ def get_state_events(self, user_id, room_id, is_guest=False): Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. + types(list[(Str, (Str|None))]): the (type, state_key)s to return + results for. + filter_types(list[Str]): the list of types to apply the types filter + to. + at_event(str): the event_id we are requesting the state as of + is_guest(Boolean): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] """ @@ -345,10 +354,17 @@ def get_state_events(self, user_id, room_id, is_guest=False): ) if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) + if at_event: + room_state = yield self.store.get_state_for_events( + [at_event], types, filter_types=filter_types + ) + else: + room_state = yield self.store.get_current_state( + room_id, types, filter_types=filter_types + ) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], None + [membership_event_id], types, filter_types=filter_types ) room_state = room_state[membership_event_id] diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c8f48220eb33..b84552e12fc7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -384,15 +384,32 @@ def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) handler = self.handlers.message_handler + + # request the state as of a given event + # useful for synchronising with /messages + at_event = parse_string(request, "at") + + # let you filter down on particular memberships + membership = parse_string(request, "membership") + not_membership = parse_string(request, "not_membership") + events = yield handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), + at_event=at_event, + types=[(EventTypes.Member, None)], ) chunk = [] for event in events: - if event["type"] != EventTypes.Member: + if ( + membership and + ( + event.content.get("membership") != membership or + event.content.get("membership") == not_membership + ) + ): continue chunk.append(event) @@ -401,6 +418,8 @@ def on_GET(self, request, room_id): })) +# deprecated in favour of /members?membership=join? +# except it does custom AS logic and has a simpler return format class JoinedRoomMemberListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P[^/]*)/joined_members$") diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f09be7172dc0..0361f48f2ef0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -89,6 +89,73 @@ def _get_current_state_ids_txn(txn): _get_current_state_ids_txn, ) + # FIXME: how should this be cached? + @defer.inlineCallbacks + def get_current_state(self, room_id, types, filtered_types=None): + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + Args: + room_id (str) + types (list[(Str, (Str|None))]): List of (type, state_key) tuples + which are used to filter the state fetched. `state_key` may be + None, which matches any `state_key` + filtered_types (list[Str]|None): List of types to apply the above filter to. + Returns: + deferred: dict of (type, state_key) -> event + """ + + include_other_types = False if filtered_types is None else True + + def _get_current_state_txn(txn): + sql = """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? and %s""" + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) + ) + for etype, state_key in types + ] + + if include_other_types: + unique_types = set(filtered_types) + clause_to_args.append( + ( + "AND type <> ? " * len(unique_types), + list(unique_types) + ) + ) + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + for where_clause, where_args in clause_to_args: + args = [room_id] + args.extend(where_args) + txn.execute(sql % (where_clause,), args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[intern_string(key)] = event_id + return results + + results = self.runInteraction( + "get_current_state", + _get_current_state_txn, + ) + for (key, event_id) in iteritems(results): + results[key] = yield self.store.get_event(event_id, allow_none=True) + + defer.returnValue(results) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between From a17f0b63df1009af84dc02f43b733523ee915682 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 14:44:37 +0100 Subject: [PATCH 06/31] make it work --- synapse/handlers/room.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 73237a8eeacb..9bfe610e1fda 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -407,7 +407,7 @@ def get_event_context(self, user, room_id, event_id, limit, event_filter): event_id (str) limit (int): The maximum number of events to return in total (excluding state). - event_filter (Filter): the filter to apply to the events returned + event_filter (Filter|None): the filter to apply to the events returned (excluding the target event_id) Returns: @@ -461,7 +461,7 @@ def filter_evts(events): members = {} for ev in ( results["events_before"] + - results["event"] + + [results["event"]] + results["events_after"] ): members[ev.sender] = True From c6117fab22896497037333c01f51035d888515db Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 21 Jul 2018 00:55:27 +0100 Subject: [PATCH 07/31] make it work --- synapse/handlers/message.py | 17 ++++++++++------- synapse/storage/state.py | 25 ++++++++++--------------- tests/storage/test_state.py | 2 +- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 32e7337053e4..50d69fcc7929 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -330,19 +330,20 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( - self, user_id, room_id, types=None, filter_types=None, + self, user_id, room_id, types=None, filtered_types=None, at_event=None, is_guest=False ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has - left the room return the state events from when they left. + left the room return the state events from when they left. If an explicit + 'at' parameter is passed, return the state events as of that event. Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. types(list[(Str, (Str|None))]): the (type, state_key)s to return results for. - filter_types(list[Str]): the list of types to apply the types filter + filtered_types(list[Str]): the list of types to apply the types filter to. at_event(str): the event_id we are requesting the state as of is_guest(Boolean): whether this user is a guest @@ -356,15 +357,17 @@ def get_state_events( if membership == Membership.JOIN: if at_event: room_state = yield self.store.get_state_for_events( - [at_event], types, filter_types=filter_types + [at_event], types, filtered_types=filtered_types ) + room_state = room_state[at_event] else: - room_state = yield self.store.get_current_state( - room_id, types, filter_types=filter_types + state_ids = yield self.store.get_filtered_current_state_ids( + room_id, types, filtered_types=filtered_types ) + room_state = yield self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filter_types=filter_types + [membership_event_id], types, filtered_types=filtered_types ) room_state = room_state[membership_event_id] diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0361f48f2ef0..88b21ef80e11 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -90,8 +90,7 @@ def _get_current_state_ids_txn(txn): ) # FIXME: how should this be cached? - @defer.inlineCallbacks - def get_current_state(self, room_id, types, filtered_types=None): + def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): """Get the current state event of a given type for a room based on the current_state_events table. This may not be as up-to-date as the result of doing a fresh state resolution as per state_handler.get_current_state @@ -107,9 +106,10 @@ def get_current_state(self, room_id, types, filtered_types=None): include_other_types = False if filtered_types is None else True - def _get_current_state_txn(txn): + def _get_filtered_current_state_ids_txn(txn): + results = {} sql = """SELECT type, state_key, event_id FROM current_state_events - WHERE room_id = ? and %s""" + WHERE room_id = ? %s""" # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. @@ -143,18 +143,14 @@ def _get_current_state_txn(txn): txn.execute(sql % (where_clause,), args) for row in txn: typ, state_key, event_id = row - key = (typ, state_key) - results[intern_string(key)] = event_id + key = (intern_string(typ), intern_string(state_key)) + results[key] = event_id return results - results = self.runInteraction( - "get_current_state", - _get_current_state_txn, + return self.runInteraction( + "get_filtered_current_state_ids", + _get_filtered_current_state_ids_txn, ) - for (key, event_id) in iteritems(results): - results[key] = yield self.store.get_event(event_id, allow_none=True) - - defer.returnValue(results) @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): @@ -452,8 +448,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None): If None, `types` filtering is applied to all events. Returns: - deferred: A list of dicts corresponding to the event_ids given. - The dicts are mappings from (type, state_key) -> state_events + deferred: A dict of (event_id) -> (type, state_key) -> [state_events] """ event_to_groups = yield self._get_state_group_for_events( event_ids, diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 8924ba9f7f90..5f21b1ec5e9a 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -146,7 +146,7 @@ def test_get_state_for_event(self): (e5.type, e5.state_key): e5, }, state) - # check we can use filter_types to grab a specific room member + # check we can use filtered_types to grab a specific room member # without filtering out the other event types state = yield self.store.get_state_for_event( e5.event_id, [(EventTypes.Member, self.u_alice.to_string())], From 8f1585d207b7a98a3c7b0046df524a868ba77e45 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 21 Jul 2018 12:19:15 +0100 Subject: [PATCH 08/31] make filtering work --- synapse/rest/client/v1/room.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b84552e12fc7..8fa8230f9de4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -404,11 +404,8 @@ def on_GET(self, request, room_id): for event in events: if ( - membership and - ( - event.content.get("membership") != membership or - event.content.get("membership") == not_membership - ) + (membership and event['content'].get("membership") != membership) or + (not_membership and event['content'].get("membership") == not_membership) ): continue chunk.append(event) From 42308c01a31dc6fb041ca9358b123cc3f630538a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 04:07:12 +0100 Subject: [PATCH 09/31] initial cut at a room summary API works, although considers all room membership events rather than just joined ones as i'm failing to see a nice way to filter based on membership --- synapse/handlers/sync.py | 110 ++++++++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 1 + synapse/storage/_base.py | 5 +- synapse/storage/state.py | 4 +- 4 files changed, 113 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 39cfb7a0080d..f370934a7c48 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "ephemeral", "account_data", "unread_notifications", + "summary", ])): __slots__ = [] @@ -494,10 +495,105 @@ def get_state_at(self, room_id, stream_position, types=None, filtered_types=None state = {} defer.returnValue(state) + @defer.inlineCallbacks + def compute_summary(self, room_id, sync_config, batch, state, now_token): + """ Works out a room summary block for this room, summarising the number + of joined members in the room, and providing the 'hero' members if the + room has no name so clients can consistently name rooms. Also adds + state events to 'state' if needed to describe the heros. + + Args: + room_id(str): + sync_config(synapse.handlers.sync.SyncConfig): + batch(synapse.handlers.sync.TimelineBatch): The timeline batch for + the room that will be sent to the user. + state(dict): dict of (type, state_key) -> Event as returned by + compute_state_delta + now_token(str): Token of the end of the current batch. + full_state(bool): Whether to force returning the full state. + + Returns: + A deferred dict describing the room summary + """ + + # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 + last_events, _ = yield self.store.get_recent_events_for_room( + room_id, end_token=now_token.room_key, limit=1, + ) + + if not last_events: + defer.returnValue(None) + return + + last_event = last_events[-1] + state_ids = yield self.store.get_state_ids_for_event( + last_event.event_id, [ + (EventTypes.Member, None), + (EventTypes.Name, ''), + (EventTypes.CanonicalAlias, ''), + ] + ) + + logger.warn("got state_ids %r", state_ids) + + member_ids = { + state_key: event_id + for (t, state_key), event_id in state_ids.iteritems() + if t == EventTypes.Member + } + name_id = state_ids.get((EventTypes.Name, '')) + canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, '')) + + summary = {} + + # ideally we could do this, but the actual events haven't been loaded... + # summary['joined_member_count'] = len([ + # s for s in member_ids if s.content.get("membership") == 'join' + # ]) + + # FIXME: this includes left/banned/invited users + summary['joined_member_count'] = len(member_ids) + + if not name_id and not canonical_alias_id: + # FIXME: this includes left/banned/invited users + # FIXME: if joined_member_count is 0, return left users instead + summary['heros'] = sorted(member_ids.keys())[0:5] + + # ensure we send membership events for heros if needed + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + + existing_members = { + user_id: True for (typ, user_id) in state.keys() + if typ == EventTypes.Member + } + + for ev in batch.events: + if ev.type == EventTypes.Member: + existing_members[ev.state_key] = True + + missing_hero_event_ids = [ + member_ids[hero_id] + for hero_id in summary['heros'] + if ( + not cache.get(hero_id) and + hero_id not in existing_members + ) + ] + + missing_hero_state = yield self.store.get_events(missing_hero_event_ids) + missing_hero_state = missing_hero_state.values() + + for s in missing_hero_state: + cache.set(s.state_key, True) + state[(EventTypes.Member, s.state_key)] = s + + defer.returnValue(summary) + @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, full_state): - """ Works out the differnce in state between the start of the timeline + """ Works out the difference in state between the start of the timeline and the previous sync. Args: @@ -511,7 +607,7 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke full_state(bool): Whether to force returning the full state. Returns: - A deferred new event dictionary + A deferred dict of (type, state_key) -> Event """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -1504,6 +1600,15 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, full_state=full_state ) + summary = None + if ( + sync_config.filter_collection.lazy_load_members() and + any(ev.type == EventTypes.Member for ev in batch.events) + ): + summary = yield self.compute_summary( + room_id, sync_config, batch, state, now_token + ) + if room_builder.rtype == "joined": unread_notifications = {} room_sync = JoinedSyncResult( @@ -1513,6 +1618,7 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + summary=summary, ) if room_sync or always_include: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 8aa06faf23d7..1275baa1ba9a 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -370,6 +370,7 @@ def serialize(event): ephemeral_events = room.ephemeral result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications + result["summary"] = room.summary return result diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1d41d8d44535..144e15106b85 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1144,7 +1144,7 @@ def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols, defer.returnValue(retval) def get_user_count_txn(self, txn): - """Get a total number of registerd users in the users list. + """Get a total number of registered users in the users list. Args: txn : Transaction object @@ -1153,8 +1153,7 @@ def get_user_count_txn(self, txn): """ sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" txn.execute(sql_count) - count = txn.fetchone()[0] - defer.returnValue(count) + return txn.fetchone()[0] def _simple_search_list(self, table, term, col, retcols, desc="_simple_search_list"): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 88b21ef80e11..1056393a6745 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -476,7 +476,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None): @defer.inlineCallbacks def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ - Get the state dicts corresponding to a list of events + Get the state ids corresponding to a list of events Args: event_ids(list(str)): events whose state should be returned @@ -489,7 +489,7 @@ def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): If None, `types` filtering is applied to all events. Returns: - A deferred dict from event_id -> (type, state_key) -> state_event + A deferred dict from event_id -> (type, state_key) -> event_id """ event_to_groups = yield self._get_state_group_for_events( event_ids, From 0beeecf7f211285b3a9b658a242b49feeba4caa2 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 04:12:26 +0100 Subject: [PATCH 10/31] remove debug log --- synapse/handlers/sync.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f370934a7c48..6873c72c7ad7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -534,8 +534,6 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): ] ) - logger.warn("got state_ids %r", state_ids) - member_ids = { state_key: event_id for (t, state_key), event_id in state_ids.iteritems() From 63ce31b3e840f2e43613683fef2ccd005dbeb0c7 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 04:28:13 +0100 Subject: [PATCH 11/31] namespace the summary fields correctly --- synapse/handlers/sync.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6873c72c7ad7..9accadd8032d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -545,17 +545,19 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): summary = {} # ideally we could do this, but the actual events haven't been loaded... - # summary['joined_member_count'] = len([ + # summary['m.joined_member_count'] = len([ # s for s in member_ids if s.content.get("membership") == 'join' # ]) # FIXME: this includes left/banned/invited users - summary['joined_member_count'] = len(member_ids) + summary['m.joined_member_count'] = len(member_ids) if not name_id and not canonical_alias_id: # FIXME: this includes left/banned/invited users # FIXME: if joined_member_count is 0, return left users instead - summary['heros'] = sorted(member_ids.keys())[0:5] + # FIXME: order by stream ordering, not alphabetic + # FIXME: exclude the current logged in user + summary['m.heros'] = sorted(member_ids.keys())[0:5] # ensure we send membership events for heros if needed cache_key = (sync_config.user.to_string(), sync_config.device_id) From c8cbeded45eead76c0996e2729ea4ab53cf221c1 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 23 Jul 2018 12:43:28 +0200 Subject: [PATCH 12/31] fix key typo --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9accadd8032d..2c7c4abe5d84 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -574,7 +574,7 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): missing_hero_event_ids = [ member_ids[hero_id] - for hero_id in summary['heros'] + for hero_id in summary['m.heros'] if ( not cache.get(hero_id) and hero_id not in existing_members From 7d99b0efcfdb0f65bf30227ce58df19c2a91cc96 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:33:31 +0100 Subject: [PATCH 13/31] changelog --- changelog.d/3331.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3331.feature diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature new file mode 100644 index 000000000000..e574b9bcc3af --- /dev/null +++ b/changelog.d/3331.feature @@ -0,0 +1 @@ +add support for the include_redundant_members filter param as per MSC1227 From cd27a77c4e51fcdaa9f373244c69a424b16ba8c7 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:35:03 +0100 Subject: [PATCH 14/31] changelog --- changelog.d/3567.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3567.feature diff --git a/changelog.d/3567.feature b/changelog.d/3567.feature new file mode 100644 index 000000000000..c74c1f57a90a --- /dev/null +++ b/changelog.d/3567.feature @@ -0,0 +1 @@ +make the /context API filter & lazy-load aware as per MSC1227 From 4018a6df604ef3984cec94c88e3e116adb703f65 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:36:10 +0100 Subject: [PATCH 15/31] changelog --- changelog.d/3568.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3568.feature diff --git a/changelog.d/3568.feature b/changelog.d/3568.feature new file mode 100644 index 000000000000..247f02ba4e13 --- /dev/null +++ b/changelog.d/3568.feature @@ -0,0 +1 @@ +speed up /members API and add `at` and `membership` params as per MSC1227 From c238a88cac36030ac0a66b15b0b91db7dfbb4fd4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:36:58 +0100 Subject: [PATCH 16/31] changelog --- changelog.d/3574.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3574.feature diff --git a/changelog.d/3574.feature b/changelog.d/3574.feature new file mode 100644 index 000000000000..87ac32df7249 --- /dev/null +++ b/changelog.d/3574.feature @@ -0,0 +1 @@ +implement `summary` block in /sync response as per MSC688 From 1ba36830c0850944b9992cb2cf276da0d2165611 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 14:53:25 +0100 Subject: [PATCH 17/31] return the correct counts & self-exclude from heros --- synapse/handlers/sync.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 739784ca6f8c..957d4374d574 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -544,20 +544,37 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): summary = {} - # ideally we could do this, but the actual events haven't been loaded... - # summary['m.joined_member_count'] = len([ - # s for s in member_ids if s.content.get("membership") == 'join' - # ]) + # FIXME: it feels very heavy to load up every single membership event + # just to calculate the counts. get_joined_users_from_context might + # help us, but we don't have an EventContext at this point, and we need + # to know more than just the joined user stats. + member_events = yield self.store.get_events(member_ids.values()) - # FIXME: this includes left/banned/invited users - summary['m.joined_member_count'] = len(member_ids) + joined_user_ids = [] + invited_user_ids = [] + + for ev in member_events.values(): + if ev.content.get("membership") == Membership.JOIN: + joined_user_ids.append(ev.state_key) + elif ev.content.get("membership") == Membership.INVITE: + invited_user_ids.append(ev.state_key) + + summary["m.joined_member_count"] = len(joined_user_ids) + if invited_user_ids: + summary["m.invited_member_count"] = len(invited_user_ids) if not name_id and not canonical_alias_id: - # FIXME: this includes left/banned/invited users - # FIXME: if joined_member_count is 0, return left users instead # FIXME: order by stream ordering, not alphabetic - # FIXME: exclude the current logged in user - summary['m.heros'] = sorted(member_ids.keys())[0:5] + + me = sync_config.user.to_string() + if summary["m.joined_member_count"] == 0: + summary['m.heros'] = sorted( + [user_id for user_id in member_ids.keys() if user_id != me] + )[0:5] + else: + summary['m.heros'] = sorted( + [user_id for user_id in joined_user_ids if user_id != me] + )[0:5] # ensure we send membership events for heros if needed cache_key = (sync_config.user.to_string(), sync_config.device_id) From f12939038f5526b4918374d3bef70fcffc56dc47 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 14:56:40 +0100 Subject: [PATCH 18/31] spell heroes correctly --- synapse/handlers/sync.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 957d4374d574..dd01fa8d4657 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -500,7 +500,7 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): """ Works out a room summary block for this room, summarising the number of joined members in the room, and providing the 'hero' members if the room has no name so clients can consistently name rooms. Also adds - state events to 'state' if needed to describe the heros. + state events to 'state' if needed to describe the heroes. Args: room_id(str): @@ -568,15 +568,15 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): me = sync_config.user.to_string() if summary["m.joined_member_count"] == 0: - summary['m.heros'] = sorted( + summary['m.heroes'] = sorted( [user_id for user_id in member_ids.keys() if user_id != me] )[0:5] else: - summary['m.heros'] = sorted( + summary['m.heroes'] = sorted( [user_id for user_id in joined_user_ids if user_id != me] )[0:5] - # ensure we send membership events for heros if needed + # ensure we send membership events for heroes if needed cache_key = (sync_config.user.to_string(), sync_config.device_id) cache = self.lazy_loaded_members_cache.get(cache_key) @@ -591,7 +591,7 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): missing_hero_event_ids = [ member_ids[hero_id] - for hero_id in summary['m.heros'] + for hero_id in summary['m.heroes'] if ( not cache.get(hero_id) and hero_id not in existing_members From e61071a9bf987056649830fece6b7f0642f2de64 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 21:40:38 +0100 Subject: [PATCH 19/31] fix heroes definition to match MSC --- synapse/handlers/sync.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dd01fa8d4657..efb3f055beea 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -567,7 +567,10 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): # FIXME: order by stream ordering, not alphabetic me = sync_config.user.to_string() - if summary["m.joined_member_count"] == 0: + if ( + summary["m.joined_member_count"] == 0 and + summary["m.invited_member_count"] == 0 + ): summary['m.heroes'] = sorted( [user_id for user_id in member_ids.keys() if user_id != me] )[0:5] From 238f750da209056eea6f15ab3ec55a11cee9ab5e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 23:30:48 +0100 Subject: [PATCH 20/31] deduplicating redundant members via event_id rather than mxid --- synapse/handlers/sync.py | 81 +++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 47 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c074c0d2c91..64db6613adf7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -192,7 +192,7 @@ def __init__(self, hs): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() - # ExpiringCache((User, Device)) -> LruCache(member mxid string) + # ExpiringCache((User, Device)) -> LruCache(membership event_id) self.lazy_loaded_members_cache = ExpiringCache( "lazy_loaded_members_cache", self.clock, max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE @@ -541,15 +541,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke ] if not include_redundant_members: - # we can filter out redundant members based on their mxids (not - # their event_ids) at this point. We know we can do it based on - # mxid as this is an non-gappy incremental sync. - cache_key = (sync_config.user.to_string(), sync_config.device_id) cache = self.lazy_loaded_members_cache.get(cache_key) if cache is None: logger.debug("creating LruCache for %r", cache_key) - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) self.lazy_loaded_members_cache[cache_key] = cache else: logger.debug("found LruCache for %r", cache_key) @@ -557,6 +553,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke # only apply the filtering to room members filtered_types = [EventTypes.Member] + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events if event.is_state() + } + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( @@ -577,16 +578,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke state_ids = current_state_ids - if lazy_load_members and not include_redundant_members: - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, @@ -610,16 +601,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke filtered_types=filtered_types, ) - if lazy_load_members and not include_redundant_members: - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - # TODO: optionally filter out redundant membership events at this # point, to stop repeatedly sending members in every /sync as if # the client isn't tracking them. @@ -640,33 +621,39 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - if not include_redundant_members: - # if it's a new sync sequence, then assume the client has had - # amnesia and doesn't want any recent lazy-loaded members - # de-duplicated. - if since_token is None: - logger.debug("clearing LruCache for %r", cache_key) - cache.clear() - else: - # only send members which aren't in our LruCache (either - # because they're new to this client or have been pushed out - # of the cache) - logger.debug("filtering types from %r...", types) - types = [ - t for t in types if not cache.get(t[1]) - ] - logger.debug("...to %r", types) - - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, filtered_types=filtered_types, ) + if lazy_load_members and not include_redundant_members: + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering state from %r...", state_ids) + state_ids = { + t: state_id + for t, state_id in state_ids.iteritems() + if not cache.get(state_id) + } + logger.debug("...to %r", state_ids) + + # add any member IDs we are about to send into our LruCache + for t, event_id in itertools.chain( + state_ids.items(), + timeline_state.items(), + ): + if t[0] == EventTypes.Member: + cache.set(event_id, True) + state = {} if state_ids: state = yield self.store.get_events(list(state_ids.values())) From 08af91dd02b3f738f66cbae877c564dbdc3ae19a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 23:59:47 +0100 Subject: [PATCH 21/31] fix merge fail --- synapse/rest/client/v1/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0312ecd8efc7..e061da5a2167 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -384,7 +384,7 @@ def __init__(self, hs): def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - handler = self.handlers.message_handler + handler = self.message_handler # request the state as of a given event # useful for synchronising with /messages From e9523684cb8337aa8335056b1b84753a2fc1234f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 30 Jul 2018 09:17:28 -0700 Subject: [PATCH 22/31] incorporate review --- synapse/handlers/message.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5529ebd1a836..04fca6849a1c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -119,12 +119,17 @@ def get_state_events( Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. - types(list[(Str, (Str|None))]): the (type, state_key)s to return - results for. - filtered_types(list[Str]): the list of types to apply the types filter - to. - at_event(str): the event_id we are requesting the state as of - is_guest(Boolean): whether this user is a guest + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + at_event(str|None): the event_id we are requesting the state as of. + If None, returns the current state based on the current_state_events + table. + is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] """ @@ -135,17 +140,17 @@ def get_state_events( if membership == Membership.JOIN: if at_event: room_state = yield self.store.get_state_for_events( - [at_event], types, filtered_types=filtered_types + [at_event], types, filtered_types=filtered_types, ) room_state = room_state[at_event] else: state_ids = yield self.store.get_filtered_current_state_ids( - room_id, types, filtered_types=filtered_types + room_id, types, filtered_types=filtered_types, ) room_state = yield self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filtered_types=filtered_types + [membership_event_id], types, filtered_types=filtered_types, ) room_state = room_state[membership_event_id] From eebee084009212397e017a5e9ee8f1ff0b71c6c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 12 Aug 2018 17:17:25 +0200 Subject: [PATCH 23/31] convert /members?at= to take a stream token this feels much clunkier and more complicated for both clients and senders than just querying based on event_id, and i'm failing to see any way it's more correct than querying based on event_id. also fixes a thinko to check whether the user is allowed to view membership as of the given token --- synapse/handlers/message.py | 61 ++++++++++++++++++++++++---------- synapse/rest/client/v1/room.py | 21 ++++++++---- synapse/storage/events.py | 2 +- 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 681f58ae8250..6f938b877a10 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -31,11 +31,12 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet -from synapse.types import RoomAlias, UserID +from synapse.types import RoomAlias, RoomStreamToken, UserID from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -109,12 +110,13 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, - at_event=None, is_guest=False + at_token=None, is_guest=False ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has left the room return the state events from when they left. If an explicit - 'at' parameter is passed, return the state events as of that event. + 'at' parameter is passed, return the state events as of that event, if + visible. Args: user_id(str): The user requesting state events. @@ -126,33 +128,56 @@ def get_state_events( filtered_types(list[str]|None): Only apply filtering via `types` to this list of event types. Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. - at_event(str|None): the event_id we are requesting the state as of. - If None, returns the current state based on the current_state_events - table. + at_token(StreamToken|None): the stream token of the at which we are requesting + the stats. If the user is not allowed to view the state as of that + stream token, no events are returned. If None, returns the current + state based on the current_state_events table. is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) + if at_token: + # we have to turn the token into a event + stream_ordering = RoomStreamToken.parse_stream_token( + at_token.room_key + ).stream + + # XXX: is this the right method to be using? What id we don't yet have an + # event after this stream token? + (stream_ordering, topo_ordering, event_id) = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering + ) + ) - if membership == Membership.JOIN: - if at_event: + # check we are even allowed to be reading the room at this point + event = yield self.store.get_event(event_id, allow_none=True) + visible_events = yield filter_events_for_client(self.store, user_id, [event]) + + if len(visible_events) > 0: room_state = yield self.store.get_state_for_events( - [at_event], types, filtered_types=filtered_types, + [event.event_id], types, filtered_types=filtered_types, ) - room_state = room_state[at_event] + room_state = room_state[event.event_id] else: + room_state = {} + else: + membership, membership_event_id = ( + yield self.auth.check_in_room_or_world_readable( + room_id, user_id + ) + ) + + if membership == Membership.JOIN: state_ids = yield self.store.get_filtered_current_state_ids( room_id, types, filtered_types=filtered_types, ) room_state = yield self.store.get_events(state_ids.values()) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filtered_types=filtered_types, - ) - room_state = room_state[membership_event_id] + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], types, filtered_types=filtered_types, + ) + room_state = room_state[membership_event_id] now = self.clock.time_msec() defer.returnValue( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fad6d2b79266..fcc109176014 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -34,7 +34,7 @@ parse_string, ) from synapse.streams.config import PaginationConfig -from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID +from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID from .base import ClientV1RestServlet, client_path_patterns @@ -386,18 +386,27 @@ def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request) handler = self.message_handler - # request the state as of a given event - # useful for synchronising with /messages - at_event = parse_string(request, "at") + # request the state as of a given event, as identified by a stream token, + # for consistency with /messages etc. + # useful for getting the membership in retrospect as of a given /sync + # response. + at_token_string = parse_string(request, "at") + if at_token_string is None: + at_token = None + else: + at_token = StreamToken.from_string(at_token_string) - # let you filter down on particular memberships + # let you filter down on particular memberships. + # XXX: this may not be the best shape for this API - we could pass in a filter + # instead, except filters aren't currently aware of memberships. + # See https://github.com/matrix-org/matrix-doc/issues/1337 for more details. membership = parse_string(request, "membership") not_membership = parse_string(request, "not_membership") events = yield handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), - at_event=at_event, + at_token=at_token, types=[(EventTypes.Member, None)], ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d4aa192a0a49..eb4744b640ed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1993,7 +1993,7 @@ def _purge_history_txn( max_depth = max(row[0] for row in rows) if max_depth <= token.topological: - # We need to ensure we don't delete all the events from the datanase + # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) raise SynapseError( From d0c0d7208dcfadd4bd40640f9a4c8899653874db Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 12 Aug 2018 18:34:11 +0200 Subject: [PATCH 24/31] incorporate all the review feedback --- synapse/handlers/sync.py | 91 +++++++++++++++++++++------------------- synapse/storage/_base.py | 2 +- synapse/storage/state.py | 3 +- 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0a34334310b6..a3062b062bae 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,24 +510,23 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): state(dict): dict of (type, state_key) -> Event as returned by compute_state_delta now_token(str): Token of the end of the current batch. - full_state(bool): Whether to force returning the full state. Returns: A deferred dict describing the room summary """ # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 - last_events, _ = yield self.store.get_recent_events_for_room( + last_event_ids, _ = yield self.store.get_recent_event_ids_for_room( room_id, end_token=now_token.room_key, limit=1, ) - if not last_events: + if not last_event_ids: defer.returnValue(None) return - last_event = last_events[-1] + last_event_id = last_event_ids[-1] state_ids = yield self.store.get_state_ids_for_event( - last_event.event_id, [ + last_event_id, [ (EventTypes.Member, None), (EventTypes.Name, ''), (EventTypes.CanonicalAlias, ''), @@ -545,9 +544,7 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): summary = {} # FIXME: it feels very heavy to load up every single membership event - # just to calculate the counts. get_joined_users_from_context might - # help us, but we don't have an EventContext at this point, and we need - # to know more than just the joined user stats. + # just to calculate the counts. member_events = yield self.store.get_events(member_ids.values()) joined_user_ids = [] @@ -559,54 +556,60 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): elif ev.content.get("membership") == Membership.INVITE: invited_user_ids.append(ev.state_key) + # TODO: only send these when they change. summary["m.joined_member_count"] = len(joined_user_ids) - if invited_user_ids: - summary["m.invited_member_count"] = len(invited_user_ids) + summary["m.invited_member_count"] = len(invited_user_ids) if not name_id and not canonical_alias_id: # FIXME: order by stream ordering, not alphabetic me = sync_config.user.to_string() - if ( - summary["m.joined_member_count"] == 0 and - summary["m.invited_member_count"] == 0 - ): + if (joined_user_ids or invited_user_ids): summary['m.heroes'] = sorted( - [user_id for user_id in member_ids.keys() if user_id != me] + [ + user_id + for user_id in (joined_user_ids + invited_user_ids) + if user_id != me + ] )[0:5] else: summary['m.heroes'] = sorted( - [user_id for user_id in joined_user_ids if user_id != me] + [user_id for user_id in member_ids.keys() if user_id != me] )[0:5] - # ensure we send membership events for heroes if needed - cache_key = (sync_config.user.to_string(), sync_config.device_id) - cache = self.lazy_loaded_members_cache.get(cache_key) - - existing_members = { - user_id: True for (typ, user_id) in state.keys() - if typ == EventTypes.Member - } - - for ev in batch.events: - if ev.type == EventTypes.Member: - existing_members[ev.state_key] = True + if sync_config.filter_collection.lazy_load_members(): + # ensure we send membership events for heroes if needed + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) - missing_hero_event_ids = [ - member_ids[hero_id] - for hero_id in summary['m.heroes'] - if ( - not cache.get(hero_id) and - hero_id not in existing_members + # track which members the client should already know about via LL: + # Ones which are already in state... + existing_members = set( + user_id for (typ, user_id) in state.keys() + if typ == EventTypes.Member ) - ] - missing_hero_state = yield self.store.get_events(missing_hero_event_ids) - missing_hero_state = missing_hero_state.values() + # ...or ones which are in the timeline... + for ev in batch.events: + if ev.type == EventTypes.Member: + existing_members.add(ev.state_key) + + # ...and then ensure any missing ones get included in state. + missing_hero_event_ids = [ + member_ids[hero_id] + for hero_id in summary['m.heroes'] + if ( + cache.get(hero_id) != member_ids[hero_id] and + hero_id not in existing_members + ) + ] + + missing_hero_state = yield self.store.get_events(missing_hero_event_ids) + missing_hero_state = missing_hero_state.values() - for s in missing_hero_state: - cache.set(s.state_key, True) - state[(EventTypes.Member, s.state_key)] = s + for s in missing_hero_state: + cache.set(s.state_key, s.event_id) + state[(EventTypes.Member, s.state_key)] = s defer.returnValue(summary) @@ -1532,7 +1535,6 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1575,10 +1577,13 @@ def _generate_room_entry(self, sync_result_builder, ignored_users, full_state=full_state ) - summary = None + summary = {} if ( sync_config.filter_collection.lazy_load_members() and - any(ev.type == EventTypes.Member for ev in batch.events) + ( + any(ev.type == EventTypes.Member for ev in batch.events) or + since_token is None + ) ): summary = yield self.compute_summary( room_id, sync_config, batch, state, now_token diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f3efbbf4308f..08dffd774f81 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1155,7 +1155,7 @@ def get_user_count_txn(self, txn): Args: txn : Transaction object Returns: - defer.Deferred: resolves to int + int : number of users """ sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;" txn.execute(sql_count) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2d87b07b03c3..dd03c4168b6c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -480,7 +480,8 @@ def get_state_for_events(self, event_ids, types, filtered_types=None): @defer.inlineCallbacks def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ - Get the state ids corresponding to a list of events + Get the state dicts corresponding to a list of events, containing the event_ids + of the state events (as opposed to the events themselves) Args: event_ids(list(str)): events whose state should be returned From b327e07e8df3cd4475dbb4a89def090d4c5ee7b0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 12 Aug 2018 18:49:37 +0200 Subject: [PATCH 25/31] fix use of get_recent_event_ids_for_room --- synapse/handlers/sync.py | 8 ++++---- synapse/storage/stream.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a3062b062bae..b45c2163311a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -516,17 +516,17 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): """ # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305 - last_event_ids, _ = yield self.store.get_recent_event_ids_for_room( + last_events, _ = yield self.store.get_recent_event_ids_for_room( room_id, end_token=now_token.room_key, limit=1, ) - if not last_event_ids: + if not last_events: defer.returnValue(None) return - last_event_id = last_event_ids[-1] + last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( - last_event_id, [ + last_event.event_id, [ (EventTypes.Member, None), (EventTypes.Name, ''), (EventTypes.CanonicalAlias, ''), diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9f2b74ac64b..4c296d72c0f8 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -348,7 +348,7 @@ def get_recent_events_for_room(self, room_id, limit, end_token): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[FrozenEvent], str]]: Returns a list of + Deferred[tuple[list[FrozenEvent], str]]: Returns a list of events and a token pointing to the start of the returned events. The events returned are in ascending order. @@ -379,7 +379,7 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of + Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of _EventDictReturn and a token pointing to the start of the returned events. The events returned are in ascending order. From 32bf4fa9534cdd3af2286cedd4a183ca72dc8d82 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 14 Aug 2018 15:41:16 +0200 Subject: [PATCH 26/31] return early rather than big if blocks --- synapse/handlers/sync.py | 98 +++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b45c2163311a..7610ef0272be 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -560,56 +560,60 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): summary["m.joined_member_count"] = len(joined_user_ids) summary["m.invited_member_count"] = len(invited_user_ids) - if not name_id and not canonical_alias_id: - # FIXME: order by stream ordering, not alphabetic - - me = sync_config.user.to_string() - if (joined_user_ids or invited_user_ids): - summary['m.heroes'] = sorted( - [ - user_id - for user_id in (joined_user_ids + invited_user_ids) - if user_id != me - ] - )[0:5] - else: - summary['m.heroes'] = sorted( - [user_id for user_id in member_ids.keys() if user_id != me] - )[0:5] - - if sync_config.filter_collection.lazy_load_members(): - # ensure we send membership events for heroes if needed - cache_key = (sync_config.user.to_string(), sync_config.device_id) - cache = self.lazy_loaded_members_cache.get(cache_key) - - # track which members the client should already know about via LL: - # Ones which are already in state... - existing_members = set( - user_id for (typ, user_id) in state.keys() - if typ == EventTypes.Member - ) - - # ...or ones which are in the timeline... - for ev in batch.events: - if ev.type == EventTypes.Member: - existing_members.add(ev.state_key) - - # ...and then ensure any missing ones get included in state. - missing_hero_event_ids = [ - member_ids[hero_id] - for hero_id in summary['m.heroes'] - if ( - cache.get(hero_id) != member_ids[hero_id] and - hero_id not in existing_members - ) + if name_id or canonical_alias_id: + defer.returnValue(summary) + + # FIXME: order by stream ordering, not alphabetic + + me = sync_config.user.to_string() + if (joined_user_ids or invited_user_ids): + summary['m.heroes'] = sorted( + [ + user_id + for user_id in (joined_user_ids + invited_user_ids) + if user_id != me ] + )[0:5] + else: + summary['m.heroes'] = sorted( + [user_id for user_id in member_ids.keys() if user_id != me] + )[0:5] + + if not sync_config.filter_collection.lazy_load_members(): + defer.returnValue(summary) + + # ensure we send membership events for heroes if needed + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + + # track which members the client should already know about via LL: + # Ones which are already in state... + existing_members = set( + user_id for (typ, user_id) in state.keys() + if typ == EventTypes.Member + ) + + # ...or ones which are in the timeline... + for ev in batch.events: + if ev.type == EventTypes.Member: + existing_members.add(ev.state_key) + + # ...and then ensure any missing ones get included in state. + missing_hero_event_ids = [ + member_ids[hero_id] + for hero_id in summary['m.heroes'] + if ( + cache.get(hero_id) != member_ids[hero_id] and + hero_id not in existing_members + ) + ] - missing_hero_state = yield self.store.get_events(missing_hero_event_ids) - missing_hero_state = missing_hero_state.values() + missing_hero_state = yield self.store.get_events(missing_hero_event_ids) + missing_hero_state = missing_hero_state.values() - for s in missing_hero_state: - cache.set(s.state_key, s.event_id) - state[(EventTypes.Member, s.state_key)] = s + for s in missing_hero_state: + cache.set(s.state_key, s.event_id) + state[(EventTypes.Member, s.state_key)] = s defer.returnValue(summary) From 859ad35d89bd7a892baaf5f6f714743cd9f5a243 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 14 Aug 2018 22:04:39 +0200 Subject: [PATCH 27/31] incorporate PR review --- synapse/handlers/message.py | 50 +++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6f938b877a10..88f1161683c6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -31,7 +31,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet -from synapse.types import RoomAlias, RoomStreamToken, UserID +from synapse.types import RoomAlias, UserID from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background @@ -110,7 +110,7 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, - at_token=None, is_guest=False + at_token=None, is_guest=False, ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has @@ -130,37 +130,49 @@ def get_state_events( If None, `types` filtering is applied to all events. at_token(StreamToken|None): the stream token of the at which we are requesting the stats. If the user is not allowed to view the state as of that - stream token, no events are returned. If None, returns the current + stream token, we raise a 403 SynapseError. If None, returns the current state based on the current_state_events table. is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] + Raises: + SynapseError (404) if the at token does not yield an event + + AuthError (403) if the user doesn't have permission to view + members of this room. """ if at_token: - # we have to turn the token into a event - stream_ordering = RoomStreamToken.parse_stream_token( - at_token.room_key - ).stream - - # XXX: is this the right method to be using? What id we don't yet have an - # event after this stream token? - (stream_ordering, topo_ordering, event_id) = ( - yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering - ) + # FIXME this claims to get the state at a stream position, but + # get_recent_events_for_room operates by topo ordering. This therefore + # does not reliably give you the state at the given stream position. + # (https://github.com/matrix-org/synapse/issues/3305) + last_events, _ = yield self.store.get_recent_events_for_room( + room_id, end_token=at_token.room_key, limit=1, ) - # check we are even allowed to be reading the room at this point - event = yield self.store.get_event(event_id, allow_none=True) - visible_events = yield filter_events_for_client(self.store, user_id, [event]) + if not last_events: + raise SynapseError( + 404, + "Can't find event for token %s" % at_token, + Codes.NOT_FOUND + ) + + visible_events = yield filter_events_for_client( + self.store, user_id, last_events + ) - if len(visible_events) > 0: + event = last_events[0] + if visible_events: room_state = yield self.store.get_state_for_events( [event.event_id], types, filtered_types=filtered_types, ) room_state = room_state[event.event_id] else: - room_state = {} + raise AuthError( + 403, "User %s not allowed to view events in room %s at token %s" % ( + user_id, room_id, at_token + ) + ) else: membership, membership_event_id = ( yield self.auth.check_in_room_or_world_readable( From f5189b95e345e2155788b9bb1408d79c1be583b7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Aug 2018 16:15:06 +0100 Subject: [PATCH 28/31] remove incorrectly reintroduced method --- synapse/handlers/message.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 88f1161683c6..e5dc267cb715 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -82,31 +82,6 @@ def get_room_data(self, user_id=None, room_id=None, defer.returnValue(data) - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, From 0d5770d734f03b2aef13fdd7aef79607fe2cef1a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Aug 2018 16:22:30 +0100 Subject: [PATCH 29/31] cleanups --- synapse/handlers/message.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e5dc267cb715..893c9bcdc4db 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -25,7 +25,13 @@ from twisted.internet.defer import succeed from synapse.api.constants import MAX_DEPTH, EventTypes, Membership -from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + ConsentNotGivenError, + NotFoundError, + SynapseError, +) from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event @@ -111,7 +117,7 @@ def get_state_events( Returns: A list of dicts representing state events. [{}, {}, {}] Raises: - SynapseError (404) if the at token does not yield an event + NotFoundError (404) if the at token does not yield an event AuthError (403) if the user doesn't have permission to view members of this room. @@ -126,14 +132,10 @@ def get_state_events( ) if not last_events: - raise SynapseError( - 404, - "Can't find event for token %s" % at_token, - Codes.NOT_FOUND - ) + raise NotFoundError("Can't find event for token %s" % (at_token, )) visible_events = yield filter_events_for_client( - self.store, user_id, last_events + self.store, user_id, last_events, ) event = last_events[0] @@ -144,14 +146,15 @@ def get_state_events( room_state = room_state[event.event_id] else: raise AuthError( - 403, "User %s not allowed to view events in room %s at token %s" % ( - user_id, room_id, at_token + 403, + "User %s not allowed to view events in room %s at token %s" % ( + user_id, room_id, at_token, ) ) else: membership, membership_event_id = ( yield self.auth.check_in_room_or_world_readable( - room_id, user_id + room_id, user_id, ) ) From 73060743225333d7904da093284aea8f47ebd833 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Aug 2018 16:45:47 +0100 Subject: [PATCH 30/31] remove spurious changelogs These are no longer relevant as the changes were in 0.33.2. --- changelog.d/3331.feature | 1 - changelog.d/3567.feature | 1 - 2 files changed, 2 deletions(-) delete mode 100644 changelog.d/3331.feature delete mode 100644 changelog.d/3567.feature diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature deleted file mode 100644 index e574b9bcc3af..000000000000 --- a/changelog.d/3331.feature +++ /dev/null @@ -1 +0,0 @@ -add support for the include_redundant_members filter param as per MSC1227 diff --git a/changelog.d/3567.feature b/changelog.d/3567.feature deleted file mode 100644 index c74c1f57a90a..000000000000 --- a/changelog.d/3567.feature +++ /dev/null @@ -1 +0,0 @@ -make the /context API filter & lazy-load aware as per MSC1227 From c3cdc21f700a8d9fd68b6718de5536c0e72cabb1 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 16 Aug 2018 00:33:45 +0200 Subject: [PATCH 31/31] factor out get_lazy_loaded_members_cache --- synapse/handlers/sync.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b83b7b046108..ac3edf0cc9bc 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -593,7 +593,7 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): # ensure we send membership events for heroes if needed cache_key = (sync_config.user.to_string(), sync_config.device_id) - cache = self.lazy_loaded_members_cache.get(cache_key) + cache = self.get_lazy_loaded_members_cache(cache_key) # track which members the client should already know about via LL: # Ones which are already in state... @@ -626,6 +626,16 @@ def compute_summary(self, room_id, sync_config, batch, state, now_token): defer.returnValue(summary) + def get_lazy_loaded_members_cache(self, cache_key): + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + return cache + @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, full_state): @@ -741,13 +751,7 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke if lazy_load_members and not include_redundant_members: cache_key = (sync_config.user.to_string(), sync_config.device_id) - cache = self.lazy_loaded_members_cache.get(cache_key) - if cache is None: - logger.debug("creating LruCache for %r", cache_key) - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) - self.lazy_loaded_members_cache[cache_key] = cache - else: - logger.debug("found LruCache for %r", cache_key) + cache = self.get_lazy_loaded_members_cache(cache_key) # if it's a new sync sequence, then assume the client has had # amnesia and doesn't want any recent lazy-loaded members