Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Stub out ServerNoticesSender on the workers
Browse files Browse the repository at this point in the history
... and have the sync endpoints call it directly rather than obsure indirection
via PresenceHandler
  • Loading branch information
richvdh committed May 22, 2018
1 parent d5dca9a commit 8810685
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 8 deletions.
5 changes: 5 additions & 0 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, hs):

self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()

@defer.inlineCallbacks
@log_function
Expand All @@ -58,6 +59,10 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0,
If `only_keys` is not None, events from keys will be sent down.
"""

# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(auth_user_id)

auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()

Expand Down
4 changes: 0 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()

federation_registry = hs.get_federation_registry()

Expand Down Expand Up @@ -433,9 +432,6 @@ def user_syncing(self, user_id, affect_presence=True):
last_user_sync_ts=self.clock.time_msec(),
)])

# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(user_id)

@defer.inlineCallbacks
def _end():
try:
Expand Down
4 changes: 4 additions & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler()
self._server_notices_sender = hs.get_server_notices_sender()

@defer.inlineCallbacks
def on_GET(self, request):
Expand Down Expand Up @@ -149,6 +150,9 @@ def on_GET(self, request):
else:
since_token = None

# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(user.to_string())

affect_presence = set_presence != PresenceState.OFFLINE

if affect_presence:
Expand Down
7 changes: 7 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
)
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender,
)
from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
Expand Down Expand Up @@ -403,9 +406,13 @@ def build_federation_registry(self):
return FederationHandlerRegistry()

def build_server_notices_manager(self):
if self.config.worker_app:
raise Exception("Workers cannot send server notices")
return ServerNoticesManager(self)

def build_server_notices_sender(self):
if self.config.worker_app:
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)

def remove_pusher(self, app_id, push_key, user_id):
Expand Down
8 changes: 4 additions & 4 deletions synapse/server_notices/server_notices_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ def __init__(self, hs):
def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation.
This is only called when /sync (or /events) is called on the synapse
master. In a deployment with synchrotrons, on_user_ip is called
Args:
user_id (str): mxid of user who synced
Expand All @@ -45,14 +42,17 @@ def on_user_syncing(self, user_id):
)

def on_user_ip(self, user_id):
"""Called when a worker process saw a client request.
"""Called on the master when a worker process saw a client request.
Args:
user_id (str): mxid
Returns:
Deferred
"""
# The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing
return self._consent_server_notices.maybe_send_server_notice_to_user(
user_id,
)
46 changes: 46 additions & 0 deletions synapse/server_notices/worker_server_notices_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer


class WorkerServerNoticesSender(object):
"""Stub impl of ServerNoticesSender which does nothing"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""

def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation.
Args:
user_id (str): mxid of user who synced
Returns:
Deferred
"""
return defer.succeed()

def on_user_ip(self, user_id):
"""Called on the master when a worker process saw a client request.
Args:
user_id (str): mxid
Returns:
Deferred
"""
raise AssertionError("on_user_ip unexpectedly called on worker")

0 comments on commit 8810685

Please sign in to comment.