Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use event streams to calculate presence #4942

Merged
merged 5 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 62 additions & 44 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,9 @@ def notify_new_event(self):
joining rooms and require being sent presence.
"""

if self._event_processing:
return

@defer.inlineCallbacks
def _process_presence():
if self._event_processing:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -993,58 +996,73 @@ def _handle_state_delta(self, deltas):
# Ignore changes to join events.
continue

if self.is_mine_id(state_key):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
yield self._on_user_joined_room(room_id, state_key)

state = yield self.current_state_for_user(state_key)
hosts = yield self.state.get_current_hosts_in_room(room_id)

# Filter out ourselves.
hosts = set(host for host in hosts if host != self.server_name)
@defer.inlineCallbacks
def _on_user_joined_room(self, room_id, user_id):
"""Called when we detect a user joining the room via the current state
delta stream.

self.federation.send_presence_to_destinations(
states=[state],
destinations=hosts,
)
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.
Args:
room_id (str)
user_id (str)

# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen
Returns:
Deferred
"""

# TODO: Check that this is actually a new server joining the
# room.
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

user_ids = yield self.state.get_current_user_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))
# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

states = yield self.current_state_for_users(user_ids)
state = yield self.current_state_for_user(user_id)
hosts = yield self.state.get_current_hosts_in_room(room_id)

# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
# depending on what we want the UX to be, but at the least we
# should filter out offline presence where the state is just the
# default state.
now = self.clock.time_msec()
states = [
state for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]
# Filter out ourselves.
hosts = set(host for host in hosts if host != self.server_name)

if states:
self.federation.send_presence_to_destinations(
states=states,
destinations=[get_domain_from_id(state_key)],
)
self.federation.send_presence_to_destinations(
states=[state],
destinations=hosts,
)
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.

# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen

# TODO: Check that this is actually a new server joining the
# room.

user_ids = yield self.state.get_current_user_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))

states = yield self.current_state_for_users(user_ids)

# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
# depending on what we want the UX to be, but at the least we
# should filter out offline presence where the state is just the
# default state.
now = self.clock.time_msec()
states = [
state for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]

if states:
self.federation.send_presence_to_destinations(
states=states,
destinations=[get_domain_from_id(user_id)],
)


def should_notify(old_state, new_state):
Expand Down
14 changes: 9 additions & 5 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@ def test_remote_joins(self):
)
self.reactor.pump([0]) # Wait for presence updates to be handled

# Test that a new server gets told about existing presence #
#
# Test that a new server gets told about existing presence
#

self.federation_sender.reset_mock()

Expand All @@ -482,7 +484,9 @@ def test_remote_joins(self):
destinations=["server2"], states=[expected_state]
)

# Test that only the new server gets sent presence and not existing servers #
#
# Test that only the new server gets sent presence and not existing servers
#

self.federation_sender.reset_mock()
self._add_new_user(room_id, "@bob:server3")
Expand Down Expand Up @@ -517,7 +521,9 @@ def test_remote_gets_presence_when_local_user_joins(self):

self.reactor.pump([0]) # Wait for presence updates to be handled

# Test that when a local join happens remote servers get told about it #
#
# Test that when a local join happens remote servers get told about it
#

self.federation_sender.reset_mock()

Expand Down Expand Up @@ -547,15 +553,13 @@ def _add_new_user(self, room_id, user_id):

room_version = self.get_success(self.store.get_room_version(room_id))

# No we want to have other servers "join"
builder = EventBuilder(
state=self.state,
auth=self.auth,
store=self.store,
clock=self.clock,
hostname=hostname,
signing_key=self.random_signing_key,

format_version=room_version_to_event_format(room_version),
room_id=room_id,
type=EventTypes.Member,
Expand Down