Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rewrite presence for performance. #582

Merged
merged 17 commits into from
Feb 19, 2016
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class PresenceState(object):
OFFLINE = u"offline"
UNAVAILABLE = u"unavailable"
ONLINE = u"online"
FREE_FOR_CHAT = u"free_for_chat"


class JoinRules(object):
Expand Down
43 changes: 35 additions & 8 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from synapse.types import UserID
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_context_over_fn
from synapse.api.constants import Membership, EventTypes
from synapse.events import EventBase

from ._base import BaseHandler

Expand Down Expand Up @@ -126,11 +128,12 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,
If `only_keys` is not None, events from keys will be sent down.
"""
auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_handlers().presence_handler

try:
if affect_presence:
yield self.started_stream(auth_user)

context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence,
)
with context:
if timeout:
# If they've set a timeout set a minimum limit.
timeout = max(timeout, 500)
Expand All @@ -145,6 +148,34 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,
is_guest=is_guest, explicit_room_id=room_id
)

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
for event in events:
if not isinstance(event, EventBase):
continue
if event.type == EventTypes.Member:
if event.membership != Membership.JOIN:
continue
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = yield self.store.get_users_in_room(event.room_id)
states = yield presence_handler.get_states(
users,
as_event=True,
)
to_add.extend(states)
else:

ev = yield presence_handler.get_state(
UserID.from_string(event.state_key),
as_event=True,
)
to_add.append(ev)

events.extend(to_add)

time_now = self.clock.time_msec()

chunks = [
Expand All @@ -159,10 +190,6 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,

defer.returnValue(chunk)

finally:
if affect_presence:
self.stopped_stream(auth_user)


class EventHandler(BaseHandler):

Expand Down
14 changes: 3 additions & 11 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken

Expand Down Expand Up @@ -254,8 +253,7 @@ def send_event(self, event, context, ratelimit=True, is_guest=False):

if event.type == EventTypes.Message:
presence = self.hs.get_handlers().presence_handler
with PreserveLoggingContext():
presence.bump_presence_active_time(user)
yield presence.bump_presence_active_time(user)

@defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True,
Expand Down Expand Up @@ -660,10 +658,6 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
room_id=room_id,
)

# TODO(paul): I wish I was called with user objects not user_id
# strings...
auth_user = UserID.from_string(user_id)

# TODO: These concurrently
time_now = self.clock.time_msec()
state = [
Expand All @@ -688,13 +682,11 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
@defer.inlineCallbacks
def get_presence():
states = yield presence_handler.get_states(
target_users=[UserID.from_string(m.user_id) for m in room_members],
auth_user=auth_user,
[m.user_id for m in room_members],
as_event=True,
check_auth=False,
)

defer.returnValue(states.values())
defer.returnValue(states)

@defer.inlineCallbacks
def get_receipts():
Expand Down
Loading