-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
speed up /members and add at= and membership params #3568
Changes from 29 commits
7b7fd27
c341d81
f7bd5da
589e5aa
8e66dd1
a08b37b
7362e6c
cd28d2f
a17f0b6
9ba6ef2
c6117fa
8f1585d
f9c3c26
c2870ab
ffb7a4c
7d99b0e
cd27a77
4018a6d
d32e5f8
238f750
2a79e1a
48f3e43
08af91d
e952368
2d9c062
eebee08
859ad35
dd4498f
217d5dd
f5189b9
0d5770d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
add support for the include_redundant_members filter param as per MSC1227 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
make the /context API filter & lazy-load aware as per MSC1227 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
speed up /members API and add `at` and `membership` params as per MSC1227 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
from synapse.util.frozenutils import frozendict_json_encoder | ||
from synapse.util.logcontext import run_in_background | ||
from synapse.util.metrics import measure_func | ||
from synapse.visibility import filter_events_for_client | ||
|
||
from ._base import BaseHandler | ||
|
||
|
@@ -82,28 +83,113 @@ def get_room_data(self, user_id=None, room_id=None, | |
defer.returnValue(data) | ||
|
||
@defer.inlineCallbacks | ||
def get_state_events(self, user_id, room_id, is_guest=False): | ||
def _check_in_room_or_world_readable(self, room_id, user_id): | ||
try: | ||
# check_user_was_in_room will return the most recent membership | ||
# event for the user if: | ||
# * The user is a non-guest user, and was ever in the room | ||
# * The user is a guest user, and has joined the room | ||
# else it will throw. | ||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id) | ||
defer.returnValue((member_event.membership, member_event.event_id)) | ||
return | ||
except AuthError: | ||
visibility = yield self.state_handler.get_current_state( | ||
room_id, EventTypes.RoomHistoryVisibility, "" | ||
) | ||
if ( | ||
visibility and | ||
visibility.content["history_visibility"] == "world_readable" | ||
): | ||
defer.returnValue((Membership.JOIN, None)) | ||
return | ||
raise AuthError( | ||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN | ||
) | ||
|
||
@defer.inlineCallbacks | ||
def get_state_events( | ||
self, user_id, room_id, types=None, filtered_types=None, | ||
at_token=None, is_guest=False, | ||
): | ||
"""Retrieve all state events for a given room. If the user is | ||
joined to the room then return the current state. If the user has | ||
left the room return the state events from when they left. | ||
left the room return the state events from when they left. If an explicit | ||
'at' parameter is passed, return the state events as of that event, if | ||
visible. | ||
|
||
Args: | ||
user_id(str): The user requesting state events. | ||
room_id(str): The room ID to get all state events from. | ||
types(list[(str, str|None)]|None): List of (type, state_key) tuples | ||
which are used to filter the state fetched. If `state_key` is None, | ||
all events are returned of the given type. | ||
May be None, which matches any key. | ||
filtered_types(list[str]|None): Only apply filtering via `types` to this | ||
list of event types. Other types of events are returned unfiltered. | ||
If None, `types` filtering is applied to all events. | ||
at_token(StreamToken|None): the stream token of the at which we are requesting | ||
the stats. If the user is not allowed to view the state as of that | ||
stream token, we raise a 403 SynapseError. If None, returns the current | ||
state based on the current_state_events table. | ||
is_guest(bool): whether this user is a guest | ||
Returns: | ||
A list of dicts representing state events. [{}, {}, {}] | ||
Raises: | ||
SynapseError (404) if the at token does not yield an event | ||
|
||
AuthError (403) if the user doesn't have permission to view | ||
members of this room. | ||
""" | ||
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( | ||
room_id, user_id | ||
) | ||
if at_token: | ||
# FIXME this claims to get the state at a stream position, but | ||
# get_recent_events_for_room operates by topo ordering. This therefore | ||
# does not reliably give you the state at the given stream position. | ||
# (https://github.com/matrix-org/synapse/issues/3305) | ||
last_events, _ = yield self.store.get_recent_events_for_room( | ||
room_id, end_token=at_token.room_key, limit=1, | ||
) | ||
|
||
if membership == Membership.JOIN: | ||
room_state = yield self.state.get_current_state(room_id) | ||
elif membership == Membership.LEAVE: | ||
room_state = yield self.store.get_state_for_events( | ||
[membership_event_id], None | ||
if not last_events: | ||
raise SynapseError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's use NotFoundError here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (done in 0d5770d) |
||
404, | ||
"Can't find event for token %s" % at_token, | ||
Codes.NOT_FOUND | ||
) | ||
|
||
visible_events = yield filter_events_for_client( | ||
self.store, user_id, last_events | ||
) | ||
room_state = room_state[membership_event_id] | ||
|
||
event = last_events[0] | ||
if visible_events: | ||
room_state = yield self.store.get_state_for_events( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we not need to check that the user has permission to view the room at this point? |
||
[event.event_id], types, filtered_types=filtered_types, | ||
) | ||
room_state = room_state[event.event_id] | ||
else: | ||
raise AuthError( | ||
403, "User %s not allowed to view events in room %s at token %s" % ( | ||
user_id, room_id, at_token | ||
) | ||
) | ||
else: | ||
membership, membership_event_id = ( | ||
yield self.auth.check_in_room_or_world_readable( | ||
room_id, user_id | ||
) | ||
) | ||
|
||
if membership == Membership.JOIN: | ||
state_ids = yield self.store.get_filtered_current_state_ids( | ||
room_id, types, filtered_types=filtered_types, | ||
) | ||
room_state = yield self.store.get_events(state_ids.values()) | ||
elif membership == Membership.LEAVE: | ||
room_state = yield self.store.get_state_for_events( | ||
[membership_event_id], types, filtered_types=filtered_types, | ||
) | ||
room_state = room_state[membership_event_id] | ||
|
||
now = self.clock.time_msec() | ||
defer.returnValue( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,69 @@ def _get_current_state_ids_txn(txn): | |
_get_current_state_ids_txn, | ||
) | ||
|
||
# FIXME: how should this be cached? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understanding the caching stuff to know how best to handle this - or whether this method should be combined with get_current_state_ids above, and somehow share the same caching. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you've got three choices:
|
||
def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): | ||
"""Get the current state event of a given type for a room based on the | ||
current_state_events table. This may not be as up-to-date as the result | ||
of doing a fresh state resolution as per state_handler.get_current_state | ||
Args: | ||
room_id (str) | ||
types (list[(Str, (Str|None))]): List of (type, state_key) tuples | ||
which are used to filter the state fetched. `state_key` may be | ||
None, which matches any `state_key` | ||
filtered_types (list[Str]|None): List of types to apply the above filter to. | ||
Returns: | ||
deferred: dict of (type, state_key) -> event | ||
""" | ||
|
||
include_other_types = False if filtered_types is None else True | ||
|
||
def _get_filtered_current_state_ids_txn(txn): | ||
results = {} | ||
sql = """SELECT type, state_key, event_id FROM current_state_events | ||
WHERE room_id = ? %s""" | ||
# Turns out that postgres doesn't like doing a list of OR's and | ||
# is about 1000x slower, so we just issue a query for each specific | ||
# type seperately. | ||
if types: | ||
clause_to_args = [ | ||
( | ||
"AND type = ? AND state_key = ?", | ||
(etype, state_key) | ||
) if state_key is not None else ( | ||
"AND type = ?", | ||
(etype,) | ||
) | ||
for etype, state_key in types | ||
] | ||
|
||
if include_other_types: | ||
unique_types = set(filtered_types) | ||
clause_to_args.append( | ||
( | ||
"AND type <> ? " * len(unique_types), | ||
list(unique_types) | ||
) | ||
) | ||
else: | ||
# If types is None we fetch all the state, and so just use an | ||
# empty where clause with no extra args. | ||
clause_to_args = [("", [])] | ||
for where_clause, where_args in clause_to_args: | ||
args = [room_id] | ||
args.extend(where_args) | ||
txn.execute(sql % (where_clause,), args) | ||
for row in txn: | ||
typ, state_key, event_id = row | ||
key = (intern_string(typ), intern_string(state_key)) | ||
results[key] = event_id | ||
return results | ||
|
||
return self.runInteraction( | ||
"get_filtered_current_state_ids", | ||
_get_filtered_current_state_ids_txn, | ||
) | ||
|
||
@cached(max_entries=10000, iterable=True) | ||
def get_state_group_delta(self, state_group): | ||
"""Given a state group try to return a previous group and a delta between | ||
|
@@ -389,8 +452,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None): | |
If None, `types` filtering is applied to all events. | ||
|
||
Returns: | ||
deferred: A list of dicts corresponding to the event_ids given. | ||
The dicts are mappings from (type, state_key) -> state_events | ||
deferred: A dict of (event_id) -> (type, state_key) -> [state_events] | ||
""" | ||
event_to_groups = yield self._get_state_group_for_events( | ||
event_ids, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks like bad merge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no? surely it's just the diffing algorithm inserting
_check_in_room_or_world_readable
beforeget_state_events
but getting confused by the@defer.inlineCallbacks
and so reusing the existing one rather than inserting it cleanly?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been removed on develop; you are reintroducing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
("it" being _check_in_room_or_world_readable)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in f5189b9