Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #582 from matrix-org/erikj/presence
Browse files Browse the repository at this point in the history
Rewrite presence for performance.
  • Loading branch information
erikjohnston committed Feb 19, 2016
2 parents 220231d + e12ec33 commit e5ad2e5
Show file tree
Hide file tree
Showing 30 changed files with 1,450 additions and 3,102 deletions.
1 change: 0 additions & 1 deletion synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class PresenceState(object):
OFFLINE = u"offline"
UNAVAILABLE = u"unavailable"
ONLINE = u"online"
FREE_FOR_CHAT = u"free_for_chat"


class JoinRules(object):
Expand Down
43 changes: 35 additions & 8 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from synapse.types import UserID
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_context_over_fn
from synapse.api.constants import Membership, EventTypes
from synapse.events import EventBase

from ._base import BaseHandler

Expand Down Expand Up @@ -126,11 +128,12 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,
If `only_keys` is not None, events from keys will be sent down.
"""
auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_handlers().presence_handler

try:
if affect_presence:
yield self.started_stream(auth_user)

context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence,
)
with context:
if timeout:
# If they've set a timeout set a minimum limit.
timeout = max(timeout, 500)
Expand All @@ -145,6 +148,34 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,
is_guest=is_guest, explicit_room_id=room_id
)

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
for event in events:
if not isinstance(event, EventBase):
continue
if event.type == EventTypes.Member:
if event.membership != Membership.JOIN:
continue
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = yield self.store.get_users_in_room(event.room_id)
states = yield presence_handler.get_states(
users,
as_event=True,
)
to_add.extend(states)
else:

ev = yield presence_handler.get_state(
UserID.from_string(event.state_key),
as_event=True,
)
to_add.append(ev)

events.extend(to_add)

time_now = self.clock.time_msec()

chunks = [
Expand All @@ -159,10 +190,6 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,

defer.returnValue(chunk)

finally:
if affect_presence:
self.stopped_stream(auth_user)


class EventHandler(BaseHandler):

Expand Down
14 changes: 3 additions & 11 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken

Expand Down Expand Up @@ -249,8 +248,7 @@ def send_nonmember_event(self, event, context, ratelimit=True):

if event.type == EventTypes.Message:
presence = self.hs.get_handlers().presence_handler
with PreserveLoggingContext():
presence.bump_presence_active_time(user)
yield presence.bump_presence_active_time(user)

def deduplicate_state_event(self, event, context):
"""
Expand Down Expand Up @@ -674,10 +672,6 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
room_id=room_id,
)

# TODO(paul): I wish I was called with user objects not user_id
# strings...
auth_user = UserID.from_string(user_id)

# TODO: These concurrently
time_now = self.clock.time_msec()
state = [
Expand All @@ -702,13 +696,11 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
@defer.inlineCallbacks
def get_presence():
states = yield presence_handler.get_states(
target_users=[UserID.from_string(m.user_id) for m in room_members],
auth_user=auth_user,
[m.user_id for m in room_members],
as_event=True,
check_auth=False,
)

defer.returnValue(states.values())
defer.returnValue(states)

@defer.inlineCallbacks
def get_receipts():
Expand Down
Loading

0 comments on commit e5ad2e5

Please sign in to comment.