
Merge pull request #4942 from matrix-org/erikj/fix_presence
Use event streams to calculate presence
erikjohnston authored Mar 28, 2019
2 parents 4eeb2c2 + 4e5f0f7 commit 2480143
Showing 5 changed files with 412 additions and 33 deletions.
1 change: 1 addition & 0 deletions changelog.d/4942.bugfix
@@ -0,0 +1 @@
Fix bug where presence updates were sent to all servers in a room when a new server joined, rather than to just the new server.
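To illustrate the behavioural change described by this changelog entry, here is a minimal standalone sketch; the function and variable names (on_new_server_joined, federation_sender, joining_server) are hypothetical and not part of this patch. Before the fix, a join triggered a presence broadcast to every host in the room; after it, only the newly joined server receives the catch-up states.

def on_new_server_joined(federation_sender, states, joining_server):
    # Old behaviour (conceptually): presence fanned out to every host in
    # the room, including servers that already had it.
    # New behaviour: target only the server that actually needs it.
    federation_sender.send_presence_to_destinations(
        states=states,
        destinations=[joining_server],
    )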
73 changes: 71 additions & 2 deletions synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ def __init__(self, hs):
        self.is_mine_id = hs.is_mine_id

        self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
        self.presence_changed = SortedDict()  # Stream position -> user_id
        self.presence_changed = SortedDict()  # Stream position -> list[user_id]

        # Stores the destinations we need to explicitly send a given user's
        # presence to.
        # Stream position -> (user_id, destinations)
        self.presence_destinations = SortedDict()

        self.keyed_edu = {}  # (destination, key) -> EDU
        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
@@ -77,7 +82,7 @@ def register(name, queue):

        for queue_name in [
            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
            "edus", "device_messages", "pos_time",
            "edus", "device_messages", "pos_time", "presence_destinations",
        ]:
            register(queue_name, getattr(self, queue_name))

@@ -121,6 +126,15 @@ def _clear_queue_before_pos(self, position_to_delete):
            for user_id in uids
        )

        keys = self.presence_destinations.keys()
        i = self.presence_destinations.bisect_left(position_to_delete)
        for key in keys[:i]:
            del self.presence_destinations[key]

        user_ids.update(
            user_id for user_id, _ in self.presence_destinations.values()
        )

        to_del = [
            user_id for user_id in self.presence_map if user_id not in user_ids
        ]
@@ -209,6 +223,20 @@ def send_presence(self, states):

        self.notifier.on_new_replication_data()

    def send_presence_to_destinations(self, states, destinations):
        """As per FederationSender

        Args:
            states (list[UserPresenceState])
            destinations (list[str])
        """
        for state in states:
            pos = self._next_pos()
            self.presence_map.update({state.user_id: state for state in states})
            self.presence_destinations[pos] = (state.user_id, destinations)

        self.notifier.on_new_replication_data()

    def send_device_messages(self, destination):
        """As per FederationSender"""
        pos = self._next_pos()
@@ -261,6 +289,16 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
                state=self.presence_map[user_id],
            )))

        # Fetch presence to send to destinations
        i = self.presence_destinations.bisect_right(from_token)
        j = self.presence_destinations.bisect_right(to_token) + 1

        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
            rows.append((pos, PresenceDestinationsRow(
                state=self.presence_map[user_id],
                destinations=list(dests),
            )))

        # Fetch changes to keyed EDUs
        i = self.keyed_edu_changed.bisect_right(from_token)
        j = self.keyed_edu_changed.bisect_right(to_token) + 1
@@ -357,6 +395,29 @@ def add_to_buffer(self, buff):
        buff.presence.append(self.state)


class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
    "state",  # UserPresenceState
    "destinations",  # list[str]
))):
    TypeId = "pd"

    @staticmethod
    def from_data(data):
        return PresenceDestinationsRow(
            state=UserPresenceState.from_dict(data["state"]),
            destinations=data["dests"],
        )

    def to_data(self):
        return {
            "state": self.state.as_dict(),
            "dests": self.destinations,
        }

    def add_to_buffer(self, buff):
        buff.presence_destinations.append((self.state, self.destinations))


class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
    "key",  # tuple(str) - the edu key passed to send_edu
    "edu",  # Edu
@@ -428,6 +489,7 @@ def add_to_buffer(self, buff):
    Row.TypeId: Row
    for Row in (
        PresenceRow,
        PresenceDestinationsRow,
        KeyedEduRow,
        EduRow,
        DeviceRow,
@@ -437,6 +499,7 @@ def add_to_buffer(self, buff):

ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
    "presence",  # list(UserPresenceState)
    "presence_destinations",  # list of tuples of UserPresenceState and destinations
    "keyed_edus",  # dict of destination -> { key -> Edu }
    "edus",  # dict of destination -> [Edu]
    "device_destinations",  # set of destinations
@@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):

    buff = ParsedFederationStreamData(
        presence=[],
        presence_destinations=[],
        keyed_edus={},
        edus={},
        device_destinations=set(),
@@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):

    if buff.presence:
        transaction_queue.send_presence(buff.presence)

    for state, destinations in buff.presence_destinations:
        transaction_queue.send_presence_to_destinations(
            states=[state], destinations=destinations,
        )

    for destination, edu_map in iteritems(buff.keyed_edus):
        for key, edu in edu_map.items():
            transaction_queue.send_edu(edu, key)
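The queue above leans on SortedDict throughout (send_queue.py imports it from the sortedcontainers package). As a minimal standalone sketch of the slicing pattern used by _clear_queue_before_pos and get_replication_rows above: bisect_right selects the rows strictly after from_token and up to to_token. The stream positions and user IDs here are made up for illustration.

from sortedcontainers import SortedDict

queue = SortedDict()  # stream position -> (user_id, destinations)
queue[5] = ("@alice:example.com", ["remote1.example.com"])
queue[7] = ("@bob:example.com", ["remote2.example.com"])
queue[9] = ("@carol:example.com", ["remote1.example.com"])

from_token, to_token = 5, 9
i = queue.bisect_right(from_token)   # index of the first position after from_token
j = queue.bisect_right(to_token) + 1
for pos, (user_id, dests) in queue.items()[i:j]:
    print(pos, user_id, dests)       # prints rows 7 and 9, but not 5

This is also why _clear_queue_before_pos can drop acknowledged rows cheaply: deleting every key below a stream position is a bisect plus a slice, rather than a scan of the whole map.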
19 changes: 18 additions & 1 deletion synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ def send_presence(self, states):
            return

        # First we queue up the new presence by user ID, so multiple presence
        # updates in quick successtion are correctly handled
        # updates in quick succession are correctly handled.
        # We only want to send presence for our own users, so let's always just
        # filter here just in case.
        self.pending_presence.update({
@@ -402,6 +402,23 @@ def send_presence(self, states):
        finally:
            self._processing_pending_presence = False

    def send_presence_to_destinations(self, states, destinations):
        """Send the given presence states to the given destinations.

        Args:
            states (list[UserPresenceState])
            destinations (list[str])
        """

        if not states or not self.hs.config.use_presence:
            # No-op if presence is disabled.
            return

        for destination in destinations:
            if destination == self.server_name:
                continue
            self._get_per_destination_queue(destination).send_presence(states)

    @measure_func("txnqueue._process_presence")
    @defer.inlineCallbacks
    def _process_presence_inner(self, states):
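A small self-contained sketch of what send_presence_to_destinations does with its destination list (server names and the queues dict are hypothetical stand-ins for the per-destination queues): the local server is skipped, so presence is never queued over federation to ourselves, and every other host gets the states added to its own queue.

from collections import defaultdict

server_name = "example.com"
queues = defaultdict(list)  # stand-in for the per-destination queues

def send_presence_to_destinations(states, destinations):
    # Mirror of the loop above: skip ourselves, queue for everyone else.
    for destination in destinations:
        if destination == server_name:
            continue
        queues[destination].extend(states)

send_presence_to_destinations(
    states=["presence state for @alice:example.com"],
    destinations=["example.com", "remote1.example.com"],
)
print(dict(queues))  # only remote1.example.com has anything queued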
176 changes: 147 additions & 29 deletions synapse/handlers/presence.py
@@ -31,9 +31,11 @@

from twisted.internet import defer

from synapse.api.constants import PresenceState
import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ def __init__(self, hs):
        self.hs = hs
        self.is_mine = hs.is_mine
        self.is_mine_id = hs.is_mine_id
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.store = hs.get_datastore()
        self.wheel_timer = WheelTimer()
@@ -132,9 +135,6 @@ def __init__(self, hs):
            )
        )

        distributor = hs.get_distributor()
        distributor.observe("user_joined_room", self.user_joined_room)

        active_presence = self.store.take_presence_startup_info()

        # A dictionary of the current state of users. This is prefilled with
@@ -220,6 +220,15 @@ def __init__(self, hs):
        LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                   lambda: len(self.wheel_timer))

        # Used to handle sending of presence to newly joined users/servers
        if hs.config.use_presence:
            self.notifier.add_replication_callback(self.notify_new_event)

            # Presence is best effort and quickly heals itself, so let's just
            # always stream from the current state when we restart.
            self._event_pos = self.store.get_current_events_token()
            self._event_processing = False

    @defer.inlineCallbacks
    def _on_shutdown(self):
        """Gets called when shutting down. This lets us persist any updates that
@@ -750,31 +759,6 @@ def set_state(self, target_user, state, ignore_status_msg=False):

        yield self._update_states([prev_state.copy_and_replace(**new_fields)])

    @defer.inlineCallbacks
    def user_joined_room(self, user, room_id):
        """Called (via the distributor) when a user joins a room. This function
        sends presence updates to servers, either:
        1. the joining user is a local user and we send their presence to
           all servers in the room.
        2. the joining user is a remote user and so we send presence for all
           local users in the room.
        """
        # We only need to send presence to servers that don't have it yet. We
        # don't need to send to local clients here, as that is done as part
        # of the event stream/sync.
        # TODO: Only send to servers not already in the room.
        if self.is_mine(user):
            state = yield self.current_state_for_user(user.to_string())

            self._push_to_remotes([state])
        else:
            user_ids = yield self.store.get_users_in_room(room_id)
            user_ids = list(filter(self.is_mine_id, user_ids))

            states = yield self.current_state_for_users(user_ids)

            self._push_to_remotes(list(states.values()))

    @defer.inlineCallbacks
    def get_presence_list(self, observer_user, accepted=None):
        """Returns the presence for all users in their presence list.
@@ -945,6 +929,140 @@ def get_all_presence_updates(self, last_id, current_id):
        rows = yield self.store.get_all_presence_updates(last_id, current_id)
        defer.returnValue(rows)

    def notify_new_event(self):
        """Called when new events have happened. Handles users and servers
        joining rooms that require being sent presence.
        """

        if self._event_processing:
            return

        @defer.inlineCallbacks
        def _process_presence():
            assert not self._event_processing

            self._event_processing = True
            try:
                yield self._unsafe_process()
            finally:
                self._event_processing = False

        run_as_background_process("presence.notify_new_event", _process_presence)

    @defer.inlineCallbacks
    def _unsafe_process(self):
        # Loop round handling deltas until we're up to date
        while True:
            with Measure(self.clock, "presence_delta"):
                deltas = yield self.store.get_current_state_deltas(self._event_pos)
                if not deltas:
                    return

                yield self._handle_state_delta(deltas)

                self._event_pos = deltas[-1]["stream_id"]

                # Expose current event processing position to prometheus
                synapse.metrics.event_processing_positions.labels("presence").set(
                    self._event_pos
                )

    @defer.inlineCallbacks
    def _handle_state_delta(self, deltas):
        """Process current state deltas to find new joins that need to be
        handled.
        """
        for delta in deltas:
            typ = delta["type"]
            state_key = delta["state_key"]
            room_id = delta["room_id"]
            event_id = delta["event_id"]
            prev_event_id = delta["prev_event_id"]

            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

            if typ != EventTypes.Member:
                continue

            event = yield self.store.get_event(event_id)
            if event.content.get("membership") != Membership.JOIN:
                # We only care about joins
                continue

            if prev_event_id:
                prev_event = yield self.store.get_event(prev_event_id)
                if prev_event.content.get("membership") == Membership.JOIN:
                    # Ignore changes to join events.
                    continue

            yield self._on_user_joined_room(room_id, state_key)

    @defer.inlineCallbacks
    def _on_user_joined_room(self, room_id, user_id):
        """Called when we detect a user joining the room via the current state
        delta stream.

        Args:
            room_id (str)
            user_id (str)

        Returns:
            Deferred
        """

        if self.is_mine_id(user_id):
            # If this is a local user then we need to send their presence
            # out to hosts in the room (who don't already have it)

            # TODO: We should be able to filter the hosts down to those that
            # haven't previously seen the user

            state = yield self.current_state_for_user(user_id)
            hosts = yield self.state.get_current_hosts_in_room(room_id)

            # Filter out ourselves.
            hosts = set(host for host in hosts if host != self.server_name)

            self.federation.send_presence_to_destinations(
                states=[state],
                destinations=hosts,
            )
        else:
            # A remote user has joined the room, so we need to:
            #   1. Check if this is a new server in the room
            #   2. If so send any presence they don't already have for
            #      local users in the room.

            # TODO: We should be able to filter the users down to those that
            # the server hasn't previously seen

            # TODO: Check that this is actually a new server joining the
            # room.

            user_ids = yield self.state.get_current_user_in_room(room_id)
            user_ids = list(filter(self.is_mine_id, user_ids))

            states = yield self.current_state_for_users(user_ids)

            # Filter out old presence, i.e. offline presence states where
            # the user hasn't been active for a week. We can change this
            # depending on what we want the UX to be, but at the least we
            # should filter out offline presence where the state is just the
            # default state.
            now = self.clock.time_msec()
            states = [
                state for state in states.values()
                if state.state != PresenceState.OFFLINE
                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
                or state.status_msg is not None
            ]

            if states:
                self.federation.send_presence_to_destinations(
                    states=states,
                    destinations=[get_domain_from_id(user_id)],
                )


def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
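The week-long staleness filter in _on_user_joined_room is easy to exercise in isolation. A standalone sketch follows; the field names mirror UserPresenceState, but the namedtuple and worth_sending helper are stand-ins, not part of the patch. A state is worth sending to a newly joined server if the user is not offline, was active within the last week, or carries a custom status message.

from collections import namedtuple

State = namedtuple("State", ["state", "last_active_ts", "status_msg"])
ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000

def worth_sending(s, now_ms):
    # Same predicate as the list comprehension in _on_user_joined_room.
    return (
        s.state != "offline"
        or now_ms - s.last_active_ts < ONE_WEEK_MS
        or s.status_msg is not None
    )

now = 10 * ONE_WEEK_MS
print(worth_sending(State("offline", now - 2 * ONE_WEEK_MS, None), now))  # False: stale default state
print(worth_sending(State("offline", now - 1000, None), now))             # True: recently active
print(worth_sending(State("offline", 0, "on holiday"), now))              # True: has a status message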