Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

speed up /members and add at= and membership params #3568

Merged
merged 31 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7b7fd27
untested attempt at deduplicating lazy-loaded members
ara4n Jun 3, 2018
c341d81
Merge branch 'develop' into matthew/remove_redundant_lazy_members
ara4n Jun 10, 2018
f7bd5da
add include_redundant_members filter option & make it work
ara4n Jun 10, 2018
589e5aa
merge and apply isort
ara4n Jul 19, 2018
8e66dd1
merge in #2970
ara4n Jul 19, 2018
a08b37b
fix bad merge
ara4n Jul 19, 2018
7362e6c
make /context lazyload & filter aware
ara4n Jul 20, 2018
cd28d2f
speed up /members and add at= and membership params
ara4n Jul 20, 2018
a17f0b6
make it work
ara4n Jul 20, 2018
9ba6ef2
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 20, 2018
c6117fa
make it work
ara4n Jul 20, 2018
8f1585d
make filtering work
ara4n Jul 21, 2018
f9c3c26
Merge branch 'matthew/filter_members' into matthew/remove_redundant_l…
ara4n Jul 23, 2018
c2870ab
Merge branch 'matthew/remove_redundant_lazy_members' into matthew/laz…
ara4n Jul 23, 2018
ffb7a4c
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 23, 2018
7d99b0e
changelog
ara4n Jul 23, 2018
cd27a77
changelog
ara4n Jul 23, 2018
4018a6d
changelog
ara4n Jul 23, 2018
d32e5f8
Merge branch 'matthew/filter_members' into matthew/remove_redundant_l…
ara4n Jul 24, 2018
238f750
deduplicating redundant members via event_id rather than mxid
ara4n Jul 24, 2018
2a79e1a
Merge branch 'matthew/remove_redundant_lazy_members' into matthew/laz…
ara4n Jul 24, 2018
48f3e43
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 24, 2018
08af91d
fix merge fail
ara4n Jul 24, 2018
e952368
incorporate review
ara4n Jul 30, 2018
2d9c062
Merge branch 'develop' into matthew/members_at
ara4n Aug 12, 2018
eebee08
convert /members?at= to take a stream token
ara4n Aug 12, 2018
859ad35
incorporate PR review
ara4n Aug 14, 2018
dd4498f
Merge branch 'develop' into matthew/members_at
ara4n Aug 14, 2018
217d5dd
Merge branch 'develop' into matthew/members_at
ara4n Aug 15, 2018
f5189b9
remove incorrectly reintroduced method
richvdh Aug 15, 2018
0d5770d
cleanups
richvdh Aug 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3331.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add support for the include_redundant_members filter param as per MSC1227
1 change: 1 addition & 0 deletions changelog.d/3567.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
make the /context API filter & lazy-load aware as per MSC1227
1 change: 1 addition & 0 deletions changelog.d/3568.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
speed up /members API and add `at` and `membership` params as per MSC1227
88 changes: 76 additions & 12 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from twisted.internet.defer import succeed

from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
NotFoundError,
SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
Expand All @@ -36,6 +42,7 @@
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

Expand Down Expand Up @@ -82,28 +89,85 @@ def get_room_data(self, user_id=None, room_id=None,
defer.returnValue(data)

@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event, if
visible.

Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
NotFoundError (404) if the at token does not yield an event

AuthError (403) if the user doesn't have permission to view
members of this room.
"""
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1,
)

if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
if not last_events:
raise NotFoundError("Can't find event for token %s" % (at_token, ))

visible_events = yield filter_events_for_client(
self.store, user_id, last_events,
)

event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
"User %s not allowed to view events in room %s at token %s" % (
user_id, room_id, at_token,
)
)
else:
membership, membership_event_id = (
yield self.auth.check_in_room_or_world_readable(
room_id, user_id,
)
)
room_state = room_state[membership_event_id]

if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
)
room_state = room_state[membership_event_id]

now = self.clock.time_msec()
defer.returnValue(
Expand Down
32 changes: 29 additions & 3 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
parse_string,
)
from synapse.streams.config import PaginationConfig
from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID
from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID

from .base import ClientV1RestServlet, client_path_patterns

Expand Down Expand Up @@ -384,15 +384,39 @@ def __init__(self, hs):
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
events = yield self.message_handler.get_state_events(
handler = self.message_handler

# request the state as of a given event, as identified by a stream token,
# for consistency with /messages etc.
# useful for getting the membership in retrospect as of a given /sync
# response.
at_token_string = parse_string(request, "at")
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)

# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
# instead, except filters aren't currently aware of memberships.
# See https://github.com/matrix-org/matrix-doc/issues/1337 for more details.
membership = parse_string(request, "membership")
not_membership = parse_string(request, "not_membership")

events = yield handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
at_token=at_token,
types=[(EventTypes.Member, None)],
)

chunk = []

for event in events:
if event["type"] != EventTypes.Member:
if (
(membership and event['content'].get("membership") != membership) or
(not_membership and event['content'].get("membership") == not_membership)
):
continue
chunk.append(event)

Expand All @@ -401,6 +425,8 @@ def on_GET(self, request, room_id):
}))


# deprecated in favour of /members?membership=join?
# except it does custom AS logic and has a simpler return format
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ def _purge_history_txn(
max_depth = max(row[0] for row in rows)

if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
raise SynapseError(
Expand Down
66 changes: 64 additions & 2 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,69 @@ def _get_current_state_ids_txn(txn):
_get_current_state_ids_txn,
)

# FIXME: how should this be cached?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understanding the caching stuff to know how best to handle this - or whether this method should be combined with get_current_state_ids above, and somehow share the same caching.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you've got three choices:

  1. treat each call to get_filtered_current_state_ids as a separate cacheable thing - so for example if you do a lookup of get_filtered_current_state_ids(room, None, None), a lookup of get_filtered_current_state_ids(room, [(t, k)], None) would not hit the cache. To do this, you just need an @cached decorator, plus invalidation calls.

  2. Implement this in terms of a call to get_current_state_ids followed by a filter. This might increase cache hits, but cache misses will of course be very expensive.

  3. use a DictionaryCache, like _state_group_cache uses, since DictionaryCache has support for partial caches. The keys would be room ids rather than state group ids. again you'd need to consider invalidation.

  4. Not bother with caching and see how much it hurts?

def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
"""Get the current state event of a given type for a room based on the
current_state_events table. This may not be as up-to-date as the result
of doing a fresh state resolution as per state_handler.get_current_state
Args:
room_id (str)
types (list[(Str, (Str|None))]): List of (type, state_key) tuples
which are used to filter the state fetched. `state_key` may be
None, which matches any `state_key`
filtered_types (list[Str]|None): List of types to apply the above filter to.
Returns:
deferred: dict of (type, state_key) -> event
"""

include_other_types = False if filtered_types is None else True

def _get_filtered_current_state_ids_txn(txn):
results = {}
sql = """SELECT type, state_key, event_id FROM current_state_events
WHERE room_id = ? %s"""
# Turns out that postgres doesn't like doing a list of OR's and
# is about 1000x slower, so we just issue a query for each specific
# type seperately.
if types:
clause_to_args = [
(
"AND type = ? AND state_key = ?",
(etype, state_key)
) if state_key is not None else (
"AND type = ?",
(etype,)
)
for etype, state_key in types
]

if include_other_types:
unique_types = set(filtered_types)
clause_to_args.append(
(
"AND type <> ? " * len(unique_types),
list(unique_types)
)
)
else:
# If types is None we fetch all the state, and so just use an
# empty where clause with no extra args.
clause_to_args = [("", [])]
for where_clause, where_args in clause_to_args:
args = [room_id]
args.extend(where_args)
txn.execute(sql % (where_clause,), args)
for row in txn:
typ, state_key, event_id = row
key = (intern_string(typ), intern_string(state_key))
results[key] = event_id
return results

return self.runInteraction(
"get_filtered_current_state_ids",
_get_filtered_current_state_ids_txn,
)

@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
Expand Down Expand Up @@ -389,8 +452,7 @@ def get_state_for_events(self, event_ids, types, filtered_types=None):
If None, `types` filtering is applied to all events.

Returns:
deferred: A list of dicts corresponding to the event_ids given.
The dicts are mappings from (type, state_key) -> state_events
deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
"""
event_to_groups = yield self._get_state_group_for_events(
event_ids,
Expand Down
2 changes: 1 addition & 1 deletion tests/storage/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_get_state_for_event(self):
{(e3.type, e3.state_key): e3, (e5.type, e5.state_key): e5}, state
)

# check we can use filter_types to grab a specific room member
# check we can use filtered_types to grab a specific room member
# without filtering out the other event types
state = yield self.store.get_state_for_event(
e5.event_id,
Expand Down