Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2013 from matrix-org/erikj/presence_FASTER
Browse files Browse the repository at this point in the history
Format presence events on the edges instead of reformatting them multiple times
  • Loading branch information
erikjohnston authored Mar 15, 2017
2 parents 0ad44ac + a8f96c6 commit 9d52719
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 53 deletions.
37 changes: 27 additions & 10 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, RoomID

from twisted.internet import defer
Expand Down Expand Up @@ -253,19 +254,35 @@ def check(self, event):
Returns:
bool: True if the event matches
"""
sender = event.get("sender", None)
if not sender:
# Presence events have their 'sender' in content.user_id
content = event.get("content")
# account_data has been allowed to have non-dict content, so check type first
if isinstance(content, dict):
sender = content.get("user_id")
# We usually get the full "events" as dictionaries coming through,
# except for presence which actually gets passed around as its own
# namedtuple type.
if isinstance(event, UserPresenceState):
sender = event.user_id
room_id = None
ev_type = "m.presence"
is_url = False
else:
sender = event.get("sender", None)
if not sender:
# Presence events had their 'sender' in content.user_id, but are
# now handled above. We don't know if anything else uses this
# form. TODO: Check this and probably remove it.
content = event.get("content")
# account_data has been allowed to have non-dict content, so
# check type first
if isinstance(content, dict):
sender = content.get("user_id")

room_id = event.get("room_id", None)
ev_type = event.get("type", None)
is_url = "url" in event.get("content", {})

return self.check_fields(
event.get("room_id", None),
room_id,
sender,
event.get("type", None),
"url" in event.get("content", {})
ev_type,
is_url,
)

def check_fields(self, room_id, sender, event_type, contains_url):
Expand Down
11 changes: 10 additions & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.errors import AuthError, Codes
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import (
UserID, StreamToken,
Expand Down Expand Up @@ -225,9 +226,17 @@ def handle_room(event):
"content": content,
})

now = self.clock.time_msec()

ret = {
"rooms": rooms_ret,
"presence": presence,
"presence": [
{
"type": "m.presence",
"content": format_user_presence_state(event, now),
}
for event in presence
],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
Expand Down
73 changes: 45 additions & 28 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState

from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -719,9 +720,7 @@ def get_states(self, target_user_ids, as_event=False):
for state in updates
])
else:
defer.returnValue([
format_user_presence_state(state, now) for state in updates
])
defer.returnValue(updates)

@defer.inlineCallbacks
def set_state(self, target_user, state, ignore_status_msg=False):
Expand Down Expand Up @@ -795,6 +794,9 @@ def get_presence_list(self, observer_user, accepted=None):
as_event=False,
)

now = self.clock.time_msec()
results[:] = [format_user_presence_state(r, now) for r in results]

is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
Expand Down Expand Up @@ -847,6 +849,7 @@ def invite_presence(self, observed_user, observer_user):
)

state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())

self.federation.send_edu(
destination=observer_user.domain,
Expand Down Expand Up @@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
return False


def format_user_presence_state(state, now):
def format_user_presence_state(state, now, include_user_id=True):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
"user_id": state.user_id,
}
if include_user_id:
content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
if state.status_msg and state.state != PresenceState.OFFLINE:
Expand Down Expand Up @@ -1025,7 +1032,6 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
# sending down the rare duplicate is not a concern.

with Measure(self.clock, "presence.get_new_events"):
user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)

Expand All @@ -1034,18 +1040,7 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,

max_token = self.store.get_current_presence_token()

plist = yield self.store.get_presence_list_accepted(user.localpart)
users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in.add(user_id) # So that we receive our own presence

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
users_interested_in.update(users_who_share_room)

if explicit_room_id:
user_ids = yield self.store.get_users_in_room(explicit_room_id)
users_interested_in.update(user_ids)
users_interested_in = yield self._get_interested_in(user, explicit_room_id)

user_ids_changed = set()
changed = None
Expand Down Expand Up @@ -1073,23 +1068,45 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,

updates = yield presence.current_state_for_users(user_ids_changed)

now = self.clock.time_msec()

defer.returnValue(([
{
"type": "m.presence",
"content": format_user_presence_state(s, now),
}
for s in updates.values()
if include_offline or s.state != PresenceState.OFFLINE
], max_token))
if include_offline:
defer.returnValue((updates.values(), max_token))
else:
defer.returnValue(([
s for s in updates.itervalues()
if s.state != PresenceState.OFFLINE
], max_token))

def get_current_key(self):
return self.store.get_current_presence_token()

def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)

@cachedInlineCallbacks(num_args=2, cache_context=True)
def _get_interested_in(self, user, explicit_room_id, cache_context):
"""Returns the set of users that the given user should see presence
updates for
"""
user_id = user.to_string()
plist = yield self.store.get_presence_list_accepted(
user.localpart, on_invalidate=cache_context.invalidate,
)
users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in.add(user_id) # So that we receive our own presence

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate,
)
users_interested_in.update(users_who_share_room)

if explicit_room_id:
user_ids = yield self.store.get_users_in_room(
explicit_room_id, on_invalidate=cache_context.invalidate,
)
users_interested_in.update(user_ids)

defer.returnValue(users_interested_in)


def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
Expand Down
14 changes: 7 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,14 +721,14 @@ def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_ro
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

states = yield self.presence_handler.get_states(
extra_users_ids,
as_event=True,
)
presence.extend(states)
if extra_users_ids:
states = yield self.presence_handler.get_states(
extra_users_ids,
)
presence.extend(states)

# Deduplicate the presence entries so that there's at most one per user
presence = {p["content"]["user_id"]: p for p in presence}.values()
# Deduplicate the presence entries so that there's at most one per user
presence = {p.user_id: p for p in presence}.values()

presence = sync_config.filter_collection.filter_presence(
presence
Expand Down
10 changes: 10 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state

from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
Expand Down Expand Up @@ -412,6 +413,15 @@ def check_for_updates(before_token, after_token):
new_events,
is_peeking=is_peeking,
)
elif name == "presence":
now = self.clock.time_msec()
new_events[:] = [
{
"type": "m.presence",
"content": format_user_presence_state(event, now),
}
for event in new_events
]

events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
Expand Down
3 changes: 3 additions & 0 deletions synapse/rest/client/v1/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.api.errors import SynapseError, AuthError
from synapse.types import UserID
from synapse.handlers.presence import format_user_presence_state
from synapse.http.servlet import parse_json_object_from_request
from .base import ClientV1RestServlet, client_path_patterns

Expand All @@ -33,6 +34,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(PresenceStatusRestServlet, self).__init__(hs)
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()

@defer.inlineCallbacks
def on_GET(self, request, user_id):
Expand All @@ -48,6 +50,7 @@ def on_GET(self, request, user_id):
raise AuthError(403, "You are not allowed to see their presence.")

state = yield self.presence_handler.get_state(target_user=user)
state = format_user_presence_state(state, self.clock.time_msec())

defer.returnValue((200, state))

Expand Down
20 changes: 13 additions & 7 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
)
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
from synapse.events.utils import (
Expand All @@ -28,7 +29,6 @@
from synapse.api.constants import PresenceState
from ._base import client_v2_patterns

import copy
import itertools
import logging

Expand Down Expand Up @@ -194,12 +194,18 @@ def on_GET(self, request):
defer.returnValue((200, response_content))

def encode_presence(self, events, time_now):
formatted = []
for event in events:
event = copy.deepcopy(event)
event['sender'] = event['content'].pop('user_id')
formatted.append(event)
return {"events": formatted}
return {
"events": [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in events
]
}

def encode_joined(self, rooms, time_now, token_id, event_fields):
"""
Expand Down

0 comments on commit 9d52719

Please sign in to comment.