Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include bundled aggregations in /sync and related fixes #11478

Merged
merged 14 commits into from
Dec 6, 2021
Merged
1 change: 1 addition & 0 deletions changelog.d/11478.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include bundled relations during a limited `/sync` request, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675).
clokep marked this conversation as resolved.
Show resolved Hide resolved
46 changes: 26 additions & 20 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,15 @@ async def serialize_event(
self,
event: Union[JsonDict, EventBase],
time_now: int,
bundle_relations: bool = True,
bundle_aggregations: bool = True,
**kwargs: Any,
) -> JsonDict:
"""Serializes a single event.

Args:
event: The event being serialized.
time_now: The current time in milliseconds
bundle_relations: Whether to include the bundled relations for this
bundle_aggregations: Whether to include the bundled aggregations for this
event.
**kwargs: Arguments to pass to `serialize_event`

Expand All @@ -416,26 +416,30 @@ async def serialize_event(

# If MSC1849 is enabled then we need to look if there are any relations
# we need to bundle in with the event.
clokep marked this conversation as resolved.
Show resolved Hide resolved
# Do not bundle relations if the event has been redacted
if not event.internal_metadata.is_redacted() and (
self._msc1849_enabled and bundle_relations
# Do not bundle aggregations if the event has been redacted or if the event
# is a state event.
if (
self._msc1849_enabled
and bundle_aggregations
and not event.is_state()
and not event.internal_metadata.is_redacted()
):
await self._injected_bundled_relations(event, time_now, serialized_event)
await self._injected_bundled_aggregations(event, time_now, serialized_event)

return serialized_event

async def _injected_bundled_relations(
async def _injected_bundled_aggregations(
self, event: EventBase, time_now: int, serialized_event: JsonDict
) -> None:
"""Potentially injects bundled relations into the unsigned portion of the serialized event.
"""Potentially injects bundled aggregations into the unsigned portion of the serialized event.

Args:
event: The event being serialized.
time_now: The current time in milliseconds
serialized_event: The serialized event which may be modified.

"""
# Do not bundle relations for an event which represents an edit or an
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
Expand All @@ -445,18 +449,18 @@ async def _injected_bundled_relations(

event_id = event.event_id

# The bundled relations to include.
relations = {}
# The bundled aggregations to include.
aggregations = {}

annotations = await self.store.get_aggregation_groups_for_event(event_id)
if annotations.chunk:
relations[RelationTypes.ANNOTATION] = annotations.to_dict()
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()

references = await self.store.get_relations_for_event(
event_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
relations[RelationTypes.REFERENCE] = references.to_dict()
aggregations[RelationTypes.REFERENCE] = references.to_dict()

edit = None
if event.type == EventTypes.Message:
Expand All @@ -482,7 +486,7 @@ async def _injected_bundled_relations(
else:
serialized_event["content"].pop("m.relates_to", None)

relations[RelationTypes.REPLACE] = {
aggregations[RelationTypes.REPLACE] = {
"event_id": edit.event_id,
"origin_server_ts": edit.origin_server_ts,
"sender": edit.sender,
Expand All @@ -495,17 +499,19 @@ async def _injected_bundled_relations(
latest_thread_event,
) = await self.store.get_thread_summary(event_id)
if latest_thread_event:
relations[RelationTypes.THREAD] = {
# Don't bundle relations as this could recurse forever.
aggregations[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": await self.serialize_event(
latest_thread_event, time_now, bundle_relations=False
latest_thread_event, time_now, bundle_aggregations=False
),
"count": thread_count,
}

# If any bundled relations were found, include them.
if relations:
serialized_event["unsigned"].setdefault("m.relations", {}).update(relations)
# If any bundled aggregations were found, include them.
if aggregations:
serialized_event["unsigned"].setdefault("m.relations", {}).update(
aggregations
)

async def serialize_events(
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
Expand Down
5 changes: 2 additions & 3 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ async def get_stream(
events,
time_now,
as_client_event=as_client_event,
# We don't bundle "live" events, as otherwise clients
# will end up double counting annotations.
bundle_relations=False,
# Don't bundle aggregations as this is a deprecated API.
bundle_aggregations=False,
)

chunk = {
Expand Down
28 changes: 22 additions & 6 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ async def handle_room(event: RoomsForUser) -> None:

invite_event = await self.store.get_event(event.event_id)
d["invite"] = await self._event_serializer.serialize_event(
invite_event, time_now, as_client_event
invite_event,
time_now,
as_client_event=as_client_event,
clokep marked this conversation as resolved.
Show resolved Hide resolved
)

rooms_ret.append(d)
Expand Down Expand Up @@ -216,7 +218,11 @@ async def handle_room(event: RoomsForUser) -> None:
d["messages"] = {
"chunk": (
await self._event_serializer.serialize_events(
messages, time_now=time_now, as_client_event=as_client_event
messages,
time_now=time_now,
# Don't bundle aggregations as this is a deprecated API.
bundle_aggregations=False,
as_client_event=as_client_event,
)
),
"start": await start_token.to_string(self.store),
Expand All @@ -226,6 +232,8 @@ async def handle_room(event: RoomsForUser) -> None:
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
clokep marked this conversation as resolved.
Show resolved Hide resolved
as_client_event=as_client_event,
)

Expand Down Expand Up @@ -366,14 +374,18 @@ async def _room_initial_sync_parted(
"room_id": room_id,
"messages": {
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
messages, time_now, bundle_aggregations=False
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
# No need to bundle aggregations for state events.
await self._event_serializer.serialize_events(
room_state.values(), time_now
room_state.values(), time_now, bundle_aggregations=False
)
),
"presence": [],
Expand All @@ -392,8 +404,9 @@ async def _room_initial_sync_joined(

# TODO: These concurrently
time_now = self.clock.time_msec()
# No need to bundle aggregations for state events.
state = await self._event_serializer.serialize_events(
current_state.values(), time_now
current_state.values(), time_now, bundle_aggregations=False
)

now_token = self.hs.get_event_sources().get_current_token()
Expand Down Expand Up @@ -467,7 +480,10 @@ async def get_receipts() -> List[JsonDict]:
"room_id": room_id,
"messages": {
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
messages, time_now, bundle_aggregations=False
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
Expand Down
5 changes: 2 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,8 @@ async def get_state_events(
events = await self._event_serializer.serialize_events(
room_state.values(),
now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
bundle_relations=False,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
)
return events

Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,11 @@ async def get_messages(

if state:
chunk["state"] = await self._event_serializer.serialize_events(
state, time_now, as_client_event=as_client_event
state,
time_now,
as_client_event=as_client_event,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
)

return chunk
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,9 @@ async def search(
if state_results:
s = {}
for room_id, state_events in state_results.items():
# No need to bundle aggregations for state events.
s[room_id] = await self._event_serializer.serialize_events(
state_events, time_now
state_events, time_now, bundle_aggregations=False
)

rooms_cat_res["state"] = s
Expand Down
9 changes: 4 additions & 5 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,8 @@ async def on_GET(
room_state = await self._event_serializer.serialize_events(
events.values(),
now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
bundle_relations=False,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
)
ret = {"state": room_state}

Expand Down Expand Up @@ -791,8 +790,8 @@ async def on_GET(
results["state"] = await self._event_serializer.serialize_events(
results["state"],
time_now,
# No need to bundle aggregations for state events
bundle_relations=False,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
)

return HTTPStatus.OK, results
Expand Down
9 changes: 4 additions & 5 deletions synapse/rest/client/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,13 @@ async def on_GET(
)

now = self.clock.time_msec()
# We set bundle_relations to False when retrieving the original
# event because we want the content before relations were applied to
# it.
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
original_event = await self._event_serializer.serialize_event(
event, now, bundle_relations=False
event, now, bundle_aggregations=False
)
# The relations returned for the requested event do include their
# bundled relations.
# bundled aggregations.
serialized_events = await self._event_serializer.serialize_events(events, now)

return_value = pagination_chunk.to_dict()
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,8 @@ async def on_GET(
results["state"] = await self._event_serializer.serialize_events(
results["state"],
time_now,
# No need to bundle aggregations for state events
bundle_relations=False,
# No need to bundle aggregations for state events.
bundle_aggregations=False,
)

return 200, results
Expand Down
17 changes: 11 additions & 6 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,13 @@ async def encode_room(
The room, encoded in our response format
"""

def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
def serialize(
events: Iterable[EventBase], bundle_aggregations: bool
) -> Awaitable[List[JsonDict]]:
return self._event_serializer.serialize_events(
events,
time_now=time_now,
# We don't bundle "live" events, as otherwise clients
# will end up double counting annotations.
bundle_relations=False,
bundle_aggregations=bundle_aggregations,
token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
Expand All @@ -544,8 +544,13 @@ def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
event.room_id,
)

serialized_state = await serialize(state_events)
serialized_timeline = await serialize(timeline_events)
# No need to bundle aggregations for state events.
serialized_state = await serialize(state_events, bundle_aggregations=False)
# Only bundle aggregations if the room is limited, as clients could be
# missing events.
clokep marked this conversation as resolved.
Show resolved Hide resolved
serialized_timeline = await serialize(
timeline_events, bundle_aggregations=not room.timeline.limited
)

account_data = room.account_data

Expand Down