From 12d9100cb16aab8715d9fe148391e137acba4345 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Aug 2023 12:58:11 -0400 Subject: [PATCH 1/4] Update user_syncing to take a device_id. --- synapse/handlers/events.py | 1 + synapse/handlers/presence.py | 17 ++++++++++++++--- synapse/rest/client/sync.py | 1 + tests/handlers/test_presence.py | 21 +++++++++++++++------ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 33359f6ed748..d12803bf0f31 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -67,6 +67,7 @@ async def get_stream( context = await presence_handler.user_syncing( requester.user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=PresenceState.ONLINE, ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c395dcdb43e6..bd2724324e7a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -165,7 +165,11 @@ def __init__(self, hs: "HomeServer"): @abc.abstractmethod async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -176,6 +180,7 @@ async def user_syncing( Args: user_id: the user that is starting a sync + device_id: the user's device that is starting a sync affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. @@ -481,7 +486,11 @@ def send_stop_syncing(self) -> None: self.send_user_sync(user_id, False, last_sync_ms) async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Record that a user is syncing. @@ -966,6 +975,7 @@ async def bump_presence_active_time(self, user: UserID) -> None: async def user_syncing( self, user_id: str, + device_id: Optional[str], affect_presence: bool = True, presence_state: str = PresenceState.ONLINE, ) -> ContextManager[None]: @@ -977,7 +987,8 @@ async def user_syncing( when users disconnect/reconnect. Args: - user_id + user_id: the user that is starting a sync + device_id: the user's device that is starting a sync affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d7854ed4fd9d..42bdd3bb108b 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: context = await self.presence_handler.user_syncing( user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=set_presence, ) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a3fdcf7f9346..d389dd7493f7 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -524,6 +524,7 @@ def default_config(self) -> JsonDict: def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.user_id = f"@test:{self.hs.config.server.server_name}" + self.device_id = "dev-1" # Move the reactor to the initial time. self.reactor.advance(1000) @@ -608,7 +609,10 @@ def test_restored_presence_online_after_sync( self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2) self.get_success( presence_handler.user_syncing( - self.user_id, sync_state != PresenceState.OFFLINE, sync_state + self.user_id, + self.device_id, + sync_state != PresenceState.OFFLINE, + sync_state, ) ) @@ -632,6 +636,7 @@ def test_restored_presence_online_after_sync( class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): user_id = "@test:server" user_id_obj = UserID.from_string(user_id) + device_id = "dev-1" def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.presence_handler = hs.get_presence_handler() @@ -652,7 +657,7 @@ def test_external_process_timeout(self) -> None: self.get_success( worker_presence_handler.user_syncing( - self.user_id, True, PresenceState.ONLINE + self.user_id, self.device_id, True, PresenceState.ONLINE ), by=0.1, ) @@ -769,7 +774,7 @@ def test_set_presence_from_syncing_not_set(self) -> None: self.get_success( self.presence_handler.user_syncing( - self.user_id, False, PresenceState.ONLINE + self.user_id, self.device_id, False, PresenceState.ONLINE ) ) @@ -786,7 +791,9 @@ def test_set_presence_from_syncing_is_set(self) -> None: self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + self.user_id, self.device_id, True, PresenceState.ONLINE + ) ) state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) @@ -800,7 +807,9 @@ def test_set_presence_from_syncing_keeps_status(self) -> None: self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + self.user_id, self.device_id, True, PresenceState.ONLINE + ) ) state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) @@ -838,7 +847,7 @@ def test_set_presence_from_syncing_keeps_busy( # /presence/*. self.get_success( worker_to_sync_against.get_presence_handler().user_syncing( - self.user_id, True, PresenceState.ONLINE + self.user_id, self.device_id, True, PresenceState.ONLINE ), by=0.1, ) From 1685a054125cf53f940c71dcc40023b443aa7f11 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Aug 2023 13:24:10 -0400 Subject: [PATCH 2/4] Update set_state to take a device_id. --- synapse/handlers/presence.py | 13 ++++++++++++- synapse/replication/http/presence.py | 5 ++++- synapse/rest/client/presence.py | 2 +- tests/handlers/test_presence.py | 17 ++++++++++++----- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index bd2724324e7a..da809233f3fd 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -257,6 +257,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -265,6 +266,7 @@ async def set_state( Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in @@ -386,7 +388,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: # We set force_notify=True here so that this presence update is guaranteed to # increment the presence stream ID (which resending the current user's presence # otherwise would not do). - await self.set_state(UserID.from_string(user_id), state, force_notify=True) + await self.set_state( + UserID.from_string(user_id), None, state, force_notify=True + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: raise NotImplementedError( @@ -504,6 +508,7 @@ async def user_syncing( # what the spec wants. await self.set_state( UserID.from_string(user_id), + device_id, state={"presence": presence_state}, is_sync=True, ) @@ -601,6 +606,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -609,6 +615,7 @@ async def set_state( Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in @@ -631,6 +638,7 @@ async def set_state( await self._set_state_client( instance_name=self._presence_writer_instance, user_id=user_id, + device_id=device_id, state=state, force_notify=force_notify, is_sync=is_sync, @@ -1004,6 +1012,7 @@ async def user_syncing( # what the spec wants. await self.set_state( UserID.from_string(user_id), + device_id, state={"presence": presence_state}, is_sync=True, ) @@ -1174,6 +1183,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -1182,6 +1192,7 @@ async def set_state( Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index a24fb9310be1..72d2f4c7a557 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from twisted.web.server import Request @@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"): @staticmethod async def _serialize_payload( # type: ignore[override] user_id: str, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, ) -> JsonDict: return { + "device_id": device_id, "state": state, "force_notify": force_notify, "is_sync": is_sync, @@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override] ) -> Tuple[int, JsonDict]: await self._presence_handler.set_state( UserID.from_string(user_id), + content.get("device_id"), content["state"], content["force_notify"], content.get("is_sync", False), diff --git a/synapse/rest/client/presence.py b/synapse/rest/client/presence.py index 8e193330f8bc..d578faa96984 100644 --- a/synapse/rest/client/presence.py +++ b/synapse/rest/client/presence.py @@ -97,7 +97,7 @@ async def on_PUT( raise SynapseError(400, "Unable to parse state") if self._use_presence: - await self.presence_handler.set_state(user, state) + await self.presence_handler.set_state(user, requester.device_id, state) return 200, {} diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index d389dd7493f7..a987267308ee 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -713,7 +713,7 @@ def test_user_goes_offline_manually_with_no_status_msg(self) -> None: # Mark user as offline self.get_success( self.presence_handler.set_state( - self.user_id_obj, {"presence": PresenceState.OFFLINE} + self.user_id_obj, self.device_id, {"presence": PresenceState.OFFLINE} ) ) @@ -745,7 +745,7 @@ def test_user_reset_online_with_no_status(self) -> None: # Mark user as online again self.get_success( self.presence_handler.set_state( - self.user_id_obj, {"presence": PresenceState.ONLINE} + self.user_id_obj, self.device_id, {"presence": PresenceState.ONLINE} ) ) @@ -884,6 +884,7 @@ def _set_presencestate_with_status_msg( self.get_success( self.presence_handler.set_state( self.user_id_obj, + self.device_id, {"presence": state, "status_msg": status_msg}, ) ) @@ -1125,7 +1126,9 @@ def test_remote_joins(self) -> None: # Mark test2 as online, test will be offline with a last_active of 0 self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) self.reactor.pump([0]) # Wait for presence updates to be handled @@ -1172,7 +1175,9 @@ def test_remote_gets_presence_when_local_user_joins(self) -> None: # Mark test as online self.get_success( self.presence_handler.set_state( - UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) @@ -1180,7 +1185,9 @@ def test_remote_gets_presence_when_local_user_joins(self) -> None: # Note we don't join them to the room yet self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) From ccc980cd826087e6efaa82a88610dc5c8e479445 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Aug 2023 13:33:35 -0400 Subject: [PATCH 3/4] Update bump_presence_active_time to take a device_id. --- synapse/handlers/message.py | 9 ++++++--- synapse/handlers/presence.py | 16 ++++++++++++---- synapse/replication/http/presence.py | 6 +++--- synapse/rest/client/read_marker.py | 4 +++- synapse/rest/client/receipts.py | 4 +++- synapse/rest/client/room.py | 4 +++- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3184bfb0471f..4a15c76a7b2a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1921,7 +1921,10 @@ async def persist_and_notify_client_events( # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. run_as_background_process( - "bump_presence_active_time", self._bump_active_time, requester.user + "bump_presence_active_time", + self._bump_active_time, + requester.user, + requester.device_id, ) async def _notify() -> None: @@ -1958,10 +1961,10 @@ async def _maybe_kick_guest_users( logger.info("maybe_kick_guest_users %r", current_state) await self.hs.get_room_member_handler().kick_guest_users(current_state) - async def _bump_active_time(self, user: UserID) -> None: + async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None: try: presence = self.hs.get_presence_handler() - await presence.bump_presence_active_time(user) + await presence.bump_presence_active_time(user, device_id) except Exception: logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da809233f3fd..50c68c86ceeb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -276,7 +276,9 @@ async def set_state( """ @abc.abstractmethod - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -644,7 +646,9 @@ async def set_state( is_sync=is_sync, ) - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -655,7 +659,9 @@ async def bump_presence_active_time(self, user: UserID) -> None: # Proxy request to instance that writes presence user_id = user.to_string() await self._bump_active_client( - instance_name=self._presence_writer_instance, user_id=user_id + instance_name=self._presence_writer_instance, + user_id=user_id, + device_id=device_id, ) @@ -960,7 +966,9 @@ async def _handle_timeouts(self) -> None: return await self._update_states(changes) - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index 72d2f4c7a557..6c9e79fb07c9 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -51,14 +51,14 @@ def __init__(self, hs: "HomeServer"): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] - return {} + async def _serialize_payload(user_id: str, device_id: Optional[str]) -> JsonDict: # type: ignore[override] + return {"device_id": device_id} async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict, user_id: str ) -> Tuple[int, JsonDict]: await self._presence_handler.bump_presence_active_time( - UserID.from_string(user_id) + UserID.from_string(user_id), content.get("device_id") ) return (200, {}) diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index 4f96e51eeb93..1707e519723a 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -52,7 +52,9 @@ async def on_POST( ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) body = parse_json_object_from_request(request) diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 316e7b99821e..869a37445950 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -94,7 +94,9 @@ async def on_POST( Codes.INVALID_PARAM, ) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) if receipt_type == ReceiptTypes.FULLY_READ: await self.read_marker_handler.received_client_read_marker( diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index dc498001e450..553938ce9d13 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1229,7 +1229,9 @@ async def on_PUT( content = parse_json_object_from_request(request) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) # Limit timeout to stop people from setting silly typing timeouts. timeout = min(content.get("timeout", 30000), 120000) From 6df911e0fe516f3be96836fa3fde157db6f188c5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Aug 2023 11:11:44 -0400 Subject: [PATCH 4/4] Newsfragment --- changelog.d/16171.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16171.misc diff --git a/changelog.d/16171.misc b/changelog.d/16171.misc new file mode 100644 index 000000000000..4d709cb56e19 --- /dev/null +++ b/changelog.d/16171.misc @@ -0,0 +1 @@ +Track per-device information in the presence code.