Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement no op for room stream in sync #2022

Merged
merged 4 commits into from
Mar 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ def notify_device_list_update(result):
position = row[position_index]
user_id = row[user_index]

rooms = yield store.get_rooms_for_user(user_id)
room_ids = [r.room_id for r in rooms]
room_ids = yield store.get_rooms_for_user(user_id)

notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
Expand Down
14 changes: 6 additions & 8 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ def notify_device_update(self, user_id, device_ids):
user_id, device_ids, list(hosts)
)

rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [r.room_id for r in rooms]
room_ids = yield self.store.get_rooms_for_user(user_id)

yield self.notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
Expand All @@ -270,8 +269,7 @@ def get_user_ids_changed(self, user_id, from_token):
user_id (str)
from_token (StreamToken)
"""
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(r.room_id for r in rooms)
room_ids = yield self.store.get_rooms_for_user(user_id)

# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
Expand Down Expand Up @@ -347,8 +345,8 @@ def on_federation_query_user_devices(self, user_id):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We no longer share rooms with this user, so we'll no longer
# receive device updates. Mark this in DB.
yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
Expand Down Expand Up @@ -404,8 +402,8 @@ def incoming_device_list_update(self, origin, edu_content):
logger.warning("Got device list update edu for %r from %r", user_id, origin)
return

rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return
Expand Down
17 changes: 9 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,9 @@ def _get_interested_parties(self, states, calculate_remote_hosts=True):
room_ids_to_states = {}
users_to_states = {}
for state in states:
events = yield self.store.get_rooms_for_user(state.user_id)
for e in events:
room_ids_to_states.setdefault(e.room_id, []).append(state)
room_ids = yield self.store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
Expand Down Expand Up @@ -913,11 +913,12 @@ def drop(self, observed_user, observer_user):
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
"""
observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())

observer_room_ids = set(r.room_id for r in observer_rooms)
observed_room_ids = set(r.room_id for r in observed_rooms)
observer_room_ids = yield self.store.get_rooms_for_user(
observer_user.to_string()
)
observed_room_ids = yield self.store.get_rooms_for_user(
observed_user.to_string()
)

if observer_room_ids & observed_room_ids:
defer.returnValue(True)
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ def _update_join_states(self, requester):

self.ratelimit(requester)

joins = yield self.store.get_rooms_for_user(
room_ids = yield self.store.get_rooms_for_user(
user.to_string(),
)

for j in joins:
for room_id in room_ids:
handler = self.hs.get_handlers().room_member_handler
try:
# Assume the user isn't a guest because we don't let guests set
Expand All @@ -171,12 +171,12 @@ def _update_join_states(self, requester):
yield handler.update_membership(
requester,
user,
j.room_id,
room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
j.room_id, str(e.message)
room_id, str(e.message)
)
5 changes: 2 additions & 3 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ def get_pagination_rows(self, user, config, key):
else:
from_key = None

rooms = yield self.store.get_rooms_for_user(user.to_string())
rooms = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(user.to_string())
events = yield self.store.get_linearized_receipts_for_rooms(
rooms,
room_ids,
from_key=from_key,
to_key=to_key,
)
Expand Down
68 changes: 56 additions & 12 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
from synapse.types import RoomStreamToken

from twisted.internet import defer

Expand Down Expand Up @@ -225,8 +226,7 @@ def ephemeral_by_room(self, sync_config, now_token, since_token=None):
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"

rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
room_ids = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())

typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
Expand Down Expand Up @@ -568,16 +568,15 @@ def _generate_sync_entry_for_device_list(self, sync_result_builder):
since_token = sync_result_builder.since_token

if since_token and since_token.device_list_key:
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(r.room_id for r in rooms)
room_ids = yield self.store.get_rooms_for_user(user_id)

user_ids_changed = set()
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
for other_user_id in changed:
other_rooms = yield self.store.get_rooms_for_user(other_user_id)
if room_ids.intersection(e.room_id for e in other_rooms):
other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
if room_ids.intersection(other_room_ids):
user_ids_changed.add(other_user_id)

defer.returnValue(user_ids_changed)
Expand Down Expand Up @@ -765,6 +764,21 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
)
sync_result_builder.now_token = now_token

# We check up front if anything has changed, if it hasn't then there is
# no point in going futher.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe comment on what expensive operations this avoids?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like that sort of comment would get out of date quickly, and wouldn't necessarily help that much.

since_token = sync_result_builder.since_token
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = yield self._have_rooms_changed(sync_result_builder)
if not have_changed:
tags_by_room = yield self.store.get_updated_tags(
user_id,
since_token.account_data_key,
)
if not tags_by_room:
logger.debug("no-oping sync")
defer.returnValue(([], []))

ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
)
Expand All @@ -774,13 +788,12 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
else:
ignored_users = frozenset()

if sync_result_builder.since_token:
if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res

tags_by_room = yield self.store.get_updated_tags(
user_id,
sync_result_builder.since_token.account_data_key,
user_id, since_token.account_data_key,
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
Expand All @@ -805,7 +818,7 @@ def handle_room_entries(room_entry):

# Now we want to get any newly joined users
newly_joined_users = set()
if sync_result_builder.since_token:
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
joined_sync.timeline.events, joined_sync.state.values()
Expand All @@ -817,6 +830,38 @@ def handle_room_entries(room_entry):

defer.returnValue((newly_joined_rooms, newly_joined_users))

@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token

assert since_token

# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

if rooms_changed:
defer.returnValue(True)

app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
joined_room_ids = yield self.store.get_rooms_for_user(user_id)

stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
for room_id in joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
defer.returnValue(True)
defer.returnValue(False)

@defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
Expand All @@ -841,8 +886,7 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users):
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
rooms = yield self.store.get_rooms_for_user(user_id)
joined_room_ids = set(r.room_id for r in rooms)
joined_room_ids = yield self.store.get_rooms_for_user(user_id)

# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
Expand Down
6 changes: 2 additions & 4 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
if user_stream is None:
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
Expand Down Expand Up @@ -454,8 +453,7 @@ def check_for_updates(before_token, after_token):

@defer.inlineCallbacks
def _get_room_ids(self, user, explicit_room_id):
joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
joined_room_ids = map(lambda r: r.room_id, joined_rooms)
joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
if explicit_room_id:
if explicit_room_id in joined_room_ids:
defer.returnValue(([explicit_room_id], True))
Expand Down
8 changes: 4 additions & 4 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ def get_badge_count(store, user_id):

badge = len(invites)

for r in joins:
if r.room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[r.room_id]
for room_id in joins:
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]

notifs = yield (
store.get_unread_event_push_actions_by_room_for_user(
r.room_id, user_id, last_unread_event_id
room_id, user_id, last_unread_event_id
)
)
# return one badge count per conversation, as count per
Expand Down
3 changes: 1 addition & 2 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,7 @@ def __init__(self, hs):
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)

rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
room_ids = set(r.room_id for r in rooms) # Ensure they're unique.
room_ids = yield self.store.get_rooms_for_user(requester.user.to_string())
defer.returnValue((200, {"joined_rooms": list(room_ids)}))


Expand Down
13 changes: 8 additions & 5 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,24 +274,27 @@ def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):

return rows

@cached(max_entries=500000, iterable=True)
@cachedInlineCallbacks(max_entries=500000, iterable=True)
def get_rooms_for_user(self, user_id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to have some docstring along the lines of:

"""Returns an immutable set of room_id strings for the rooms the user is joined to."""

return self.get_rooms_for_user_where_membership_is(
"""Returns a set of room_ids the user is currently joined to
"""
rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN],
)
defer.returnValue(frozenset(r.room_id for r in rooms))

@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
def get_users_who_share_room_with_user(self, user_id, cache_context):
"""Returns the set of users who share a room with `user_id`
"""
rooms = yield self.get_rooms_for_user(
room_ids = yield self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate,
)

user_who_share_room = set()
for room in rooms:
for room_id in room_ids:
user_ids = yield self.get_users_in_room(
room.room_id, on_invalidate=cache_context.invalidate,
room_id, on_invalidate=cache_context.invalidate,
)
user_who_share_room.update(user_ids)

Expand Down