Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use event streams to calculate presence #4942

Merged
merged 5 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4942.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where presence updates were sent to all servers in a room when a new server joined, rather than to just the new server.
73 changes: 71 additions & 2 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ def __init__(self, hs):
self.is_mine_id = hs.is_mine_id

self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = SortedDict() # Stream position -> user_id
self.presence_changed = SortedDict() # Stream position -> list[user_id]

# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
self.presence_destinations = SortedDict()

self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
Expand All @@ -77,7 +82,7 @@ def register(name, queue):

for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
"edus", "device_messages", "pos_time",
"edus", "device_messages", "pos_time", "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))

Expand Down Expand Up @@ -121,6 +126,15 @@ def _clear_queue_before_pos(self, position_to_delete):
for user_id in uids
)

keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]

user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)

to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
Expand Down Expand Up @@ -209,6 +223,20 @@ def send_presence(self, states):

self.notifier.on_new_replication_data()

def send_presence_to_destinations(self, states, destinations):
"""As per FederationSender

Args:
states (list[UserPresenceState])
destinations (list[str])
"""
for state in states:
pos = self._next_pos()
self.presence_map.update({state.user_id: state for state in states})
self.presence_destinations[pos] = (state.user_id, destinations)

self.notifier.on_new_replication_data()

def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
Expand Down Expand Up @@ -261,6 +289,16 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
state=self.presence_map[user_id],
)))

# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1

for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
rows.append((pos, PresenceDestinationsRow(
state=self.presence_map[user_id],
destinations=list(dests),
)))

# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
Expand Down Expand Up @@ -357,6 +395,29 @@ def add_to_buffer(self, buff):
buff.presence.append(self.state)


class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
"state", # UserPresenceState
"destinations", # list[str]
))):
TypeId = "pd"

@staticmethod
def from_data(data):
return PresenceDestinationsRow(
state=UserPresenceState.from_dict(data["state"]),
destinations=data["dests"],
)

def to_data(self):
return {
"state": self.state.as_dict(),
"dests": self.destinations,
}

def add_to_buffer(self, buff):
buff.presence_destinations.append((self.state, self.destinations))


class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
Expand Down Expand Up @@ -428,6 +489,7 @@ def add_to_buffer(self, buff):
Row.TypeId: Row
for Row in (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
DeviceRow,
Expand All @@ -437,6 +499,7 @@ def add_to_buffer(self, buff):

ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
Expand All @@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):

buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
Expand All @@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)

for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations,
)

for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
Expand Down
19 changes: 18 additions & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def send_presence(self, states):
return

# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
Expand Down Expand Up @@ -402,6 +402,23 @@ def send_presence(self, states):
finally:
self._processing_pending_presence = False

def send_presence_to_destinations(self, states, destinations):
"""Send the given presence states to the given destinations.

Args:
states (list[UserPresenceState])
destinations (list[str])
"""

if not states or not self.hs.config.use_presence:
# No-op if presence is disabled.
return

for destination in destinations:
if destination == self.server_name:
continue
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
Expand Down
177 changes: 148 additions & 29 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

from twisted.internet import defer

from synapse.api.constants import PresenceState
import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -98,6 +100,7 @@ def __init__(self, hs):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
Expand Down Expand Up @@ -132,9 +135,6 @@ def __init__(self, hs):
)
)

distributor = hs.get_distributor()
distributor.observe("user_joined_room", self.user_joined_room)

active_presence = self.store.take_presence_startup_info()

# A dictionary of the current state of users. This is prefilled with
Expand Down Expand Up @@ -220,6 +220,15 @@ def __init__(self, hs):
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))

# Used to handle sending of presence to newly joined users/servers
if hs.config.use_presence:
self.notifier.add_replication_callback(self.notify_new_event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love it if the stuff in the notifier could be renamed to be less about "replication", since we seem to be using it for things which aren't replication. Guess we can leave that for another time, though.


# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
self._event_pos = self.store.get_current_events_token()
self._event_processing = False

@defer.inlineCallbacks
def _on_shutdown(self):
"""Gets called when shutting down. This lets us persist any updates that
Expand Down Expand Up @@ -750,31 +759,6 @@ def set_state(self, target_user, state, ignore_status_msg=False):

yield self._update_states([prev_state.copy_and_replace(**new_fields)])

@defer.inlineCallbacks
def user_joined_room(self, user, room_id):
"""Called (via the distributor) when a user joins a room. This funciton
sends presence updates to servers, either:
1. the joining user is a local user and we send their presence to
all servers in the room.
2. the joining user is a remote user and so we send presence for all
local users in the room.
"""
# We only need to send presence to servers that don't have it yet. We
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())

self._push_to_remotes([state])
else:
user_ids = yield self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))

states = yield self.current_state_for_users(user_ids)

self._push_to_remotes(list(states.values()))

@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
Expand Down Expand Up @@ -945,6 +929,141 @@ def get_all_presence_updates(self, last_id, current_id):
rows = yield self.store.get_all_presence_updates(last_id, current_id)
defer.returnValue(rows)

def notify_new_event(self):
"""Called when new events have happened. Handles users and servers
joining rooms and require being sent presence.
"""

if self._event_processing:
return

@defer.inlineCallbacks
def _process_presence():
if self._event_processing:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return

self._event_processing = True
try:
yield self._unsafe_process()
finally:
self._event_processing = False

run_as_background_process("presence.notify_new_event", _process_presence)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

@defer.inlineCallbacks
def _unsafe_process(self):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
deltas = yield self.store.get_current_state_deltas(self._event_pos)
if not deltas:
return

yield self._handle_state_delta(deltas)

self._event_pos = deltas[-1]["stream_id"]

# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("presence").set(
self._event_pos
)

@defer.inlineCallbacks
def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

if typ != EventTypes.Member:
continue

event = yield self.store.get_event(event_id)
if event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue

if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id)
if prev_event.content.get("membership") == Membership.JOIN:
# Ignore changes to join events.
continue

yield self._on_user_joined_room(room_id, state_key)

@defer.inlineCallbacks
def _on_user_joined_room(self, room_id, user_id):
"""Called when we detect a user joining the room via the current state
delta stream.

Args:
room_id (str)
user_id (str)

Returns:
Deferred
"""

if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

state = yield self.current_state_for_user(user_id)
hosts = yield self.state.get_current_hosts_in_room(room_id)

# Filter out ourselves.
hosts = set(host for host in hosts if host != self.server_name)

self.federation.send_presence_to_destinations(
states=[state],
destinations=hosts,
)
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.

# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen

# TODO: Check that this is actually a new server joining the
# room.

user_ids = yield self.state.get_current_user_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))

states = yield self.current_state_for_users(user_ids)

# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
# depending on what we want the UX to be, but at the least we
# should filter out offline presence where the state is just the
# default state.
now = self.clock.time_msec()
states = [
state for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]

if states:
self.federation.send_presence_to_destinations(
states=states,
destinations=[get_domain_from_id(user_id)],
)


def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
Expand Down
Loading