Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2022 from matrix-org/erikj/no_op_sync
Browse files Browse the repository at this point in the history
Implement no op for room stream in sync
  • Loading branch information
erikjohnston authored Mar 16, 2017
2 parents 54d2b7e + da14665 commit 248eb46
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 52 deletions.
3 changes: 1 addition & 2 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ def notify_device_list_update(result):
position = row[position_index]
user_id = row[user_index]

rooms = yield store.get_rooms_for_user(user_id)
room_ids = [r.room_id for r in rooms]
room_ids = yield store.get_rooms_for_user(user_id)

notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
Expand Down
14 changes: 6 additions & 8 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ def notify_device_update(self, user_id, device_ids):
user_id, device_ids, list(hosts)
)

rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [r.room_id for r in rooms]
room_ids = yield self.store.get_rooms_for_user(user_id)

yield self.notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
Expand All @@ -270,8 +269,7 @@ def get_user_ids_changed(self, user_id, from_token):
user_id (str)
from_token (StreamToken)
"""
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(r.room_id for r in rooms)
room_ids = yield self.store.get_rooms_for_user(user_id)

# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
Expand Down Expand Up @@ -347,8 +345,8 @@ def on_federation_query_user_devices(self, user_id):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We no longer share rooms with this user, so we'll no longer
# receive device updates. Mark this in DB.
yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
Expand Down Expand Up @@ -404,8 +402,8 @@ def incoming_device_list_update(self, origin, edu_content):
logger.warning("Got device list update edu for %r from %r", user_id, origin)
return

rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return
Expand Down
17 changes: 9 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,9 @@ def _get_interested_parties(self, states, calculate_remote_hosts=True):
room_ids_to_states = {}
users_to_states = {}
for state in states:
events = yield self.store.get_rooms_for_user(state.user_id)
for e in events:
room_ids_to_states.setdefault(e.room_id, []).append(state)
room_ids = yield self.store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
Expand Down Expand Up @@ -913,11 +913,12 @@ def drop(self, observed_user, observer_user):
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
"""
observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())

observer_room_ids = set(r.room_id for r in observer_rooms)
observed_room_ids = set(r.room_id for r in observed_rooms)
observer_room_ids = yield self.store.get_rooms_for_user(
observer_user.to_string()
)
observed_room_ids = yield self.store.get_rooms_for_user(
observed_user.to_string()
)

if observer_room_ids & observed_room_ids:
defer.returnValue(True)
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ def _update_join_states(self, requester):

self.ratelimit(requester)

joins = yield self.store.get_rooms_for_user(
room_ids = yield self.store.get_rooms_for_user(
user.to_string(),
)

for j in joins:
for room_id in room_ids:
handler = self.hs.get_handlers().room_member_handler
try:
# Assume the user isn't a guest because we don't let guests set
Expand All @@ -171,12 +171,12 @@ def _update_join_states(self, requester):
yield handler.update_membership(
requester,
user,
j.room_id,
room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
j.room_id, str(e.message)
room_id, str(e.message)
)
5 changes: 2 additions & 3 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ def get_pagination_rows(self, user, config, key):
else:
from_key = None

rooms = yield self.store.get_rooms_for_user(user.to_string())
rooms = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(user.to_string())
events = yield self.store.get_linearized_receipts_for_rooms(
rooms,
room_ids,
from_key=from_key,
to_key=to_key,
)
Expand Down
68 changes: 56 additions & 12 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
from synapse.types import RoomStreamToken

from twisted.internet import defer

Expand Down Expand Up @@ -225,8 +226,7 @@ def ephemeral_by_room(self, sync_config, now_token, since_token=None):
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"

rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
room_ids = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())

typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
Expand Down Expand Up @@ -568,16 +568,15 @@ def _generate_sync_entry_for_device_list(self, sync_result_builder):
since_token = sync_result_builder.since_token

if since_token and since_token.device_list_key:
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(r.room_id for r in rooms)
room_ids = yield self.store.get_rooms_for_user(user_id)

user_ids_changed = set()
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
for other_user_id in changed:
other_rooms = yield self.store.get_rooms_for_user(other_user_id)
if room_ids.intersection(e.room_id for e in other_rooms):
other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
if room_ids.intersection(other_room_ids):
user_ids_changed.add(other_user_id)

defer.returnValue(user_ids_changed)
Expand Down Expand Up @@ -765,6 +764,21 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
)
sync_result_builder.now_token = now_token

# We check up front if anything has changed, if it hasn't then there is
# no point in going futher.
since_token = sync_result_builder.since_token
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = yield self._have_rooms_changed(sync_result_builder)
if not have_changed:
tags_by_room = yield self.store.get_updated_tags(
user_id,
since_token.account_data_key,
)
if not tags_by_room:
logger.debug("no-oping sync")
defer.returnValue(([], []))

ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
)
Expand All @@ -774,13 +788,12 @@ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_ro
else:
ignored_users = frozenset()

if sync_result_builder.since_token:
if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res

tags_by_room = yield self.store.get_updated_tags(
user_id,
sync_result_builder.since_token.account_data_key,
user_id, since_token.account_data_key,
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
Expand All @@ -805,7 +818,7 @@ def handle_room_entries(room_entry):

# Now we want to get any newly joined users
newly_joined_users = set()
if sync_result_builder.since_token:
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
joined_sync.timeline.events, joined_sync.state.values()
Expand All @@ -817,6 +830,38 @@ def handle_room_entries(room_entry):

defer.returnValue((newly_joined_rooms, newly_joined_users))

@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token

assert since_token

# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)

if rooms_changed:
defer.returnValue(True)

app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
joined_room_ids = yield self.store.get_rooms_for_user(user_id)

stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
for room_id in joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
defer.returnValue(True)
defer.returnValue(False)

@defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
Expand All @@ -841,8 +886,7 @@ def _get_rooms_changed(self, sync_result_builder, ignored_users):
rooms = yield self.store.get_app_service_rooms(app_service)
joined_room_ids = set(r.room_id for r in rooms)
else:
rooms = yield self.store.get_rooms_for_user(user_id)
joined_room_ids = set(r.room_id for r in rooms)
joined_room_ids = yield self.store.get_rooms_for_user(user_id)

# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
Expand Down
6 changes: 2 additions & 4 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
if user_stream is None:
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = [room.room_id for room in rooms]
room_ids = yield self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
Expand Down Expand Up @@ -454,8 +453,7 @@ def check_for_updates(before_token, after_token):

@defer.inlineCallbacks
def _get_room_ids(self, user, explicit_room_id):
joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
joined_room_ids = map(lambda r: r.room_id, joined_rooms)
joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
if explicit_room_id:
if explicit_room_id in joined_room_ids:
defer.returnValue(([explicit_room_id], True))
Expand Down
8 changes: 4 additions & 4 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ def get_badge_count(store, user_id):

badge = len(invites)

for r in joins:
if r.room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[r.room_id]
for room_id in joins:
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]

notifs = yield (
store.get_unread_event_push_actions_by_room_for_user(
r.room_id, user_id, last_unread_event_id
room_id, user_id, last_unread_event_id
)
)
# return one badge count per conversation, as count per
Expand Down
3 changes: 1 addition & 2 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,7 @@ def __init__(self, hs):
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)

rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
room_ids = set(r.room_id for r in rooms) # Ensure they're unique.
room_ids = yield self.store.get_rooms_for_user(requester.user.to_string())
defer.returnValue((200, {"joined_rooms": list(room_ids)}))


Expand Down
13 changes: 8 additions & 5 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,24 +274,27 @@ def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):

return rows

@cached(max_entries=500000, iterable=True)
@cachedInlineCallbacks(max_entries=500000, iterable=True)
def get_rooms_for_user(self, user_id):
return self.get_rooms_for_user_where_membership_is(
"""Returns a set of room_ids the user is currently joined to
"""
rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN],
)
defer.returnValue(frozenset(r.room_id for r in rooms))

@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
def get_users_who_share_room_with_user(self, user_id, cache_context):
"""Returns the set of users who share a room with `user_id`
"""
rooms = yield self.get_rooms_for_user(
room_ids = yield self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate,
)

user_who_share_room = set()
for room in rooms:
for room_id in room_ids:
user_ids = yield self.get_users_in_room(
room.room_id, on_invalidate=cache_context.invalidate,
room_id, on_invalidate=cache_context.invalidate,
)
user_who_share_room.update(user_ids)

Expand Down

0 comments on commit 248eb46

Please sign in to comment.