From 7b7fd273e312b62f4a39aa8b38cfd87c716e1ade Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 4 Jun 2018 01:04:20 +0300 Subject: [PATCH 01/18] untested attempt at deduplicating lazy-loaded members as per the proposal; we can deduplicate redundant lazy-loaded members which are sent in the same sync sequence. we do this heuristically rather than requiring the client to somehow tell us which members it has chosen to cache, by instead caching the last N members sent to a client, and not sending them again. For now we hardcode N to 100. Each cache for a given (user,device) tuple is in turn cached for up to X minutes (to avoid the caches building up). For now we hardcode X to 30. --- synapse/handlers/sync.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index aaf2a406df92..7c50f8f7022c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -19,6 +19,8 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.lrucache import LruCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client from synapse.types import RoomStreamToken @@ -33,6 +35,14 @@ logger = logging.getLogger(__name__) +# Store the cache that tracks which lazy-loaded members have been sent to a given +# client for no more than 30 minutes. +LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 + +# Remember the last 100 members we sent to a client for the purposes of +# avoiding redundantly sending the same lazy-loaded members to the client +LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 + SyncConfig = collections.namedtuple("SyncConfig", [ "user", @@ -182,6 +192,12 @@ def __init__(self, hs): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() + # ExpiringCache((User, Device)) -> LruCache(member mxid string) + self.lazy_loaded_members_cache = ExpiringCache( + "lazy_loaded_members_cache", self.clock, + max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE + ) + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -599,10 +615,32 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - # TODO: filter out redundant members based on their mxids (not their + # we can filter out redundant members based on their mxids (not their # event_ids) at this point. We know we can do it based on mxid as this # is an non-gappy incremental sync. + cache_key = (sync_config.user, sync_config.device) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + self.lazy_loaded_members_cache[cache_key] = cache + + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. 
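The heuristic described above can be illustrated with a small standalone sketch, using only the standard library rather than Synapse's ExpiringCache and LruCache classes; the names and limits mirror the patch, but the code itself is hypothetical:

    import time
    from collections import OrderedDict

    # Standalone sketch of the per-(user, device) member cache; not Synapse code.
    LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60   # seconds here; the patch uses milliseconds
    LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100

    class MemberLru(object):
        """Remembers the last N member state_keys sent to one client."""
        def __init__(self, max_size=LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE):
            self.max_size = max_size
            self._entries = OrderedDict()

        def get(self, key):
            if key in self._entries:
                self._entries.move_to_end(key)
                return True
            return False

        def set(self, key):
            self._entries[key] = True
            self._entries.move_to_end(key)
            while len(self._entries) > self.max_size:
                self._entries.popitem(last=False)

        def clear(self):
            self._entries.clear()

    # (user_id, device_id) -> (last_access_time, MemberLru); stale outer entries
    # are dropped so idle clients don't pin their member caches forever.
    member_caches = {}

    def get_member_cache(user_id, device_id, now=None):
        now = time.time() if now is None else now
        for key, (ts, _) in list(member_caches.items()):
            if now - ts > LAZY_LOADED_MEMBERS_CACHE_MAX_AGE:
                del member_caches[key]
        cache = member_caches.get((user_id, device_id), (None, MemberLru()))[1]
        member_caches[(user_id, device_id)] = (now, cache)
        return cache

The real patch keys the outer ExpiringCache on the (user, device) tuple and stores booleans keyed on member mxids in the inner LruCache.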
+ if since_token is None: + cache.clear() + else: + # only send members which aren't in our LruCache (either because + # they're new to this client or have been pushed out of the cache) + types = [ + t for t in types if not cache.get(t[1]) + ] + + # add any types we are about to send into our LruCache + for t in types: + cache.put(t[1], True) + # strip off the (None, None) and filter to just room members types = types[:-1] if types: From f7bd5dad09bf2d44622682fa006f7759ba016c06 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 10 Jun 2018 14:39:29 +0300 Subject: [PATCH 02/18] add include_redundant_members filter option & make it work --- synapse/api/filtering.py | 9 +++++ synapse/handlers/sync.py | 79 ++++++++++++++++++++++++++-------------- 2 files changed, 61 insertions(+), 27 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 1278f8c07c49..00955fef6e27 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -116,6 +116,9 @@ "lazy_load_members": { "type": "boolean" }, + "include_redundant_members": { + "type": "boolean" + }, } } @@ -266,6 +269,9 @@ def ephemeral_limit(self): def lazy_load_members(self): return self._room_state_filter.lazy_load_members() + def include_redundant_members(self): + return self._room_state_filter.include_redundant_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -425,6 +431,9 @@ def limit(self): def lazy_load_members(self): return self.filter_json.get("lazy_load_members", False) + def include_redundant_members(self): + return self.filter_json.get("include_redundant_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7c50f8f7022c..966aa24c813b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -515,6 +515,9 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke types = None member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() + include_redundant_members = ( + sync_config.filter_collection.include_redundant_members() + ) if lazy_load_members: # We only request state for the members needed to display the @@ -528,10 +531,26 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke ) ] - # We can't remove redundant member types at this stage as it has + # We can't remove redundant member types at this stage as it sometimes has # to be done based on event_id, and we don't have the member # event ids until we've pulled them out of the DB. + # however, we can create the cache if needed: + if not include_redundant_members: + # we can filter out redundant members based on their mxids (not + # their event_ids) at this point. We know we can do it based on + # mxid as this is an non-gappy incremental sync. 
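For reference, a client opts into this behaviour through the room state filter it uploads or passes to /sync; a hypothetical filter body combining the two flags added in this patch would look like:

    {
      "room": {
        "state": {
          "lazy_load_members": true,
          "include_redundant_members": false
        }
      }
    }

Setting include_redundant_members to true asks the server to keep repeating member events even when it believes the client has already received them; leaving it false (or absent) permits the de-duplication described below.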
+ + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + + if not types: # an optimisation to stop needlessly trying to calculate # member_state_ids @@ -565,6 +584,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke for t in state_ids if t[0] == EventTypes.Member } + if not include_redundant_members: + # add any types we are about to send into our LruCache + for t in types[:-1]: + cache.set(t[1], True) + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -600,6 +624,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke for t in state_at_timeline_start if t[0] == EventTypes.Member } + if not include_redundant_members: + # add any types we are about to send into our LruCache + for t in types[:-1]: + cache.set(t[1], True) + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -615,34 +644,30 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - # we can filter out redundant members based on their mxids (not their - # event_ids) at this point. We know we can do it based on mxid as this - # is an non-gappy incremental sync. - - cache_key = (sync_config.user, sync_config.device) - cache = self.lazy_loaded_members_cache.get(cache_key) - if cache is None: - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) - self.lazy_loaded_members_cache[cache_key] = cache - - # if it's a new sync sequence, then assume the client has had - # amnesia and doesn't want any recent lazy-loaded members - # de-duplicated. - if since_token is None: - cache.clear() - else: - # only send members which aren't in our LruCache (either because - # they're new to this client or have been pushed out of the cache) - types = [ - t for t in types if not cache.get(t[1]) - ] - - # add any types we are about to send into our LruCache - for t in types: - cache.put(t[1], True) - # strip off the (None, None) and filter to just room members types = types[:-1] + + if not include_redundant_members: + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. 
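Taken together, the two flags control which member events a sync response carries; a simplified model of that decision, with plain mxid sets standing in for the (type, state_key) tuples and LruCache used by the code that follows:

    def members_to_send(requested, already_sent,
                        lazy_load_members, include_redundant_members):
        """Sketch only: return the member mxids to include in this sync response."""
        if not lazy_load_members:
            # lazy loading is off: membership is sent with the full room state
            return list(requested)
        if include_redundant_members:
            # the client asked for members to be repeated even if already seen
            return list(requested)
        # otherwise drop members we believe this client already has
        return [mxid for mxid in requested if mxid not in already_sent]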
+ if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering types from %r...", types) + types = [ + t for t in types if not cache.get(t[1]) + ] + logger.debug("...to %r", types) + + # add any types we are about to send into our LruCache + for t in types: + cache.set(t[1], True) + if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types From a08b37b296bd9b37e0ba9b9224e265a4e145d1e4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 00:20:13 +0100 Subject: [PATCH 03/18] fix bad merge --- synapse/handlers/sync.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0dad3626fbff..39cfb7a0080d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -585,13 +585,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member } - else: - member_state_ids = {} if not include_redundant_members: # add any types we are about to send into our LruCache for t in types: cache.set(t[1], True) + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -638,13 +638,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member } - else: - member_state_ids = {} if not include_redundant_members: # add any types we are about to send into our LruCache for t in types: cache.set(t[1], True) + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id From 7362e6c6679414388484cbd3093dbd1b028cb8be Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 02:53:18 +0100 Subject: [PATCH 04/18] make /context lazyload & filter aware --- synapse/handlers/room.py | 24 +++++++++++++++++++++--- synapse/handlers/search.py | 2 +- synapse/rest/client/v1/room.py | 9 +++++++++ synapse/storage/stream.py | 14 +++++++++++--- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f67512078ba9..73237a8eeacb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -397,7 +397,7 @@ def send(etype, content, **kwargs): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit): + def get_event_context(self, user, room_id, event_id, limit, event_filter): """Retrieves events, pagination tokens and state around a given event in a room. @@ -407,6 +407,8 @@ def get_event_context(self, user, room_id, event_id, limit): event_id (str) limit (int): The maximum number of events to return in total (excluding state). 
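The essence of making /context lazy-load aware is to request member state only for the senders who actually appear in the returned window; a minimal sketch of that selection, assuming events are plain dicts with a "sender" key rather than Synapse event objects:

    def member_types_for_context(events_before, event, events_after):
        """Sketch: (type, state_key) pairs needed to lazy-load only the members
        who sent something in this /context window."""
        senders = set()
        for ev in list(events_before) + [event] + list(events_after):
            senders.add(ev["sender"])
        return [("m.room.member", sender) for sender in sorted(senders)]

    # member_types_for_context([{"sender": "@a:hs"}], {"sender": "@b:hs"}, [])
    #   == [("m.room.member", "@a:hs"), ("m.room.member", "@b:hs")]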
+ event_filter (Filter): the filter to apply to the events returned + (excluding the target event_id) Returns: dict, or None if the event isn't found @@ -441,7 +443,7 @@ def filter_evts(events): ) results = yield self.store.get_events_around( - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter ) results["events_before"] = yield filter_evts(results["events_before"]) @@ -453,8 +455,24 @@ def filter_evts(events): else: last_event_id = event_id + types = None + filtered_types = None + if event_filter and event_filter.lazy_load_members(): + members = {} + for ev in ( + results["events_before"] + + results["event"] + + results["events_after"] + ): + members[ev.sender] = True + filtered_types = [EventTypes.Member] + types = [(EventTypes.Member, member) for member in members.keys()] + + # XXX: why do we return the state as of the last event rather than the + # first? Shouldn't we be consistent with /sync? + state = yield self.store.get_state_for_events( - [last_event_id], None + [last_event_id], types, filtered_types=filtered_types ) results["state"] = list(state[last_event_id].values()) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 69ae9731d585..c464adbd0b9c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -287,7 +287,7 @@ def search(self, user, content, batch=None): contexts = {} for event in allowed_events: res = yield self.store.get_events_around( - event.room_id, event.event_id, before_limit, after_limit + event.room_id, event.event_id, before_limit, after_limit, ) logger.info( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b9512a2b61bc..c8f48220eb33 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -533,11 +533,20 @@ def on_GET(self, request, room_id, event_id): limit = parse_integer(request, "limit", default=10) + # for symmetry with /messages for now + filter_bytes = parse_string(request, "filter") + if filter_bytes: + filter_json = urlparse.unquote(filter_bytes).decode("UTF-8") + event_filter = Filter(json.loads(filter_json)) + else: + event_filter = None + results = yield self.handlers.room_context_handler.get_event_context( requester.user, room_id, event_id, limit, + event_filter, ) if not results: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66856342f0f9..aa12c76af542 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -527,7 +527,9 @@ def _set_before_and_after(events, rows, topo_order=True): ) @defer.inlineCallbacks - def get_events_around(self, room_id, event_id, before_limit, after_limit): + def get_events_around( + self, room_id, event_id, before_limit, after_limit, event_filter=None, + ): """Retrieve events and pagination tokens around a given event in a room. 
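On the wire, this lets a client pass the same URL-encoded filter it already uses for /messages; an illustrative request (filter shown unencoded for readability, room and event IDs are placeholders):

    GET /_matrix/client/r0/rooms/{roomId}/context/{eventId}?limit=10&filter={"lazy_load_members":true}

The servlet decodes the filter, hands it to get_event_context, and the storage change below threads it down to get_events_around so that pagination honours it too.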
@@ -536,6 +538,7 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -543,7 +546,7 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): results = yield self.runInteraction( "get_events_around", self._get_events_around_txn, - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter ) events_before = yield self._get_events( @@ -563,7 +566,9 @@ def get_events_around(self, room_id, event_id, before_limit, after_limit): "end": results["after"]["token"], }) - def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit): + def _get_events_around_txn( + self, txn, room_id, event_id, before_limit, after_limit, event_filter + ): """Retrieves event_ids and pagination tokens around a given event in a room. @@ -572,6 +577,7 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -601,11 +607,13 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, + event_filter=event_filter, ) events_before = [r.event_id for r in rows] rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, + event_filter=event_filter, ) events_after = [r.event_id for r in rows] From cd28d2fc2f8f89a43431a6666d997d1120af27ba Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 03:46:56 +0100 Subject: [PATCH 05/18] speed up /members and add at= and membership params --- synapse/handlers/message.py | 22 +++++++++-- synapse/rest/client/v1/room.py | 21 ++++++++++- synapse/storage/state.py | 67 ++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852cebfc..32e7337053e4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -329,7 +329,10 @@ def _check_in_room_or_world_readable(self, room_id, user_id): ) @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): + def get_state_events( + self, user_id, room_id, types=None, filter_types=None, + at_event=None, is_guest=False + ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has left the room return the state events from when they left. @@ -337,6 +340,12 @@ def get_state_events(self, user_id, room_id, is_guest=False): Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. + types(list[(Str, (Str|None))]): the (type, state_key)s to return + results for. + filter_types(list[Str]): the list of types to apply the types filter + to. + at_event(str): the event_id we are requesting the state as of + is_guest(Boolean): whether this user is a guest Returns: A list of dicts representing state events. 
[{}, {}, {}] """ @@ -345,10 +354,17 @@ def get_state_events(self, user_id, room_id, is_guest=False): ) if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) + if at_event: + room_state = yield self.store.get_state_for_events( + [at_event], types, filter_types=filter_types + ) + else: + room_state = yield self.store.get_current_state( + room_id, types, filter_types=filter_types + ) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], None + [membership_event_id], types, filter_types=filter_types ) room_state = room_state[membership_event_id] diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c8f48220eb33..b84552e12fc7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -384,15 +384,32 @@ def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) handler = self.handlers.message_handler + + # request the state as of a given event + # useful for synchronising with /messages + at_event = parse_string(request, "at") + + # let you filter down on particular memberships + membership = parse_string(request, "membership") + not_membership = parse_string(request, "not_membership") + events = yield handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), + at_event=at_event, + types=[(EventTypes.Member, None)], ) chunk = [] for event in events: - if event["type"] != EventTypes.Member: + if ( + membership and + ( + event.content.get("membership") != membership or + event.content.get("membership") == not_membership + ) + ): continue chunk.append(event) @@ -401,6 +418,8 @@ def on_GET(self, request, room_id): })) +# deprecated in favour of /members?membership=join? +# except it does custom AS logic and has a simpler return format class JoinedRoomMemberListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P[^/]*)/joined_members$") diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f09be7172dc0..0361f48f2ef0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -89,6 +89,73 @@ def _get_current_state_ids_txn(txn): _get_current_state_ids_txn, ) + # FIXME: how should this be cached? + @defer.inlineCallbacks + def get_current_state(self, room_id, types, filtered_types=None): + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + Args: + room_id (str) + types (list[(Str, (Str|None))]): List of (type, state_key) tuples + which are used to filter the state fetched. `state_key` may be + None, which matches any `state_key` + filtered_types (list[Str]|None): List of types to apply the above filter to. + Returns: + deferred: dict of (type, state_key) -> event + """ + + include_other_types = False if filtered_types is None else True + + def _get_current_state_txn(txn): + sql = """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? and %s""" + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? 
AND state_key = ?", + (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) + ) + for etype, state_key in types + ] + + if include_other_types: + unique_types = set(filtered_types) + clause_to_args.append( + ( + "AND type <> ? " * len(unique_types), + list(unique_types) + ) + ) + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + for where_clause, where_args in clause_to_args: + args = [room_id] + args.extend(where_args) + txn.execute(sql % (where_clause,), args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[intern_string(key)] = event_id + return results + + results = self.runInteraction( + "get_current_state", + _get_current_state_txn, + ) + for (key, event_id) in iteritems(results): + results[key] = yield self.store.get_event(event_id, allow_none=True) + + defer.returnValue(results) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between From a17f0b63df1009af84dc02f43b733523ee915682 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 20 Jul 2018 14:44:37 +0100 Subject: [PATCH 06/18] make it work --- synapse/handlers/room.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 73237a8eeacb..9bfe610e1fda 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -407,7 +407,7 @@ def get_event_context(self, user, room_id, event_id, limit, event_filter): event_id (str) limit (int): The maximum number of events to return in total (excluding state). - event_filter (Filter): the filter to apply to the events returned + event_filter (Filter|None): the filter to apply to the events returned (excluding the target event_id) Returns: @@ -461,7 +461,7 @@ def filter_evts(events): members = {} for ev in ( results["events_before"] + - results["event"] + + [results["event"]] + results["events_after"] ): members[ev.sender] = True From c6117fab22896497037333c01f51035d888515db Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 21 Jul 2018 00:55:27 +0100 Subject: [PATCH 07/18] make it work --- synapse/handlers/message.py | 17 ++++++++++------- synapse/storage/state.py | 25 ++++++++++--------------- tests/storage/test_state.py | 2 +- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 32e7337053e4..50d69fcc7929 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -330,19 +330,20 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( - self, user_id, room_id, types=None, filter_types=None, + self, user_id, room_id, types=None, filtered_types=None, at_event=None, is_guest=False ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has - left the room return the state events from when they left. + left the room return the state events from when they left. If an explicit + 'at' parameter is passed, return the state events as of that event. Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. types(list[(Str, (Str|None))]): the (type, state_key)s to return results for. 
- filter_types(list[Str]): the list of types to apply the types filter + filtered_types(list[Str]): the list of types to apply the types filter to. at_event(str): the event_id we are requesting the state as of is_guest(Boolean): whether this user is a guest @@ -356,15 +357,17 @@ def get_state_events( if membership == Membership.JOIN: if at_event: room_state = yield self.store.get_state_for_events( - [at_event], types, filter_types=filter_types + [at_event], types, filtered_types=filtered_types ) + room_state = room_state[at_event] else: - room_state = yield self.store.get_current_state( - room_id, types, filter_types=filter_types + state_ids = yield self.store.get_filtered_current_state_ids( + room_id, types, filtered_types=filtered_types ) + room_state = yield self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filter_types=filter_types + [membership_event_id], types, filtered_types=filtered_types ) room_state = room_state[membership_event_id] diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0361f48f2ef0..88b21ef80e11 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -90,8 +90,7 @@ def _get_current_state_ids_txn(txn): ) # FIXME: how should this be cached? - @defer.inlineCallbacks - def get_current_state(self, room_id, types, filtered_types=None): + def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): """Get the current state event of a given type for a room based on the current_state_events table. This may not be as up-to-date as the result of doing a fresh state resolution as per state_handler.get_current_state @@ -107,9 +106,10 @@ def get_current_state(self, room_id, types, filtered_types=None): include_other_types = False if filtered_types is None else True - def _get_current_state_txn(txn): + def _get_filtered_current_state_ids_txn(txn): + results = {} sql = """SELECT type, state_key, event_id FROM current_state_events - WHERE room_id = ? and %s""" + WHERE room_id = ? %s""" # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. @@ -143,18 +143,14 @@ def _get_current_state_txn(txn): txn.execute(sql % (where_clause,), args) for row in txn: typ, state_key, event_id = row - key = (typ, state_key) - results[intern_string(key)] = event_id + key = (intern_string(typ), intern_string(state_key)) + results[key] = event_id return results - results = self.runInteraction( - "get_current_state", - _get_current_state_txn, + return self.runInteraction( + "get_filtered_current_state_ids", + _get_filtered_current_state_ids_txn, ) - for (key, event_id) in iteritems(results): - results[key] = yield self.store.get_event(event_id, allow_none=True) - - defer.returnValue(results) @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): @@ -452,8 +448,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None): If None, `types` filtering is applied to all events. Returns: - deferred: A list of dicts corresponding to the event_ids given. 
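The per-(type, state_key) WHERE clauses that get_filtered_current_state_ids builds (introduced in PATCH 05 above and reworked here) can be sketched on their own; this is a standalone model of the clause construction, not the storage code itself:

    def current_state_clauses(types, filtered_types=None):
        """Sketch: (where_clause, args) pairs, one per requested (type, state_key)."""
        if not types:
            # no filter: a single query with an empty extra clause
            return [("", [])]
        clauses = []
        for etype, state_key in types:
            if state_key is None:
                clauses.append(("AND type = ?", [etype]))
            else:
                clauses.append(("AND type = ? AND state_key = ?", [etype, state_key]))
        if filtered_types is not None:
            unique_types = sorted(set(filtered_types))
            clauses.append(("AND type <> ? " * len(unique_types), list(unique_types)))
        return clauses

Each pair is run as its own "SELECT type, state_key, event_id FROM current_state_events WHERE room_id = ? <clause>" query rather than one large OR, which the comment in PATCH 05 notes is dramatically slower on postgres.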
- The dicts are mappings from (type, state_key) -> state_events + deferred: A dict of (event_id) -> (type, state_key) -> [state_events] """ event_to_groups = yield self._get_state_group_for_events( event_ids, diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 8924ba9f7f90..5f21b1ec5e9a 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -146,7 +146,7 @@ def test_get_state_for_event(self): (e5.type, e5.state_key): e5, }, state) - # check we can use filter_types to grab a specific room member + # check we can use filtered_types to grab a specific room member # without filtering out the other event types state = yield self.store.get_state_for_event( e5.event_id, [(EventTypes.Member, self.u_alice.to_string())], From 8f1585d207b7a98a3c7b0046df524a868ba77e45 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 21 Jul 2018 12:19:15 +0100 Subject: [PATCH 08/18] make filtering work --- synapse/rest/client/v1/room.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b84552e12fc7..8fa8230f9de4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -404,11 +404,8 @@ def on_GET(self, request, room_id): for event in events: if ( - membership and - ( - event.content.get("membership") != membership or - event.content.get("membership") == not_membership - ) + (membership and event['content'].get("membership") != membership) or + (not_membership and event['content'].get("membership") == not_membership) ): continue chunk.append(event) From 7d99b0efcfdb0f65bf30227ce58df19c2a91cc96 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:33:31 +0100 Subject: [PATCH 09/18] changelog --- changelog.d/3331.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3331.feature diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature new file mode 100644 index 000000000000..e574b9bcc3af --- /dev/null +++ b/changelog.d/3331.feature @@ -0,0 +1 @@ +add support for the include_redundant_members filter param as per MSC1227 From cd27a77c4e51fcdaa9f373244c69a424b16ba8c7 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:35:03 +0100 Subject: [PATCH 10/18] changelog --- changelog.d/3567.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3567.feature diff --git a/changelog.d/3567.feature b/changelog.d/3567.feature new file mode 100644 index 000000000000..c74c1f57a90a --- /dev/null +++ b/changelog.d/3567.feature @@ -0,0 +1 @@ +make the /context API filter & lazy-load aware as per MSC1227 From 4018a6df604ef3984cec94c88e3e116adb703f65 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:36:10 +0100 Subject: [PATCH 11/18] changelog --- changelog.d/3568.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3568.feature diff --git a/changelog.d/3568.feature b/changelog.d/3568.feature new file mode 100644 index 000000000000..247f02ba4e13 --- /dev/null +++ b/changelog.d/3568.feature @@ -0,0 +1 @@ +speed up /members API and add `at` and `membership` params as per MSC1227 From 238f750da209056eea6f15ab3ec55a11cee9ab5e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 23:30:48 +0100 Subject: [PATCH 12/18] deduplicating redundant members via event_id rather than mxid --- synapse/handlers/sync.py | 81 +++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 47 deletions(-) diff --git 
a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c074c0d2c91..64db6613adf7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -192,7 +192,7 @@ def __init__(self, hs): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() - # ExpiringCache((User, Device)) -> LruCache(member mxid string) + # ExpiringCache((User, Device)) -> LruCache(membership event_id) self.lazy_loaded_members_cache = ExpiringCache( "lazy_loaded_members_cache", self.clock, max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE @@ -541,15 +541,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke ] if not include_redundant_members: - # we can filter out redundant members based on their mxids (not - # their event_ids) at this point. We know we can do it based on - # mxid as this is an non-gappy incremental sync. - cache_key = (sync_config.user.to_string(), sync_config.device_id) cache = self.lazy_loaded_members_cache.get(cache_key) if cache is None: logger.debug("creating LruCache for %r", cache_key) - cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_AGE) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) self.lazy_loaded_members_cache[cache_key] = cache else: logger.debug("found LruCache for %r", cache_key) @@ -557,6 +553,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke # only apply the filtering to room members filtered_types = [EventTypes.Member] + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events if event.is_state() + } + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( @@ -577,16 +578,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke state_ids = current_state_ids - if lazy_load_members and not include_redundant_members: - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, @@ -610,16 +601,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke filtered_types=filtered_types, ) - if lazy_load_members and not include_redundant_members: - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - # TODO: optionally filter out redundant membership events at this # point, to stop repeatedly sending members in every /sync as if # the client isn't tracking them. @@ -640,33 +621,39 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke else: state_ids = {} if lazy_load_members: - if not include_redundant_members: - # if it's a new sync sequence, then assume the client has had - # amnesia and doesn't want any recent lazy-loaded members - # de-duplicated. 
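The switch from mxid-based to event_id-based de-duplication has a useful property: a membership change produces a new state event with a new event_id, so an updated membership is always re-sent even if that user was cached earlier. A simplified version of the filtering below, with a plain set standing in for the LruCache:

    def filter_redundant_member_ids(member_state_ids, sent_event_ids):
        """Sketch: member_state_ids is {(type, state_key): event_id} for
        m.room.member entries; drop those whose exact event_id was already sent."""
        return {
            key: event_id
            for key, event_id in member_state_ids.items()
            if event_id not in sent_event_ids
        }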
- if since_token is None: - logger.debug("clearing LruCache for %r", cache_key) - cache.clear() - else: - # only send members which aren't in our LruCache (either - # because they're new to this client or have been pushed out - # of the cache) - logger.debug("filtering types from %r...", types) - types = [ - t for t in types if not cache.get(t[1]) - ] - logger.debug("...to %r", types) - - # add any types we are about to send into our LruCache - for t in types: - cache.set(t[1], True) - if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, filtered_types=filtered_types, ) + if lazy_load_members and not include_redundant_members: + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering state from %r...", state_ids) + state_ids = { + t: state_id + for t, state_id in state_ids.iteritems() + if not cache.get(state_id) + } + logger.debug("...to %r", state_ids) + + # add any member IDs we are about to send into our LruCache + for t, event_id in itertools.chain( + state_ids.items(), + timeline_state.items(), + ): + if t[0] == EventTypes.Member: + cache.set(event_id, True) + state = {} if state_ids: state = yield self.store.get_events(list(state_ids.values())) From 08af91dd02b3f738f66cbae877c564dbdc3ae19a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 23:59:47 +0100 Subject: [PATCH 13/18] fix merge fail --- synapse/rest/client/v1/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0312ecd8efc7..e061da5a2167 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -384,7 +384,7 @@ def __init__(self, hs): def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - handler = self.handlers.message_handler + handler = self.message_handler # request the state as of a given event # useful for synchronising with /messages From e9523684cb8337aa8335056b1b84753a2fc1234f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 30 Jul 2018 09:17:28 -0700 Subject: [PATCH 14/18] incorporate review --- synapse/handlers/message.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5529ebd1a836..04fca6849a1c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -119,12 +119,17 @@ def get_state_events( Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. - types(list[(Str, (Str|None))]): the (type, state_key)s to return - results for. - filtered_types(list[Str]): the list of types to apply the types filter - to. - at_event(str): the event_id we are requesting the state as of - is_guest(Boolean): whether this user is a guest + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. 
+ filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + at_event(str|None): the event_id we are requesting the state as of. + If None, returns the current state based on the current_state_events + table. + is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] """ @@ -135,17 +140,17 @@ def get_state_events( if membership == Membership.JOIN: if at_event: room_state = yield self.store.get_state_for_events( - [at_event], types, filtered_types=filtered_types + [at_event], types, filtered_types=filtered_types, ) room_state = room_state[at_event] else: state_ids = yield self.store.get_filtered_current_state_ids( - room_id, types, filtered_types=filtered_types + room_id, types, filtered_types=filtered_types, ) room_state = yield self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filtered_types=filtered_types + [membership_event_id], types, filtered_types=filtered_types, ) room_state = room_state[membership_event_id] From eebee084009212397e017a5e9ee8f1ff0b71c6c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 12 Aug 2018 17:17:25 +0200 Subject: [PATCH 15/18] convert /members?at= to take a stream token this feels much clunkier and more complicated for both clients and senders than just querying based on event_id, and i'm failing to see any way it's more correct than querying based on event_id. also fixes a thinko to check whether the user is allowed to view membership as of the given token --- synapse/handlers/message.py | 61 ++++++++++++++++++++++++---------- synapse/rest/client/v1/room.py | 21 ++++++++---- synapse/storage/events.py | 2 +- 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 681f58ae8250..6f938b877a10 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -31,11 +31,12 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet -from synapse.types import RoomAlias, UserID +from synapse.types import RoomAlias, RoomStreamToken, UserID from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -109,12 +110,13 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, - at_event=None, is_guest=False + at_token=None, is_guest=False ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has left the room return the state events from when they left. If an explicit - 'at' parameter is passed, return the state events as of that event. + 'at' parameter is passed, return the state events as of that event, if + visible. Args: user_id(str): The user requesting state events. @@ -126,33 +128,56 @@ def get_state_events( filtered_types(list[str]|None): Only apply filtering via `types` to this list of event types. 
Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. - at_event(str|None): the event_id we are requesting the state as of. - If None, returns the current state based on the current_state_events - table. + at_token(StreamToken|None): the stream token of the at which we are requesting + the stats. If the user is not allowed to view the state as of that + stream token, no events are returned. If None, returns the current + state based on the current_state_events table. is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) + if at_token: + # we have to turn the token into a event + stream_ordering = RoomStreamToken.parse_stream_token( + at_token.room_key + ).stream + + # XXX: is this the right method to be using? What id we don't yet have an + # event after this stream token? + (stream_ordering, topo_ordering, event_id) = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering + ) + ) - if membership == Membership.JOIN: - if at_event: + # check we are even allowed to be reading the room at this point + event = yield self.store.get_event(event_id, allow_none=True) + visible_events = yield filter_events_for_client(self.store, user_id, [event]) + + if len(visible_events) > 0: room_state = yield self.store.get_state_for_events( - [at_event], types, filtered_types=filtered_types, + [event.event_id], types, filtered_types=filtered_types, ) - room_state = room_state[at_event] + room_state = room_state[event.event_id] else: + room_state = {} + else: + membership, membership_event_id = ( + yield self.auth.check_in_room_or_world_readable( + room_id, user_id + ) + ) + + if membership == Membership.JOIN: state_ids = yield self.store.get_filtered_current_state_ids( room_id, types, filtered_types=filtered_types, ) room_state = yield self.store.get_events(state_ids.values()) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filtered_types=filtered_types, - ) - room_state = room_state[membership_event_id] + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], types, filtered_types=filtered_types, + ) + room_state = room_state[membership_event_id] now = self.clock.time_msec() defer.returnValue( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fad6d2b79266..fcc109176014 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -34,7 +34,7 @@ parse_string, ) from synapse.streams.config import PaginationConfig -from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID +from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID from .base import ClientV1RestServlet, client_path_patterns @@ -386,18 +386,27 @@ def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request) handler = self.message_handler - # request the state as of a given event - # useful for synchronising with /messages - at_event = parse_string(request, "at") + # request the state as of a given event, as identified by a stream token, + # for consistency with /messages etc. + # useful for getting the membership in retrospect as of a given /sync + # response. 
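Put together, a client that has just processed a sync response can fetch the membership list as it stood at that point with something like (illustrative; the token is whatever stream token the client received from /sync or /messages):

    GET /_matrix/client/r0/rooms/{roomId}/members?at={stream_token}&membership=join

or pass not_membership=leave instead to exclude departed users while keeping everyone else.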
+ at_token_string = parse_string(request, "at") + if at_token_string is None: + at_token = None + else: + at_token = StreamToken.from_string(at_token_string) - # let you filter down on particular memberships + # let you filter down on particular memberships. + # XXX: this may not be the best shape for this API - we could pass in a filter + # instead, except filters aren't currently aware of memberships. + # See https://github.com/matrix-org/matrix-doc/issues/1337 for more details. membership = parse_string(request, "membership") not_membership = parse_string(request, "not_membership") events = yield handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), - at_event=at_event, + at_token=at_token, types=[(EventTypes.Member, None)], ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d4aa192a0a49..eb4744b640ed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1993,7 +1993,7 @@ def _purge_history_txn( max_depth = max(row[0] for row in rows) if max_depth <= token.topological: - # We need to ensure we don't delete all the events from the datanase + # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) raise SynapseError( From 859ad35d89bd7a892baaf5f6f714743cd9f5a243 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 14 Aug 2018 22:04:39 +0200 Subject: [PATCH 16/18] incorporate PR review --- synapse/handlers/message.py | 50 +++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6f938b877a10..88f1161683c6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -31,7 +31,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet -from synapse.types import RoomAlias, RoomStreamToken, UserID +from synapse.types import RoomAlias, UserID from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background @@ -110,7 +110,7 @@ def _check_in_room_or_world_readable(self, room_id, user_id): @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, - at_token=None, is_guest=False + at_token=None, is_guest=False, ): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has @@ -130,37 +130,49 @@ def get_state_events( If None, `types` filtering is applied to all events. at_token(StreamToken|None): the stream token of the at which we are requesting the stats. If the user is not allowed to view the state as of that - stream token, no events are returned. If None, returns the current + stream token, we raise a 403 SynapseError. If None, returns the current state based on the current_state_events table. is_guest(bool): whether this user is a guest Returns: A list of dicts representing state events. [{}, {}, {}] + Raises: + SynapseError (404) if the at token does not yield an event + + AuthError (403) if the user doesn't have permission to view + members of this room. """ if at_token: - # we have to turn the token into a event - stream_ordering = RoomStreamToken.parse_stream_token( - at_token.room_key - ).stream - - # XXX: is this the right method to be using? 
What id we don't yet have an - # event after this stream token? - (stream_ordering, topo_ordering, event_id) = ( - yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering - ) + # FIXME this claims to get the state at a stream position, but + # get_recent_events_for_room operates by topo ordering. This therefore + # does not reliably give you the state at the given stream position. + # (https://github.com/matrix-org/synapse/issues/3305) + last_events, _ = yield self.store.get_recent_events_for_room( + room_id, end_token=at_token.room_key, limit=1, ) - # check we are even allowed to be reading the room at this point - event = yield self.store.get_event(event_id, allow_none=True) - visible_events = yield filter_events_for_client(self.store, user_id, [event]) + if not last_events: + raise SynapseError( + 404, + "Can't find event for token %s" % at_token, + Codes.NOT_FOUND + ) + + visible_events = yield filter_events_for_client( + self.store, user_id, last_events + ) - if len(visible_events) > 0: + event = last_events[0] + if visible_events: room_state = yield self.store.get_state_for_events( [event.event_id], types, filtered_types=filtered_types, ) room_state = room_state[event.event_id] else: - room_state = {} + raise AuthError( + 403, "User %s not allowed to view events in room %s at token %s" % ( + user_id, room_id, at_token + ) + ) else: membership, membership_event_id = ( yield self.auth.check_in_room_or_world_readable( From f5189b95e345e2155788b9bb1408d79c1be583b7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Aug 2018 16:15:06 +0100 Subject: [PATCH 17/18] remove incorrectly reintroduced method --- synapse/handlers/message.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 88f1161683c6..e5dc267cb715 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -82,31 +82,6 @@ def get_room_data(self, user_id=None, room_id=None, defer.returnValue(data) - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. 
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - @defer.inlineCallbacks def get_state_events( self, user_id, room_id, types=None, filtered_types=None, From 0d5770d734f03b2aef13fdd7aef79607fe2cef1a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Aug 2018 16:22:30 +0100 Subject: [PATCH 18/18] cleanups --- synapse/handlers/message.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e5dc267cb715..893c9bcdc4db 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -25,7 +25,13 @@ from twisted.internet.defer import succeed from synapse.api.constants import MAX_DEPTH, EventTypes, Membership -from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + ConsentNotGivenError, + NotFoundError, + SynapseError, +) from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event @@ -111,7 +117,7 @@ def get_state_events( Returns: A list of dicts representing state events. [{}, {}, {}] Raises: - SynapseError (404) if the at token does not yield an event + NotFoundError (404) if the at token does not yield an event AuthError (403) if the user doesn't have permission to view members of this room. @@ -126,14 +132,10 @@ def get_state_events( ) if not last_events: - raise SynapseError( - 404, - "Can't find event for token %s" % at_token, - Codes.NOT_FOUND - ) + raise NotFoundError("Can't find event for token %s" % (at_token, )) visible_events = yield filter_events_for_client( - self.store, user_id, last_events + self.store, user_id, last_events, ) event = last_events[0] @@ -144,14 +146,15 @@ def get_state_events( room_state = room_state[event.event_id] else: raise AuthError( - 403, "User %s not allowed to view events in room %s at token %s" % ( - user_id, room_id, at_token + 403, + "User %s not allowed to view events in room %s at token %s" % ( + user_id, room_id, at_token, ) ) else: membership, membership_event_id = ( yield self.auth.check_in_room_or_world_readable( - room_id, user_id + room_id, user_id, ) )
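Taken as a whole, the series supports a client flow along these lines (illustrative requests only; tokens, room IDs and event IDs are placeholders, and filters are shown unencoded):

    # 1. Sync with lazy-loaded members, letting the server skip members it has
    #    already sent on this sync sequence:
    GET /_matrix/client/r0/sync?filter={"room":{"state":{"lazy_load_members":true}}}

    # 2. When the full member list is needed (e.g. to render a member panel),
    #    fetch it as of the sync response just processed:
    GET /_matrix/client/r0/rooms/{roomId}/members?at={next_batch}&membership=join

    # 3. Permalinks and search results load their surrounding window with the
    #    same lazy-loading behaviour applied to /context:
    GET /_matrix/client/r0/rooms/{roomId}/context/{eventId}?limit=10&filter={"lazy_load_members":true}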