This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make presence.get_new_events a bit faster #1876

Merged: 5 commits, Feb 2, 2017
44 changes: 15 additions & 29 deletions synapse/handlers/presence.py
@@ -1011,7 +1011,7 @@ def __init__(self, hs):
     @defer.inlineCallbacks
     @log_function
     def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       **kwargs):
+                       explicit_room_id=None, **kwargs):
         # The process for getting presence events is:
         # 1. Get the rooms the user is in.
         # 2. Get the list of users in the rooms.
@@ -1028,22 +1028,24 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
         user_id = user.to_string()
         if from_key is not None:
             from_key = int(from_key)
-        room_ids = room_ids or []
 
         presence = self.get_presence_handler()
         stream_change_cache = self.store.presence_stream_cache
 
-        if not room_ids:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(e.room_id for e in rooms)
-        else:
-            room_ids = set(room_ids)
-
         max_token = self.store.get_current_presence_token()
 
         plist = yield self.store.get_presence_list_accepted(user.localpart)
-        friends = set(row["observed_user_id"] for row in plist)
-        friends.add(user_id)  # So that we receive our own presence
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(explicit_room_id)
+            users_interested_in.update(user_ids)
 
         user_ids_changed = set()
         changed = None
@@ -1055,35 +1057,19 @@
                 # work out if we share a room or they're in our presence list
                 get_updates_counter.inc("stream")
                 for other_user_id in changed:
-                    if other_user_id in friends:
+                    if other_user_id in users_interested_in:
                         user_ids_changed.add(other_user_id)
-                        continue
-                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                    if room_ids.intersection(e.room_id for e in other_rooms):
-                        user_ids_changed.add(other_user_id)
-                        continue
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
                 get_updates_counter.inc("full")
 
-                user_ids_to_check = set()
-                for room_id in room_ids:
-                    users = yield self.store.get_users_in_room(room_id)
-                    user_ids_to_check.update(users)
-
-                user_ids_to_check.update(friends)
-
-                # Always include yourself. Only really matters for when the user is
-                # not in any rooms, but still.
-                user_ids_to_check.add(user_id)
-
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        user_ids_to_check, from_key,
+                        users_interested_in, from_key,
                     )
                 else:
-                    user_ids_changed = user_ids_to_check
+                    user_ids_changed = users_interested_in
 
         updates = yield presence.current_state_for_users(user_ids_changed)
 
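Taken together, the presence.py changes replace two per-request membership walks with one precomputed set. Below is a minimal, self-contained sketch (illustrative names, not Synapse's actual classes) of the resulting lookup shape: build `users_interested_in` once from cached data, then filter the changed users against it, falling back to the stream-change cache only when the delta is too large to enumerate. `stream_cache.get_entities_changed(entities, key)` here mirrors the call made in the diff above.

    def changed_users_we_care_about(users_interested_in, changed, stream_cache,
                                    from_key):
        """users_interested_in: cached set of user IDs we share a room with,
        plus accepted presence-list entries and ourselves.
        changed: user IDs whose presence changed since from_key, or None when
        the delta could not be enumerated cheaply.
        """
        if changed is not None and len(changed) < 500:
            # Small delta: one set-membership test per changed user, with no
            # per-user get_rooms_for_user call as in the old code.
            return set(u for u in changed if u in users_interested_in)
        if from_key:
            # Large delta: ask the change cache which interesting users moved.
            return stream_cache.get_entities_changed(users_interested_in, from_key)
        return set(users_interested_in)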
1 change: 1 addition & 0 deletions synapse/handlers/room.py
@@ -437,6 +437,7 @@ def get_new_events(
             limit,
             room_ids,
             is_guest,
+            explicit_room_id=None,
     ):
         # We just ignore the key for now.
 
1 change: 1 addition & 0 deletions synapse/notifier.py
@@ -378,6 +378,7 @@ def check_for_updates(before_token, after_token):
                     limit=limit,
                     is_guest=is_peeking,
                     room_ids=room_ids,
+                    explicit_room_id=explicit_room_id,
                 )
 
                 if name == "room":
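The room.py and notifier.py hunks are pure plumbing for the new keyword. A short sketch of the shape (simplified names, not the full Notifier code): the notifier forwards explicit_room_id to every event source's get_new_events, so each source has to accept it even if, like the room source, it ignores it.

    class RoomEventSource(object):
        def get_new_events(self, user, from_key, limit, room_ids, is_guest,
                           explicit_room_id=None):
            # The room source accepts but ignores explicit_room_id; only the
            # presence source acts on it.
            return [], from_key

    def check_for_updates(sources, user, from_token, limit, is_peeking,
                          room_ids, explicit_room_id):
        # Every source is called with the same keyword arguments, which is
        # why the otherwise-unused parameter is added in room.py.
        return {
            name: source.get_new_events(
                user=user,
                from_key=from_token[name],
                limit=limit,
                is_guest=is_peeking,
                room_ids=room_ids,
                explicit_room_id=explicit_room_id,
            )
            for name, source in sources.items()
        }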
17 changes: 17 additions & 0 deletions synapse/storage/roommember.py
@@ -280,6 +280,23 @@ def get_rooms_for_user(self, user_id):
             user_id, membership_list=[Membership.JOIN],
         )
 
+    @cachedInlineCallbacks(max_entries=50000, cache_context=True, iterable=True)
+    def get_users_who_share_room_with_user(self, user_id, cache_context):
Review comment (Contributor): I guess we are leaking the cache_context.invalidate callbacks if this cache is smaller than the get_users_in_room or get_rooms_for_user caches. I wonder if we need a max_entries at all for dependent caches like this one, since it can't be bigger than the get_rooms_for_user or get_users_in_room caches.
"""Returns the set of users who share a room with `user_id`
"""
rooms = yield self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate,
)

user_who_share_room = set()
for room in rooms:
user_ids = yield self.get_users_in_room(
room.room_id, on_invalidate=cache_context.invalidate,
)
user_who_share_room.update(user_ids)

defer.returnValue(user_who_share_room)

def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
def f(txn):
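The new storage method is a cache derived from two other caches, which is what the review comment above is probing at. The toy sketch below (plain Python, not the real cachedInlineCallbacks machinery) shows the dependent-cache pattern and where the invalidate callbacks accumulate:

    class SimpleCache(object):
        """Toy stand-in for Synapse's caches, enough to show the pattern."""

        def __init__(self):
            self._entries = {}  # key -> (value, [invalidation callbacks])

        def get(self, key, compute, on_invalidate=None):
            if key not in self._entries:
                self._entries[key] = (compute(key), [])
            value, callbacks = self._entries[key]
            if on_invalidate is not None:
                # The callbacks pile up here until *this* entry is dropped --
                # the leak the review comment worries about if this cache
                # outlives entries in the smaller derived cache.
                callbacks.append(on_invalidate)
            return value

        def invalidate(self, key):
            entry = self._entries.pop(key, None)
            if entry is not None:
                for callback in entry[1]:
                    callback()  # cascade into dependent caches

    # Usage sketch: the derived cache reads through the underlying caches and
    # registers an eviction hook with each, mirroring the diff's
    # on_invalidate=cache_context.invalidate.
    ROOMS = {"@alice:hs": ["!room:hs"]}
    MEMBERS = {"!room:hs": {"@alice:hs", "@bob:hs"}}
    load_rooms = lambda user_id: ROOMS.get(user_id, [])
    load_members = lambda room_id: MEMBERS.get(room_id, set())

    rooms_for_user = SimpleCache()
    users_in_room = SimpleCache()
    shared = SimpleCache()

    def users_who_share_room_with(user_id):
        def compute(key):
            evict = lambda: shared.invalidate(key)
            result = set()
            for room_id in rooms_for_user.get(user_id, load_rooms, evict):
                result.update(users_in_room.get(room_id, load_members, evict))
            return result
        return shared.get(user_id, compute)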
2 changes: 2 additions & 0 deletions synapse/util/caches/descriptors.py
@@ -478,6 +478,8 @@ def update_results_dict(res):
 
 
 class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us.
Review comment (Contributor): Maybe expound on the nature of the reliance?

Review comment (@NegativeMjark, Feb 2, 2017): (I'm worried that future me will find this comment and will have forgotten what bit of code relies on these having __eq__ and __hash__ defined "sensibly", or what "sensible" means in this case.)
     def invalidate(self):
         self.cache.invalidate(self.key)
 
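To make the new comment concrete: namedtuple derives __eq__ and __hash__ from the field values, so two contexts created for the same (cache, key) pair are interchangeable wherever contexts are compared or deduplicated, e.g. if they are kept in a set of invalidation callbacks. A quick standard-library demonstration (plain strings stand in for real cache objects):

    from collections import namedtuple

    _CacheContext = namedtuple("_CacheContext", ("cache", "key"))

    ctx_a = _CacheContext(cache="rooms_for_user", key=("@alice:hs",))
    ctx_b = _CacheContext(cache="rooms_for_user", key=("@alice:hs",))

    assert ctx_a == ctx_b                # value equality, not identity
    assert ctx_a is not ctx_b
    assert hash(ctx_a) == hash(ctx_b)
    assert len({ctx_a, ctx_b}) == 1      # deduplicates in sets and dict keys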