Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow client event serialization to be async #5183

Merged
merged 3 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5183.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow client event serialization to be async.
44 changes: 44 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

from frozendict import frozendict

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.util.async_helpers import yieldable_gather_results

from . import EventBase

Expand Down Expand Up @@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True,
d = only_fields(d, only_event_fields)

return d


class EventClientSerializer(object):
"""Serializes events that are to be sent to clients.

This is used for bundling extra information with any events to be sent to
clients.
"""

def __init__(self, hs):
pass

def serialize_event(self, event, time_now, **kwargs):
"""Serializes a single event.

Args:
event (EventBase)
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`

Returns:
Deferred[dict]: The serialized event
"""
event = serialize_event(event, time_now, **kwargs)
return defer.succeed(event)

def serialize_events(self, events, time_now, **kwargs):
"""Serializes multiple events.

Args:
event (iter[EventBase])
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`

Returns:
Deferred[list[dict]]: The list of serialized events
"""
return yieldable_gather_results(
self.serialize_event, events,
time_now=time_now, **kwargs
)
8 changes: 4 additions & 4 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -50,6 +49,7 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
@log_function
Expand Down Expand Up @@ -120,9 +120,9 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,

time_now = self.clock.time_msec()

chunks = [
serialize_event(e, time_now, as_client_event) for e in events
]
chunks = yield self._event_serializer.serialize_events(
events, time_now, as_client_event=as_client_event,
)

chunk = {
"chunk": chunks,
Expand Down
44 changes: 27 additions & 17 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
Expand All @@ -43,6 +42,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()

def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False):
Expand Down Expand Up @@ -138,7 +138,9 @@ def handle_room(event):
d["inviter"] = event.sender

invite_event = yield self.store.get_event(event.event_id)
d["invite"] = serialize_event(invite_event, time_now, as_client_event)
d["invite"] = yield self._event_serializer.serialize_event(
invite_event, time_now, as_client_event,
)

rooms_ret.append(d)

Expand Down Expand Up @@ -185,18 +187,21 @@ def handle_room(event):
time_now = self.clock.time_msec()

d["messages"] = {
"chunk": [
serialize_event(m, time_now, as_client_event)
for m in messages
],
"chunk": (
yield self._event_serializer.serialize_events(
messages, time_now=time_now,
as_client_event=as_client_event,
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
}

d["state"] = [
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]
d["state"] = yield self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event
)

account_data_events = []
tags = tags_by_room.get(event.room_id)
Expand Down Expand Up @@ -337,11 +342,15 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
"membership": membership,
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
"chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
"state": [serialize_event(s, time_now) for s in room_state.values()],
"state": (yield self._event_serializer.serialize_events(
room_state.values(), time_now,
)),
"presence": [],
"receipts": [],
})
Expand All @@ -355,10 +364,9 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,

# TODO: These concurrently
time_now = self.clock.time_msec()
state = [
serialize_event(x, time_now)
for x in current_state.values()
]
state = yield self._event_serializer.serialize_events(
current_state.values(), time_now,
)

now_token = yield self.hs.get_event_sources().get_current_token()

Expand Down Expand Up @@ -425,7 +433,9 @@ def get_receipts():
ret = {
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
"chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
)
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
Expand All @@ -57,6 +56,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
Expand Down Expand Up @@ -164,9 +164,10 @@ def get_state_events(
room_state = room_state[membership_event_id]

now = self.clock.time_msec()
defer.returnValue(
[serialize_event(c, now) for c in room_state.values()]
events = yield self._event_serializer.serialize_events(
room_state.values(), now,
)
defer.returnValue(events)

@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
Expand Down
22 changes: 13 additions & 9 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
Expand Down Expand Up @@ -78,6 +77,7 @@ def __init__(self, hs):
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()

def start_purge_history(self, room_id, token,
delete_local_events=False):
Expand Down Expand Up @@ -278,18 +278,22 @@ def get_messages(self, requester, room_id=None, pagin_config=None,
time_now = self.clock.time_msec()

chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"chunk": (
yield self._event_serializer.serialize_events(
events, time_now,
as_client_event=as_client_event,
)
),
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}

if state:
chunk["state"] = [
serialize_event(e, time_now, as_client_event)
for e in state
]
chunk["state"] = (
yield self._event_serializer.serialize_events(
state, time_now,
as_client_event=as_client_event,
)
)

defer.returnValue(chunk)
42 changes: 23 additions & 19 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client

Expand All @@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):

def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
Expand Down Expand Up @@ -401,14 +401,16 @@ def search(self, user, content, batch=None):
time_now = self.clock.time_msec()

for context in contexts.values():
context["events_before"] = [
serialize_event(e, time_now)
for e in context["events_before"]
]
context["events_after"] = [
serialize_event(e, time_now)
for e in context["events_after"]
]
context["events_before"] = (
yield self._event_serializer.serialize_events(
context["events_before"], time_now,
)
)
context["events_after"] = (
yield self._event_serializer.serialize_events(
context["events_after"], time_now,
)
)

state_results = {}
if include_state:
Expand All @@ -422,14 +424,13 @@ def search(self, user, content, batch=None):
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong

results = [
{
results = []
for e in allowed_events:
results.append({
"rank": rank_map[e.event_id],
"result": serialize_event(e, time_now),
"result": (yield self._event_serializer.serialize_event(e, time_now)),
"context": contexts.get(e.event_id, {}),
}
for e in allowed_events
]
})

rooms_cat_res = {
"results": results,
Expand All @@ -438,10 +439,13 @@ def search(self, user, content, batch=None):
}

if state_results:
rooms_cat_res["state"] = {
room_id: [serialize_event(e, time_now) for e in state]
for room_id, state in state_results.items()
}
s = {}
for room_id, state in state_results.items():
s[room_id] = yield self._event_serializer.serialize_events(
state, time_now,
)

rooms_cat_res["state"] = s

if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
Expand Down
5 changes: 3 additions & 2 deletions synapse/rest/client/v1/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.streams.config import PaginationConfig

from .base import ClientV1RestServlet, client_path_patterns
Expand Down Expand Up @@ -84,6 +83,7 @@ def __init__(self, hs):
super(EventRestServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
def on_GET(self, request, event_id):
Expand All @@ -92,7 +92,8 @@ def on_GET(self, request, event_id):

time_now = self.clock.time_msec()
if event:
defer.returnValue((200, serialize_event(event, time_now)))
event = yield self._event_serializer.serialize_event(event, time_now)
defer.returnValue((200, event))
else:
defer.returnValue((404, "Event not found."))

Expand Down
Loading