Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use a database table to hold the users that should have full presence sent to them, instead of something in-memory #9823

Merged
merged 30 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cce22cd
Add a util file for non API-specific constants, and move ONE_HOUR to it
anoadragon453 Apr 15, 2021
84fb5f0
Add migration and storage methods for users_to_send_full_presence_to …
anoadragon453 Apr 15, 2021
f7e6887
Modify ModuleApi to upsert entries into our new table
anoadragon453 Apr 15, 2021
9d9f46d
Modify SyncHandler to pull from the new table
anoadragon453 Apr 15, 2021
a71fd40
Changelog
anoadragon453 Apr 15, 2021
0951d36
Remove users_to_send_full_presence_to table culling; add fk for user_id
anoadragon453 May 5, 2021
3ebbc96
Add presence_stream_id column to users_to_send_full_presence_to table
anoadragon453 May 5, 2021
245e672
Updating tests to account for new stream-token based sync tracking
anoadragon453 May 10, 2021
b4e7e9d
Fix stream token multiple devices
anoadragon453 May 11, 2021
337197d
Add a new key to presence_set_state replication request, force_notify
anoadragon453 May 11, 2021
0b15230
self -> test_case. Fix worker-based test
anoadragon453 May 12, 2021
822f87a
Modify send_local_online_presence_to to send to local users on non-fe…
anoadragon453 May 12, 2021
2e408ac
lint
anoadragon453 May 12, 2021
7ca6618
Revert server_notices_manager change
anoadragon453 May 12, 2021
6b54010
Don't commit :memory: -> test.db :)
anoadragon453 May 12, 2021
ce73710
Revert "Modify send_local_online_presence_to to send to local users o…
anoadragon453 May 12, 2021
839f671
Send Server Notices change was required after all
anoadragon453 May 12, 2021
a907e0a
Switch to using PresenceFederationQueue.send_presence_to_destinations
anoadragon453 May 12, 2021
d2f07a7
No need to split a single-use constant out to a separate file
anoadragon453 May 12, 2021
682e125
Describe force_notify in docstrings
anoadragon453 May 12, 2021
6839bcf
Have the handler call the storage method, not the other way around
anoadragon453 May 12, 2021
726d705
Call send_presence_to_destinations properly
anoadragon453 May 13, 2021
e603a77
Fix presence router test
anoadragon453 May 13, 2021
0bbca77
Only bump the max presence stream ID once
anoadragon453 May 13, 2021
7b8d008
Clarify notice of where ModuleApi.send_local_online_presece_to can be…
anoadragon453 May 13, 2021
db3befd
Fix iterator/iterable, and passing UserID
anoadragon453 May 14, 2021
48bedea
Update synapse/module_api/__init__.py
anoadragon453 May 17, 2021
a9875aa
Iterable -> Sequence; lint
anoadragon453 May 17, 2021
22e1795
Perform an empty check, switch back to next(iter(x))
anoadragon453 May 17, 2021
68073af
Don't have the same function name in a Handler and Store class
anoadragon453 May 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9823.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`.
4 changes: 3 additions & 1 deletion docs/presence_router_module.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
which can be given a list of local or remote MXIDs to broadcast known, online user
presence to (for those users that the receiving user is considered interested in).
It does not include state for users who are currently offline, and it can only be
called on workers that support sending federation.
called on workers that support sending federation. Additionally, this method must
only be called from the main process, or from a worker that supports writing to
the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers).

### Module structure

Expand Down
130 changes: 104 additions & 26 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,21 @@ async def current_state_for_users(

@abc.abstractmethod
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Set the presence state of the user. """
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID):
Expand Down Expand Up @@ -296,6 +308,45 @@ async def maybe_send_presence_to_interested_destinations(
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)

async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
"""
Adds to the list of users who should receive a full snapshot of presence
upon their next sync. Note that this only works for local users.

Then, grabs the current presence state for a given set of users and adds it
to the top of the presence stream.

Args:
user_ids: The IDs of the local users to send full presence to.
"""
# Mark the user as receiving full presence on their next sync
await self.store.add_users_to_send_full_presence_to(user_ids)

# Add a new entry to the presence stream. Since we use stream tokens to determine whether a
# local user should receive a full snapshot of presence when they sync, we need to bump the
# presence stream so that subsequent syncs with no presence activity in between won't result
# in the client receiving multiple full snapshots of presence.
#
# If we bump the stream ID, then the user will get a higher stream token next sync, and thus
# correctly won't receive a second snapshot.

# Get the current presence state for each user (defaults to offline if not found)
current_presence_for_users = await self.current_state_for_users(user_ids)

for user_id, current_presence_state in current_presence_for_users.items():
# Convert the UserPresenceState object into a serializable dict
state = {
"presence": current_presence_state.state,
"status_message": current_presence_state.status_msg,
}

# Copy the presence state to the tip of the presence stream.

# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
Expand Down Expand Up @@ -480,8 +531,17 @@ async def set_state(
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user."""
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""
presence = state["presence"]

valid_presence = (
Expand All @@ -508,6 +568,7 @@ async def set_state(
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
)

async def bump_presence_active_time(self, user: UserID) -> None:
Expand Down Expand Up @@ -677,13 +738,19 @@ async def _persist_unpersisted_changes(self) -> None:
[self.user_to_current_state[user_id] for user_id in unpersisted]
)

async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
async def _update_states(
self, new_states: Iterable[UserPresenceState], force_notify: bool = False
) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
should be sent to clients/servers.

Args:
new_states: The new user presence state updates to process.
force_notify: Whether to force notifying clients of this presence state update,
even if it doesn't change the state of a user's presence (e.g online -> online).
This is currently used to bump the max presence stream ID without changing any
user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
"""
now = self.clock.time_msec()

Expand Down Expand Up @@ -720,6 +787,9 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
now=now,
)

if force_notify:
should_notify = True

self.user_to_current_state[user_id] = new_state

if should_notify:
Expand Down Expand Up @@ -1058,9 +1128,21 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
await self._update_states(updates)

async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user."""
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]

Expand Down Expand Up @@ -1091,7 +1173,9 @@ async def set_state(
):
new_fields["last_active_ts"] = self.clock.time_msec()

await self._update_states([prev_state.copy_and_replace(**new_fields)])
await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
)

async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence."""
Expand Down Expand Up @@ -1389,11 +1473,10 @@ def __init__(self, hs: "HomeServer"):
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
# Same with get_module_api, get_presence_router
# Same with get_presence_router:
#
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.get_presence_handler = hs.get_presence_handler
self.get_module_api = hs.get_module_api
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
Expand Down Expand Up @@ -1424,16 +1507,21 @@ async def get_new_events(
stream_change_cache = self.store.presence_stream_cache

with Measure(self.clock, "presence.get_new_events"):
if user_id in self.get_module_api()._send_full_presence_to_local_users:
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

if from_key is not None:
from_key = int(from_key)

# Check if this user should receive all current, online user presence. We only
# bother to do this if from_key is set, as otherwise the user will receive all
# user presence anyways.
if await self.store.should_user_receive_full_presence_with_token(
user_id, from_key
):
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

max_token = self.store.get_current_presence_token()
if from_key == max_token:
# This is necessary as due to the way stream ID generators work
Expand Down Expand Up @@ -1467,12 +1555,6 @@ async def get_new_events(
user_id, include_offline, from_key
)

# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(
user_id
)

return presence_updates, max_token

# Make mypy happy. users_interested_in should now be a set
Expand Down Expand Up @@ -1522,10 +1604,6 @@ async def get_new_events(
)
presence_updates = list(users_to_state.values())

# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(user_id)

if not include_offline:
# Filter out offline presence states
presence_updates = self._filter_offline_presence_state(presence_updates)
Expand Down
66 changes: 33 additions & 33 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ def __init__(self, hs, auth_handler):
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)

# The next time these users sync, they will receive the current presence
# state of all local users. Users are added by send_local_online_presence_to,
# and removed after a successful sync.
#
# We make this a private variable to deter modules from accessing it directly,
# though other classes in Synapse will still do so.
self._send_full_presence_to_local_users = set()

@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
Expand Down Expand Up @@ -405,39 +397,47 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
Updates to remote users will be sent immediately, whereas local users will receive
them on their next sync attempt.

Note that this method can only be run on the main or federation_sender worker
processes.
Note that this method can only be run on the main or worker processes that have the
ability to write to the presence stream.
"""
if not self._hs.should_send_federation():
if (
self._hs.config.worker_app
and self._hs._instance_name not in self._hs.config.worker.writers.presence
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
):
raise Exception(
"send_local_online_presence_to can only be run "
"on processes that send federation",
"on processes that have the ability to write to the"
"presence stream (this includes the main process)",
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
)

local_users = set()
remote_users = set()
for user in users:
if self._hs.is_mine_id(user):
# Modify SyncHandler._generate_sync_entry_for_presence to call
# presence_source.get_new_events with an empty `from_key` if
# that user's ID were in a list modified by ModuleApi somewhere.
# That user would then get all presence state on next incremental sync.

# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
local_users.add(user)
else:
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)

# Send to remote destinations.

# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
)
remote_users.add(user)

# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()

if local_users:
# Force a presence initial_sync for these users next time they sync.
await presence_handler.add_users_to_send_full_presence_to(local_users)

for user in remote_users:
# Retrieve presence state for currently online users that this user
# is considered interested in.
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)

# Send to remote destinations.
destination = UserID.from_string(user).domain
presence_handler.get_federation_queue().send_presence_to_destinations(
presence_events, destination
)


class PublicRoomListManager:
Expand Down
11 changes: 9 additions & 2 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
{
"state": { ... },
"ignore_status_msg": false,
"force_notify": false
}

200 OK
Expand All @@ -91,17 +92,23 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
async def _serialize_payload(user_id, state, ignore_status_msg=False):
async def _serialize_payload(
user_id, state, ignore_status_msg=False, force_notify=False
):
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
}

async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

await self._presence_handler.set_state(
UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
UserID.from_string(user_id),
content["state"],
content["ignore_status_msg"],
content["force_notify"],
)

return (
Expand Down
8 changes: 5 additions & 3 deletions synapse/rest/admin/server_notice_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.txns = HttpTransactionCache(hs)
self.snm = hs.get_server_notices_manager()

def register(self, json_resource: HttpServer):
PATTERN = "/send_server_notice"
Expand All @@ -77,15 +76,18 @@ async def on_POST(
event_type = body.get("type", EventTypes.Message)
state_key = body.get("state_key")

if not self.snm.is_enabled():
# We grab the server notices manager here as its initialisation has a check for worker processes,
# but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
# admin api).
if not self.hs.get_server_notices_manager().is_enabled():
raise SynapseError(400, "Server notices are not enabled on this server")

user_id = body["user_id"]
UserID.from_string(user_id)
if not self.hs.is_mine_id(user_id):
raise SynapseError(400, "Server notices can only be sent to local users")

event = await self.snm.send_notice(
event = await self.hs.get_server_notices_manager().send_notice(
user_id=body["user_id"],
type=event_type,
state_key=state_key,
Expand Down
Loading