Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

speed up /members and add at= and membership params #3568

Merged
merged 31 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7b7fd27
untested attempt at deduplicating lazy-loaded members
ara4n Jun 3, 2018
c341d81
Merge branch 'develop' into matthew/remove_redundant_lazy_members
ara4n Jun 10, 2018
f7bd5da
add include_redundant_members filter option & make it work
ara4n Jun 10, 2018
589e5aa
merge and apply isort
ara4n Jul 19, 2018
8e66dd1
merge in #2970
ara4n Jul 19, 2018
a08b37b
fix bad merge
ara4n Jul 19, 2018
7362e6c
make /context lazyload & filter aware
ara4n Jul 20, 2018
cd28d2f
speed up /members and add at= and membership params
ara4n Jul 20, 2018
a17f0b6
make it work
ara4n Jul 20, 2018
9ba6ef2
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 20, 2018
c6117fa
make it work
ara4n Jul 20, 2018
8f1585d
make filtering work
ara4n Jul 21, 2018
f9c3c26
Merge branch 'matthew/filter_members' into matthew/remove_redundant_l…
ara4n Jul 23, 2018
c2870ab
Merge branch 'matthew/remove_redundant_lazy_members' into matthew/laz…
ara4n Jul 23, 2018
ffb7a4c
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 23, 2018
7d99b0e
changelog
ara4n Jul 23, 2018
cd27a77
changelog
ara4n Jul 23, 2018
4018a6d
changelog
ara4n Jul 23, 2018
d32e5f8
Merge branch 'matthew/filter_members' into matthew/remove_redundant_l…
ara4n Jul 24, 2018
238f750
deduplicating redundant members via event_id rather than mxid
ara4n Jul 24, 2018
2a79e1a
Merge branch 'matthew/remove_redundant_lazy_members' into matthew/laz…
ara4n Jul 24, 2018
48f3e43
Merge branch 'matthew/lazy_load_apis' into matthew/members_at
ara4n Jul 24, 2018
08af91d
fix merge fail
ara4n Jul 24, 2018
e952368
incorporate review
ara4n Jul 30, 2018
2d9c062
Merge branch 'develop' into matthew/members_at
ara4n Aug 12, 2018
eebee08
convert /members?at= to take a stream token
ara4n Aug 12, 2018
859ad35
incorporate PR review
ara4n Aug 14, 2018
dd4498f
Merge branch 'develop' into matthew/members_at
ara4n Aug 14, 2018
217d5dd
Merge branch 'develop' into matthew/members_at
ara4n Aug 15, 2018
f5189b9
remove incorrectly reintroduced method
richvdh Aug 15, 2018
0d5770d
cleanups
richvdh Aug 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
"lazy_load_members": {
"type": "boolean"
},
"include_redundant_members": {
"type": "boolean"
},
}
}

Expand Down Expand Up @@ -267,6 +270,9 @@ def ephemeral_limit(self):
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()

def include_redundant_members(self):
return self._room_state_filter.include_redundant_members()

def filter_presence(self, events):
return self._presence_filter.filter(events)

Expand Down Expand Up @@ -426,6 +432,9 @@ def limit(self):
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)

def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)


def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
Expand Down
27 changes: 23 additions & 4 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,24 @@ def _check_in_room_or_world_readable(self, room_id, user_id):
)

@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_event=None, is_guest=False
):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event.

Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, (str|None))]): the (type, state_key)s to return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower-case str, and it can be None.

also: sorry, but can you copy the slightly longer/clearer descriptions from get_state_ids_for_events etc? It's really not obvious how this works from reading this description.

results for.
filtered_types(list[str]): the list of types to apply the types filter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

to.
at_event(str|None): the event_id we are requesting the state as of; if
None, returns the current state (or the state at leave, if the user
has left the room)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

str|None. Can you explain what it means if it is None?

is_guest(bool): whether this user is a guest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you've been writing too much javascript :)

bool

Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
Expand All @@ -345,10 +355,19 @@ def get_state_events(self, user_id, room_id, is_guest=False):
)

if membership == Membership.JOIN:
room_state = yield self.state_handler.get_current_state(room_id)
if at_event:
room_state = yield self.store.get_state_for_events(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need to check that the user has permission to view the room at this point?

[at_event], types, filtered_types=filtered_types
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gentle reminder to use trailing commas in this situation. don't mind if you want to leave these now they are done.

)
room_state = room_state[at_event]
else:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels confusing that at_event doesn't work in the Leave case.

[membership_event_id], None
[membership_event_id], types, filtered_types=filtered_types
)
room_state = room_state[membership_event_id]

Expand Down
24 changes: 21 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def send(etype, content, **kwargs):

class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.

Expand All @@ -407,6 +407,8 @@ def get_event_context(self, user, room_id, event_id, limit):
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
event_filter (Filter|None): the filter to apply to the events returned
(excluding the target event_id)

Returns:
dict, or None if the event isn't found
Expand Down Expand Up @@ -441,7 +443,7 @@ def filter_evts(events):
)

results = yield self.store.get_events_around(
room_id, event_id, before_limit, after_limit
room_id, event_id, before_limit, after_limit, event_filter
)

results["events_before"] = yield filter_evts(results["events_before"])
Expand All @@ -453,8 +455,24 @@ def filter_evts(events):
else:
last_event_id = event_id

types = None
filtered_types = None
if event_filter and event_filter.lazy_load_members():
members = {}
for ev in (
results["events_before"] +
[results["event"]] +
results["events_after"]
):
members[ev.sender] = True
filtered_types = [EventTypes.Member]
types = [(EventTypes.Member, member) for member in members.keys()]

# XXX: why do we return the state as of the last event rather than the
# first? Shouldn't we be consistent with /sync?

state = yield self.store.get_state_for_events(
[last_event_id], None
[last_event_id], types, filtered_types=filtered_types
)
results["state"] = list(state[last_event_id].values())

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def search(self, user, content, batch=None):
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
event.room_id, event.event_id, before_limit, after_limit
event.room_id, event.event_id, before_limit, after_limit,
)

logger.info(
Expand Down
69 changes: 65 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client

logger = logging.getLogger(__name__)

# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000

# Remember the last 100 members we sent to a client for the purposes of
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100


SyncConfig = collections.namedtuple("SyncConfig", [
"user",
Expand Down Expand Up @@ -182,6 +192,12 @@ def __init__(self, hs):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()

# ExpiringCache((User, Device)) -> LruCache(member mxid string)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache", self.clock,
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE
)

def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
Expand Down Expand Up @@ -505,9 +521,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
with Measure(self.clock, "compute_state_delta"):

types = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
filtered_types = None

lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
sync_config.filter_collection.include_redundant_members()
)

if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
Expand All @@ -520,6 +540,20 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
)
]

if not include_redundant_members:
# we can filter out redundant members based on their mxids (not
# their event_ids) at this point. We know we can do it based on
# mxid as this is a non-gappy incremental sync.

cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)

# only apply the filtering to room members
filtered_types = [EventTypes.Member]

Expand Down Expand Up @@ -551,6 +585,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
t: state_ids[t]
for t in state_ids if t[0] == EventTypes.Member
}

if not include_redundant_members:
# add any types we are about to send into our LruCache
for t in types:
cache.set(t[1], True)
else:
member_state_ids = {}

Expand Down Expand Up @@ -599,6 +638,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
t: state_at_timeline_start[t]
for t in state_at_timeline_start if t[0] == EventTypes.Member
}

if not include_redundant_members:
# add any types we are about to send into our LruCache
for t in types:
cache.set(t[1], True)
else:
member_state_ids = {}

Expand All @@ -617,9 +661,26 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
else:
state_ids = {}
if lazy_load_members:
# TODO: filter out redundant members based on their mxids (not their
# event_ids) at this point. We know we can do it based on mxid as this
# is a non-gappy incremental sync.
if not include_redundant_members:
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
# de-duplicated.
if since_token is None:
logger.debug("clearing LruCache for %r", cache_key)
cache.clear()
else:
# only send members which aren't in our LruCache (either
# because they're new to this client or have been pushed out
# of the cache)
logger.debug("filtering types from %r...", types)
types = [
t for t in types if not cache.get(t[1])
]
logger.debug("...to %r", types)

# add any types we are about to send into our LruCache
for t in types:
cache.set(t[1], True)

if types:
state_ids = yield self.store.get_state_ids_for_event(
Expand Down
27 changes: 26 additions & 1 deletion synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,29 @@ def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
handler = self.handlers.message_handler

# request the state as of a given event
# useful for synchronising with /messages
at_event = parse_string(request, "at")

# let you filter down on particular memberships
membership = parse_string(request, "membership")
not_membership = parse_string(request, "not_membership")

events = yield handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
at_event=at_event,
types=[(EventTypes.Member, None)],
)

chunk = []

for event in events:
if event["type"] != EventTypes.Member:
if (
(membership and event['content'].get("membership") != membership) or
(not_membership and event['content'].get("membership") == not_membership)
):
continue
chunk.append(event)

Expand All @@ -401,6 +415,8 @@ def on_GET(self, request, room_id):
}))


# deprecated in favour of /members?membership=join?
# except it does custom AS logic and has a simpler return format
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")

Expand Down Expand Up @@ -533,11 +549,20 @@ def on_GET(self, request, room_id, event_id):

limit = parse_integer(request, "limit", default=10)

# for symmetry with /messages for now
filter_bytes = parse_string(request, "filter")
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None

results = yield self.handlers.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
limit,
event_filter,
)

if not results:
Expand Down
Loading