Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Format presence events on the edges instead of reformatting them multiple times #2013

Merged
merged 5 commits into from
Mar 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, RoomID

from twisted.internet import defer
Expand Down Expand Up @@ -253,19 +254,35 @@ def check(self, event):
Returns:
bool: True if the event matches
"""
sender = event.get("sender", None)
if not sender:
# Presence events have their 'sender' in content.user_id
content = event.get("content")
# account_data has been allowed to have non-dict content, so check type first
if isinstance(content, dict):
sender = content.get("user_id")
# We usually get the full "events" as dictionaries coming through,
# except for presence which actually gets passed around as its own
# namedtuple type.
if isinstance(event, UserPresenceState):
sender = event.user_id
room_id = None
ev_type = "m.presence"
is_url = False
else:
sender = event.get("sender", None)
if not sender:
# Presence events had their 'sender' in content.user_id, but are
# now handled above. We don't know if anything else uses this
# form. TODO: Check this and probably remove it.
content = event.get("content")
# account_data has been allowed to have non-dict content, so
# check type first
if isinstance(content, dict):
sender = content.get("user_id")

room_id = event.get("room_id", None)
ev_type = event.get("type", None)
is_url = "url" in event.get("content", {})

return self.check_fields(
event.get("room_id", None),
room_id,
sender,
event.get("type", None),
"url" in event.get("content", {})
ev_type,
is_url,
)

def check_fields(self, room_id, sender, event_type, contains_url):
Expand Down
11 changes: 10 additions & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.errors import AuthError, Codes
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import (
UserID, StreamToken,
Expand Down Expand Up @@ -225,9 +226,17 @@ def handle_room(event):
"content": content,
})

now = self.clock.time_msec()

ret = {
"rooms": rooms_ret,
"presence": presence,
"presence": [
{
"type": "m.presence",
"content": format_user_presence_state(event, now),
}
for event in presence
],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
Expand Down
73 changes: 45 additions & 28 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState

from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -719,9 +720,7 @@ def get_states(self, target_user_ids, as_event=False):
for state in updates
])
else:
defer.returnValue([
format_user_presence_state(state, now) for state in updates
])
defer.returnValue(updates)

@defer.inlineCallbacks
def set_state(self, target_user, state, ignore_status_msg=False):
Expand Down Expand Up @@ -795,6 +794,9 @@ def get_presence_list(self, observer_user, accepted=None):
as_event=False,
)

now = self.clock.time_msec()
results[:] = [format_user_presence_state(r, now) for r in results]

is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
Expand Down Expand Up @@ -847,6 +849,7 @@ def invite_presence(self, observed_user, observer_user):
)

state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())

self.federation.send_edu(
destination=observer_user.domain,
Expand Down Expand Up @@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
return False


def format_user_presence_state(state, now):
def format_user_presence_state(state, now, include_user_id=True):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment to explain why the "user_id" is optional? something like

The "user_id" is optional so that this function can be used to format presence 
updates for client /sync responses and for federation /send requests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(done)


The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
"user_id": state.user_id,
}
if include_user_id:
content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
if state.status_msg and state.state != PresenceState.OFFLINE:
Expand Down Expand Up @@ -1025,7 +1032,6 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
# sending down the rare duplicate is not a concern.

with Measure(self.clock, "presence.get_new_events"):
user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)

Expand All @@ -1034,18 +1040,7 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,

max_token = self.store.get_current_presence_token()

plist = yield self.store.get_presence_list_accepted(user.localpart)
users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in.add(user_id) # So that we receive our own presence

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
users_interested_in.update(users_who_share_room)

if explicit_room_id:
user_ids = yield self.store.get_users_in_room(explicit_room_id)
users_interested_in.update(user_ids)
users_interested_in = yield self._get_interested_in(user, explicit_room_id)

user_ids_changed = set()
changed = None
Expand Down Expand Up @@ -1073,23 +1068,45 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,

updates = yield presence.current_state_for_users(user_ids_changed)

now = self.clock.time_msec()

defer.returnValue(([
{
"type": "m.presence",
"content": format_user_presence_state(s, now),
}
for s in updates.values()
if include_offline or s.state != PresenceState.OFFLINE
], max_token))
if include_offline:
defer.returnValue((updates.values(), max_token))
else:
defer.returnValue(([
s for s in updates.itervalues()
if s.state != PresenceState.OFFLINE
], max_token))

def get_current_key(self):
return self.store.get_current_presence_token()

def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)

@cachedInlineCallbacks(num_args=2, cache_context=True)
def _get_interested_in(self, user, explicit_room_id, cache_context):
"""Returns the set of users that the given user should see presence
updates for
"""
user_id = user.to_string()
plist = yield self.store.get_presence_list_accepted(
user.localpart, on_invalidate=cache_context.invalidate,
)
users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in.add(user_id) # So that we receive our own presence

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate,
)
users_interested_in.update(users_who_share_room)

if explicit_room_id:
user_ids = yield self.store.get_users_in_room(
explicit_room_id, on_invalidate=cache_context.invalidate,
)
users_interested_in.update(user_ids)

defer.returnValue(users_interested_in)


def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
Expand Down
14 changes: 7 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,14 +721,14 @@ def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_ro
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

states = yield self.presence_handler.get_states(
extra_users_ids,
as_event=True,
)
presence.extend(states)
if extra_users_ids:
states = yield self.presence_handler.get_states(
extra_users_ids,
)
presence.extend(states)

# Deduplicate the presence entries so that there's at most one per user
presence = {p["content"]["user_id"]: p for p in presence}.values()
# Deduplicate the presence entries so that there's at most one per user
presence = {p.user_id: p for p in presence}.values()

presence = sync_config.filter_collection.filter_presence(
presence
Expand Down
10 changes: 10 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state

from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
Expand Down Expand Up @@ -412,6 +413,15 @@ def check_for_updates(before_token, after_token):
new_events,
is_peeking=is_peeking,
)
elif name == "presence":
now = self.clock.time_msec()
new_events[:] = [
{
"type": "m.presence",
"content": format_user_presence_state(event, now),
}
for event in new_events
]

events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
Expand Down
3 changes: 3 additions & 0 deletions synapse/rest/client/v1/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.api.errors import SynapseError, AuthError
from synapse.types import UserID
from synapse.handlers.presence import format_user_presence_state
from synapse.http.servlet import parse_json_object_from_request
from .base import ClientV1RestServlet, client_path_patterns

Expand All @@ -33,6 +34,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(PresenceStatusRestServlet, self).__init__(hs)
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()

@defer.inlineCallbacks
def on_GET(self, request, user_id):
Expand All @@ -48,6 +50,7 @@ def on_GET(self, request, user_id):
raise AuthError(403, "You are not allowed to see their presence.")

state = yield self.presence_handler.get_state(target_user=user)
state = format_user_presence_state(state, self.clock.time_msec())

defer.returnValue((200, state))

Expand Down
20 changes: 13 additions & 7 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
)
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
from synapse.events.utils import (
Expand All @@ -28,7 +29,6 @@
from synapse.api.constants import PresenceState
from ._base import client_v2_patterns

import copy
import itertools
import logging

Expand Down Expand Up @@ -194,12 +194,18 @@ def on_GET(self, request):
defer.returnValue((200, response_content))

def encode_presence(self, events, time_now):
formatted = []
for event in events:
event = copy.deepcopy(event)
event['sender'] = event['content'].pop('user_id')
formatted.append(event)
return {"events": formatted}
return {
"events": [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in events
]
}

def encode_joined(self, rooms, time_now, token_id, event_fields):
"""
Expand Down