Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of getting typing updates for replication #3794

Merged
merged 3 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3794.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up calculation of typing updates for replication
22 changes: 19 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
Expand Down Expand Up @@ -68,6 +69,11 @@ def __init__(self, hs):
# map room IDs to sets of users currently typing
self._room_typing = {}

# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial,
)

self.clock.looping_call(
self._handle_timeouts,
5000,
Expand Down Expand Up @@ -274,19 +280,29 @@ def _push_update_local(self, member, typing):

self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
self._typing_stream_change_cache.entity_has_changed(
member.room_id, self._latest_room_serial,
)

self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)

def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id,
)

if changed_rooms is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see why this would happen.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_all_entities_changed can return None if last_id is sufficiently old?

changed_rooms = self._room_serials

rows = []
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
for room_id in changed_rooms:
serial = self._room_serials[room_id]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we bother to update and check room_serials as well as the stream change cache?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly because the stream change caches are bound in size and will drop old rooms. Agreed its a bit wasteful to have both, but I'm not sure the best way of amalgamating them without causing us to send down all typing notifications for old clients.

if last_id < serial <= current_id:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL you can do this in python.

typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()
Expand Down