Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Pass the device ID around in the presence handler #16171

Merged
merged 4 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16171.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track per-device information in the presence code.
1 change: 1 addition & 0 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def get_stream(

context = await presence_handler.user_syncing(
requester.user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,10 @@ async def persist_and_notify_client_events(
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
"bump_presence_active_time", self._bump_active_time, requester.user
"bump_presence_active_time",
self._bump_active_time,
requester.user,
requester.device_id,
)

async def _notify() -> None:
Expand Down Expand Up @@ -1958,10 +1961,10 @@ async def _maybe_kick_guest_users(
logger.info("maybe_kick_guest_users %r", current_state)
await self.hs.get_room_member_handler().kick_guest_users(current_state)

async def _bump_active_time(self, user: UserID) -> None:
async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None:
try:
presence = self.hs.get_presence_handler()
await presence.bump_presence_active_time(user)
await presence.bump_presence_active_time(user, device_id)
except Exception:
logger.exception("Error bumping presence active time")

Expand Down
46 changes: 38 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ def __init__(self, hs: "HomeServer"):

@abc.abstractmethod
async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a previous conversation about why this can be None. The tl;dr is that some types of requesters can have a None here (guests, admin tokens, appservices). We also use this in #16066 when restoring from a restart.

affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
Expand All @@ -176,6 +180,7 @@ async def user_syncing(

Args:
user_id: the user that is starting a sync
device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
Expand Down Expand Up @@ -252,6 +257,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -260,6 +266,7 @@ async def set_state(

Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand All @@ -269,7 +276,9 @@ async def set_state(
"""

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand Down Expand Up @@ -381,7 +390,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
await self.set_state(
UserID.from_string(user_id), None, state, force_notify=True
)

async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
raise NotImplementedError(
Expand Down Expand Up @@ -481,7 +492,11 @@ def send_stop_syncing(self) -> None:
self.send_user_sync(user_id, False, last_sync_ms)

async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Record that a user is syncing.

Expand All @@ -495,6 +510,7 @@ async def user_syncing(
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
device_id,
state={"presence": presence_state},
is_sync=True,
)
Expand Down Expand Up @@ -592,6 +608,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -600,6 +617,7 @@ async def set_state(

Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand All @@ -622,12 +640,15 @@ async def set_state(
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
state=state,
force_notify=force_notify,
is_sync=is_sync,
)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -638,7 +659,9 @@ async def bump_presence_active_time(self, user: UserID) -> None:
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
)


Expand Down Expand Up @@ -943,7 +966,9 @@ async def _handle_timeouts(self) -> None:

return await self._update_states(changes)

async def bump_presence_active_time(self, user: UserID) -> None:
async def bump_presence_active_time(
self, user: UserID, device_id: Optional[str]
) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
Expand All @@ -966,6 +991,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
async def user_syncing(
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool = True,
presence_state: str = PresenceState.ONLINE,
) -> ContextManager[None]:
Expand All @@ -977,7 +1003,8 @@ async def user_syncing(
when users disconnect/reconnect.

Args:
user_id
user_id: the user that is starting a sync
device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
Expand All @@ -993,6 +1020,7 @@ async def user_syncing(
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
device_id,
state={"presence": presence_state},
is_sync=True,
)
Expand Down Expand Up @@ -1163,6 +1191,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
Expand All @@ -1171,6 +1200,7 @@ async def set_state(

Args:
target_user: The ID of the user to set the presence state of.
device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
Expand Down
11 changes: 7 additions & 4 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from twisted.web.server import Request

Expand Down Expand Up @@ -51,14 +51,14 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _serialize_payload(user_id: str, device_id: Optional[str]) -> JsonDict: # type: ignore[override]
return {"device_id": device_id}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
await self._presence_handler.bump_presence_active_time(
UserID.from_string(user_id)
UserID.from_string(user_id), content.get("device_id")
)

return (200, {})
Expand Down Expand Up @@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"):
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
device_id: Optional[str],
state: JsonDict,
force_notify: bool = False,
is_sync: bool = False,
) -> JsonDict:
return {
"device_id": device_id,
"state": state,
"force_notify": force_notify,
"is_sync": is_sync,
Expand All @@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override]
) -> Tuple[int, JsonDict]:
await self._presence_handler.set_state(
UserID.from_string(user_id),
content.get("device_id"),
content["state"],
content["force_notify"],
content.get("is_sync", False),
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def on_PUT(
raise SynapseError(400, "Unable to parse state")

if self._use_presence:
await self.presence_handler.set_state(user, state)
await self.presence_handler.set_state(user, requester.device_id, state)

return 200, {}

Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ async def on_POST(
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

body = parse_json_object_from_request(request)

Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ async def on_POST(
Codes.INVALID_PARAM,
)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

if receipt_type == ReceiptTypes.FULLY_READ:
await self.read_marker_handler.received_client_read_marker(
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,9 @@ async def on_PUT(

content = parse_json_object_from_request(request)

await self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(
requester.user, requester.device_id
)

# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:

context = await self.presence_handler.user_syncing(
user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=set_presence,
)
Expand Down
Loading