Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Pass the requester during event serialization #15174

Merged
merged 9 commits into from
Mar 6, 2023
1 change: 1 addition & 0 deletions changelog.d/15174.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pass the requester when serializing events.
clokep marked this conversation as resolved.
Show resolved Hide resolved
32 changes: 24 additions & 8 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.types import JsonDict
from synapse.types import JsonDict, Requester
from synapse.util.frozenutils import unfreeze

from . import EventBase
Expand Down Expand Up @@ -317,8 +317,9 @@ class SerializeEventConfig:
as_client_event: bool = True
# Function to convert from federation format to client format
event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1
# ID of the user's auth token - used for namespacing of transaction IDs
token_id: Optional[int] = None
# The entity that requested the event. This is used to determine whether to include
# the transaction_id in the unsigned section of the event.
requester: Optional[Requester] = None
# List of event fields to include. If empty, all fields will be returned.
only_event_fields: Optional[List[str]] = None
# Some events can have stripped room state stored in the `unsigned` field.
Expand Down Expand Up @@ -368,11 +369,26 @@ def serialize_event(
e.unsigned["redacted_because"], time_now_ms, config=config
)

if config.token_id is not None:
if config.token_id == getattr(e.internal_metadata, "token_id", None):
txn_id = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None:
d["unsigned"]["transaction_id"] = txn_id
# If we have a txn_id saved in the internal_metadata, we should include it in the
# unsigned section of the event if it was sent by the same session as the one
# requesting the event.
# There is a special case for guests, because they only have one access token
# without associated access_token_id, so we always include the txn_id for events
# they sent.
txn_id = getattr(e.internal_metadata, "txn_id", None)
sandhose marked this conversation as resolved.
Show resolved Hide resolved
if txn_id is not None and config.requester is not None:
sandhose marked this conversation as resolved.
Show resolved Hide resolved
requester: Requester = config.requester
clokep marked this conversation as resolved.
Show resolved Hide resolved
event_token_id = getattr(e.internal_metadata, "token_id", None)
sandhose marked this conversation as resolved.
Show resolved Hide resolved
sandhose marked this conversation as resolved.
Show resolved Hide resolved
event_user_id = getattr(e, "user_id", None)
clokep marked this conversation as resolved.
Show resolved Hide resolved
if requester.user.to_string() == event_user_id and (
(
event_token_id is not None
and requester.access_token_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is the access_token_id on a requester None? Is this only for guests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly, yes, as well as some other places where we're manually creating a requester for some operations

and event_token_id == requester.access_token_id
)
or config.requester.is_guest
):
d["unsigned"]["transaction_id"] = txn_id

# invite_room_state and knock_room_state are a list of stripped room state events
# that are meant to provide metadata about a room to an invitee/knocker. They are
Expand Down
20 changes: 10 additions & 10 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from synapse.handlers.presence import format_user_presence_state
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
from synapse.types import JsonDict, Requester, UserID
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand All @@ -46,13 +46,12 @@ def __init__(self, hs: "HomeServer"):

async def get_stream(
self,
auth_user_id: str,
requester: Requester,
pagin_config: PaginationConfig,
timeout: int = 0,
as_client_event: bool = True,
affect_presence: bool = True,
room_id: Optional[str] = None,
is_guest: bool = False,
) -> JsonDict:
"""Fetches the events stream for a given user."""

Expand All @@ -62,13 +61,12 @@ async def get_stream(
raise SynapseError(403, "This room has been blocked on this server")

# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(auth_user_id)
await self._server_notices_sender.on_user_syncing(requester.user.to_string())

auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()

context = await presence_handler.user_syncing(
auth_user_id,
requester.user.to_string(),
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
Expand All @@ -82,10 +80,10 @@ async def get_stream(
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))

stream_result = await self.notifier.get_events_for(
auth_user,
requester.user,
pagin_config,
timeout,
is_guest=is_guest,
is_guest=requester.is_guest,
explicit_room_id=room_id,
)
events = stream_result.events
Expand All @@ -102,7 +100,7 @@ async def get_stream(
if event.membership != Membership.JOIN:
continue
# Send down presence.
if event.state_key == auth_user_id:
if event.state_key == requester.user.to_string():
# Send down presence for everyone in the room.
users: Iterable[str] = await self.store.get_users_in_room(
event.room_id
Expand All @@ -124,7 +122,9 @@ async def get_stream(
chunks = self._event_serializer.serialize_events(
events,
time_now,
config=SerializeEventConfig(as_client_event=as_client_event),
config=SerializeEventConfig(
as_client_event=as_client_event, requester=requester
),
)

chunk = {
Expand Down
56 changes: 39 additions & 17 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,29 +318,34 @@ async def room_initial_sync(
)
is_peeking = member_event_id is None

user_id = requester.user.to_string()

if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
requester, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
# The member_event_id will always be available if membership is set
# to leave.
assert member_event_id

result = await self._room_initial_sync_parted(
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
requester,
room_id,
pagin_config,
membership,
member_event_id,
is_peeking,
)

account_data_events = []
tags = await self.store.get_tags_for_room(user_id, room_id)
tags = await self.store.get_tags_for_room(requester.user.to_string(), room_id)
clokep marked this conversation as resolved.
Show resolved Hide resolved
if tags:
account_data_events.append(
{"type": AccountDataTypes.TAG, "content": {"tags": tags}}
)

account_data = await self.store.get_account_data_for_room(user_id, room_id)
account_data = await self.store.get_account_data_for_room(
requester.user.to_string(), room_id
)
for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})

Expand All @@ -350,7 +355,7 @@ async def room_initial_sync(

async def _room_initial_sync_parted(
self,
user_id: str,
requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
Expand All @@ -369,36 +374,44 @@ async def _room_initial_sync_parted(
)

messages = await filter_events_for_client(
self._storage_controllers, user_id, messages, is_peeking=is_peeking
self._storage_controllers,
requester.user.to_string(),
messages,
is_peeking=is_peeking,
)

start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)

time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)

return {
"membership": membership,
"room_id": room_id,
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(
messages, time_now, config=serialize_options
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(room_state.values(), time_now)
self._event_serializer.serialize_events(
room_state.values(), time_now, config=serialize_options
)
),
"presence": [],
"receipts": [],
}

async def _room_initial_sync_joined(
self,
user_id: str,
requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
Expand All @@ -410,9 +423,12 @@ async def _room_initial_sync_joined(

# TODO: These concurrently
time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)
# Don't bundle aggregations as this is a deprecated API.
state = self._event_serializer.serialize_events(
current_state.values(), time_now
current_state.values(),
time_now,
config=serialize_options,
)

now_token = self.hs.get_event_sources().get_current_token()
Expand Down Expand Up @@ -450,7 +466,10 @@ async def get_receipts() -> List[JsonDict]:
if not receipts:
return []

return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
return ReceiptEventSource.filter_out_private_receipts(
receipts,
requester.user.to_string(),
)

presence, receipts, (messages, token) = await make_deferred_yieldable(
gather_results(
Expand All @@ -469,20 +488,23 @@ async def get_receipts() -> List[JsonDict]:
)

messages = await filter_events_for_client(
self._storage_controllers, user_id, messages, is_peeking=is_peeking
self._storage_controllers,
requester.user.to_string(),
messages,
is_peeking=is_peeking,
)

start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token

time_now = self.clock.time_msec()

ret = {
"room_id": room_id,
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(
messages, time_now, config=serialize_options
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.utils import maybe_upsert_event_field
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.logging import opentracing
Expand Down Expand Up @@ -246,7 +246,10 @@ async def get_state_events(
room_state = room_state_events[membership_event_id]

now = self.clock.time_msec()
events = self._event_serializer.serialize_events(room_state.values(), now)
serialize_options = SerializeEventConfig(requester=requester)
events = self._event_serializer.serialize_events(
room_state.values(), now, config=serialize_options
)
clokep marked this conversation as resolved.
Show resolved Hide resolved
return events

async def _user_can_see_state_at_event(
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,9 @@ async def get_messages(

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(as_client_event=as_client_event)
serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
)

chunk = {
"chunk": (
Expand Down
12 changes: 10 additions & 2 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.api.constants import Direction, EventTypes, RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
from synapse.events.utils import SerializeEventConfig
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
Expand Down Expand Up @@ -152,16 +153,23 @@ async def get_relations(
)

now = self._clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)
return_value: JsonDict = {
"chunk": self._event_serializer.serialize_events(
events, now, bundle_aggregations=aggregations
events,
now,
bundle_aggregations=aggregations,
config=serialize_options,
),
}
if include_original_event:
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
return_value["original_event"] = self._event_serializer.serialize_event(
event, now, bundle_aggregations=None
event,
now,
bundle_aggregations=None,
config=serialize_options,
)

if next_token:
Expand Down
Loading