Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean-up presence code #16092

Merged
merged 9 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16092.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean-up the presence code.
169 changes: 75 additions & 94 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Collection,
Dict,
Expand All @@ -54,7 +53,10 @@
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.http.presence import (
ReplicationBumpPresenceActiveTime,
ReplicationPresenceSetState,
Expand Down Expand Up @@ -141,6 +143,8 @@ def __init__(self, hs: "HomeServer"):
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id

self._presence_enabled = hs.config.server.use_presence

self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()
Expand All @@ -149,6 +153,15 @@ def __init__(self, hs: "HomeServer"):

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)

if self._busy_presence_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}

Expand Down Expand Up @@ -395,8 +408,6 @@ def __init__(self, hs: "HomeServer"):

self._presence_writer_instance = hs.config.worker.writers.presence[0]

self._presence_enabled = hs.config.server.use_presence

# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
EduTypes.PRESENCE,
Expand All @@ -421,8 +432,6 @@ def __init__(self, hs: "HomeServer"):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

hs.get_reactor().addSystemEventTrigger(
"before",
"shutdown",
Expand Down Expand Up @@ -490,7 +499,9 @@ async def user_syncing(
# what the spec wants: see comment in the BasePresenceHandler version
# of this function.
await self.set_state(
UserID.from_string(user_id), {"presence": presence_state}, True
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
Expand Down Expand Up @@ -601,22 +612,13 @@ async def set_state(
"""
presence = state["presence"]

valid_presence = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.BUSY,
)

if presence not in valid_presence or (
presence == PresenceState.BUSY and not self._busy_presence_enabled
):
if presence not in self.VALID_PRESENCE:
raise SynapseError(400, "Invalid presence state")

user_id = target_user.to_string()

# If presence is disabled, no-op
if not self.hs.config.server.use_presence:
if not self._presence_enabled:
return

# Proxy request to instance that writes presence
Expand All @@ -633,7 +635,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.server.use_presence:
if not self._presence_enabled:
return

# Proxy request to instance that writes presence
Expand All @@ -649,7 +651,6 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
self._presence_enabled = hs.config.server.use_presence

federation_registry = hs.get_federation_registry()

Expand Down Expand Up @@ -700,8 +701,6 @@ def __init__(self, hs: "HomeServer"):
self._on_shutdown,
)

self._next_serial = 1

# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
self.user_to_num_current_syncs: Dict[str, int] = {}
Expand All @@ -723,21 +722,16 @@ def __init__(self, hs: "HomeServer"):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
def run_timeout_handler() -> Awaitable[None]:
return run_as_background_process(
"handle_presence_timeouts", self._handle_timeouts
)

self.clock.call_later(
30, self.clock.looping_call, run_timeout_handler, 5000
30, self.clock.looping_call, self._handle_timeouts, 5000
)

def run_persister() -> Awaitable[None]:
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)

self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
self.clock.call_later(
60,
self.clock.looping_call,
self._persist_unpersisted_changes,
60 * 1000,
)

LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
Expand Down Expand Up @@ -783,6 +777,7 @@ async def _on_shutdown(self) -> None:
)
logger.info("Finished _on_shutdown")

@wrap_as_background_process("persist_presence_changes")
async def _persist_unpersisted_changes(self) -> None:
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
Expand Down Expand Up @@ -898,6 +893,7 @@ async def _update_states(
states, [destination]
)

@wrap_as_background_process("handle_presence_timeouts")
async def _handle_timeouts(self) -> None:
"""Checks the presence of users that have timed out and updates as
appropriate.
Expand Down Expand Up @@ -955,7 +951,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.server.use_presence:
if not self._presence_enabled:
return

user_id = user.to_string()
Expand Down Expand Up @@ -990,56 +986,51 @@ async def user_syncing(
client that is being used by a user.
presence_state: The presence state indicated in the sync request
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.server.use_presence:
affect_presence = False
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1

prev_state = await self.current_state_for_user(user_id)
prev_state = await self.current_state_for_user(user_id)

# If they're busy then they don't stop being busy just by syncing,
# so just update the last sync time.
if prev_state.state != PresenceState.BUSY:
# XXX: We set_state separately here and just update the last_active_ts above
# This keeps the logic as similar as possible between the worker and single
# process modes. Using set_state will actually cause last_active_ts to be
# updated always, which is not what the spec calls for, but synapse has done
# this for... forever, I think.
await self.set_state(
UserID.from_string(user_id), {"presence": presence_state}, True
)
# Retrieve the new state for the logic below. This should come from the
# in-memory cache.
prev_state = await self.current_state_for_user(user_id)
# If they're busy then they don't stop being busy just by syncing,
# so just update the last sync time.
if prev_state.state != PresenceState.BUSY:
# XXX: We set_state separately here and just update the last_active_ts above
# This keeps the logic as similar as possible between the worker and single
# process modes. Using set_state will actually cause last_active_ts to be
# updated always, which is not what the spec calls for, but synapse has done
# this for... forever, I think.
await self.set_state(
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)
# Retrieve the new state for the logic below. This should come from the
# in-memory cache.
prev_state = await self.current_state_for_user(user_id)

# To keep the single process behaviour consistent with worker mode, run the
# same logic as `update_external_syncs_row`, even though it looks weird.
if prev_state.state == PresenceState.OFFLINE:
await self._update_states(
[
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)
]
)
# otherwise, set the new presence state & update the last sync time,
# but don't update last_active_ts as this isn't an indication that
# they've been active (even though it's probably been updated by
# set_state above)
else:
await self._update_states(
[
prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec()
)
]
)
# To keep the single process behaviour consistent with worker mode, run the
# same logic as `update_external_syncs_row`, even though it looks weird.
if prev_state.state == PresenceState.OFFLINE:
await self._update_states(
[
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)
]
)
# otherwise, set the new presence state & update the last sync time,
# but don't update last_active_ts as this isn't an indication that
# they've been active (even though it's probably been updated by
# set_state above)
else:
await self._update_states(
[prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
)

async def _end() -> None:
try:
Expand All @@ -1061,8 +1052,7 @@ def _user_syncing() -> Generator[None, None, None]:
try:
yield
finally:
if affect_presence:
run_in_background(_end)
run_in_background(_end)

return _user_syncing()

Expand Down Expand Up @@ -1229,20 +1219,11 @@ async def set_state(
status_msg = state.get("status_msg", None)
presence = state["presence"]

valid_presence = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.BUSY,
)

if presence not in valid_presence or (
presence == PresenceState.BUSY and not self._busy_presence_enabled
):
if presence not in self.VALID_PRESENCE:
raise SynapseError(400, "Invalid presence state")

# If presence is disabled, no-op
if not self.hs.config.server.use_presence:
if not self._presence_enabled:
return

user_id = target_user.to_string()
Expand Down