Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise notifier mk2 #17766

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17765.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.
1 change: 1 addition & 0 deletions changelog.d/17766.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.
132 changes: 77 additions & 55 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from prometheus_client import Counter

from twisted.internet import defer
from twisted.internet.defer import Deferred

from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
Expand All @@ -52,6 +53,7 @@
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
Expand All @@ -61,8 +63,11 @@
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand All @@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n


class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""

__slots__ = ["deferred"]

def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred


class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
Expand All @@ -113,59 +106,49 @@ class _NotifierUserStream:

def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token

# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms

self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()

def notify(
def update_and_fetch_deferreds(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.

Args:
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
current_token: The new current token.
time_now_ms: The current time in milliseconds.

Returns:
The set of deferreds that need to be called.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred

log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)

users_woken_by_stream_counter.labels(stream_key).inc()
listeners = self.listeners
self.listeners = set()

with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
return listeners

def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
Expand All @@ -179,9 +162,9 @@ def remove(self, notifier: "Notifier") -> None:
notifier.user_to_user_stream.pop(self.user_id)

def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)

def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.

Expand All @@ -191,10 +174,17 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)

# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)

return deferred


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down Expand Up @@ -247,6 +237,7 @@ def __init__(self, hs: "HomeServer"):
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []

self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
Expand Down Expand Up @@ -342,14 +333,25 @@ async def on_un_partial_stated_room(
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()

listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")

with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)

users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)

# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
Expand Down Expand Up @@ -519,12 +521,16 @@ def on_new_event(
rooms = rooms or []

with Measure(self.clock, "on_new_event"):
user_streams = set()
user_streams: Set[_NotifierUserStream] = set()

log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)

Expand All @@ -544,12 +550,27 @@ def on_new_event(
)

time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")

# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)

users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))

self.notify_replication()

# Notify appservices.
Expand Down Expand Up @@ -586,6 +607,7 @@ async def wait_for_events(
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
Expand All @@ -608,8 +630,8 @@ async def wait_for_events(
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
Expand All @@ -622,7 +644,7 @@ async def wait_for_events(
)

with PreserveLoggingContext():
await listener.deferred
await listener

log_kv(
{
Expand Down
31 changes: 21 additions & 10 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,22 +282,33 @@ def test_sync_backwards_typing(self) -> None:
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]

# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]

# Clear the typing information, so that it doesn't think everything is
# in the future.
# in the future. This happens automatically when the typing stream
# resets.
typing._reset()

# Now it SHOULD fail as it never completes!
# Nothing new, so we time out.
with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch))

# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())

channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)

# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]


class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [
Expand Down
Loading