Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Pass the device ID around in the presence handler (#16171)
Browse files Browse the repository at this point in the history
Refactoring to pass the device ID (in addition to the user ID) through
the presence handler (specifically the `user_syncing`, `set_state`,
and `bump_presence_active_time` methods and their replication
versions).
  • Loading branch information
clokep authored and hughns committed Sep 4, 2023
1 parent 50a3b04 commit 4c6f62c
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/16171.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track per-device information in the presence code.
1 change: 1 addition & 0 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def get_stream(

context = await presence_handler.user_syncing(
requester.user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,10 @@ async def persist_and_notify_client_events(
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
"bump_presence_active_time", self._bump_active_time, requester.user
"bump_presence_active_time",
self._bump_active_time,
requester.user,
requester.device_id,
)

async def _notify() -> None:
Expand Down Expand Up @@ -1958,10 +1961,10 @@ async def _maybe_kick_guest_users(
logger.info("maybe_kick_guest_users %r", current_state)
await self.hs.get_room_member_handler().kick_guest_users(current_state)

async def _bump_active_time(self, user: UserID) -> None:
async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None:
try:
presence = self.hs.get_presence_handler()
await presence.bump_presence_active_time(user)
await presence.bump_presence_active_time(user, device_id)
except Exception:
logger.exception("Error bumping presence active time")

Expand Down
46 changes: 38 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ def __init__(self, hs: "HomeServer"):

@abc.abstractmethod
async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
Expand All @@ -176,6 +180,7 @@ async def user_syncing(
Args:
user_id: the user that is starting a sync
device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
Expand Down Expand Up @@ -252,6 +257,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -260,6 +266,7 @@ async def set_state(
Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand All @@ -269,7 +276,9 @@ async def set_state(
"""

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand Down Expand Up @@ -381,7 +390,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
await self.set_state(
UserID.from_string(user_id), None, state, force_notify=True
)

async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
raise NotImplementedError(
Expand Down Expand Up @@ -481,7 +492,11 @@ def send_stop_syncing(self) -> None:
self.send_user_sync(user_id, False, last_sync_ms)

async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Record that a user is syncing.
Expand All @@ -495,6 +510,7 @@ async def user_syncing(
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
device_id,
state={"presence": presence_state},
is_sync=True,
)
Expand Down Expand Up @@ -592,6 +608,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -600,6 +617,7 @@ async def set_state(
Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand All @@ -622,12 +640,15 @@ async def set_state(
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
state=state,
force_notify=force_notify,
is_sync=is_sync,
)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -638,7 +659,9 @@ async def bump_presence_active_time(self, user: UserID) -> None:
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
)


Expand Down Expand Up @@ -943,7 +966,9 @@ async def _handle_timeouts(self) -> None:

return await self._update_states(changes)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -966,6 +991,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
async def user_syncing(
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool = True,
presence_state: str = PresenceState.ONLINE,
) -> ContextManager[None]:
Expand All @@ -977,7 +1003,8 @@ async def user_syncing(
when users disconnect/reconnect.
Args:
user_id
user_id: the user that is starting a sync
device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
Expand All @@ -993,6 +1020,7 @@ async def user_syncing(
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
device_id,
state={"presence": presence_state},
is_sync=True,
)
Expand Down Expand Up @@ -1163,6 +1191,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -1171,6 +1200,7 @@ async def set_state(
Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand Down
11 changes: 7 additions & 4 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from twisted.web.server import Request

Expand Down Expand Up @@ -51,14 +51,14 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _serialize_payload(user_id: str, device_id: Optional[str]) -> JsonDict: # type: ignore[override]
return {"device_id": device_id}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
await self._presence_handler.bump_presence_active_time(
UserID.from_string(user_id)
UserID.from_string(user_id), content.get("device_id")
)

return (200, {})
Expand Down Expand Up @@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"):
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
) -> JsonDict:
return {
"device_id": device_id,
"state": state,
"force_notify": force_notify,
"is_sync": is_sync,
Expand All @@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override]
) -> Tuple[int, JsonDict]:
await self._presence_handler.set_state(
UserID.from_string(user_id),
content.get("device_id"),
content["state"],
content["force_notify"],
content.get("is_sync", False),
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def on_PUT(
raise SynapseError(400, "Unable to parse state")

if self._use_presence:
await self.presence_handler.set_state(user, state)
await self.presence_handler.set_state(user, requester.device_id, state)

return 200, {}

Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ async def on_POST(
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

body = parse_json_object_from_request(request)

Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ async def on_POST(
Codes.INVALID_PARAM,
)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

if receipt_type == ReceiptTypes.FULLY_READ:
await self.read_marker_handler.received_client_read_marker(
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,9 @@ async def on_PUT(

content = parse_json_object_from_request(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:

context = await self.presence_handler.user_syncing(
user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=set_presence,
)
Expand Down
Loading

0 comments on commit 4c6f62c

Please sign in to comment.