Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow profile updates to happen on workers #3659

Merged
merged 14 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3659.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support profile API endpoints on workers
1 change: 1 addition & 0 deletions docs/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/

It will create events locally and then send them on to the main synapse
instance to be persisted and handled.
12 changes: 12 additions & 0 deletions synapse/app/event_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
RoomMembershipRestServlet,
Expand All @@ -53,6 +58,7 @@
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
Expand All @@ -62,6 +68,9 @@


class EventCreatorSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
DirectoryStore,
SlavedTransactionStore,
SlavedProfileStore,
Expand Down Expand Up @@ -101,6 +110,9 @@ def _listen_http(self, listener_config):
RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
Expand Down
21 changes: 14 additions & 7 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
logger = logging.getLogger(__name__)


class ProfileHandler(BaseHandler):
class WorkerProfileHandler(BaseHandler):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that this is also used as a base class on the master instance, can you either give it a different name (BaseProfileHandler?) or a docstring that explains the situation?

PROFILE_UPDATE_MS = 60 * 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are redundant now

PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
super(WorkerProfileHandler, self).__init__(hs)

self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
Expand All @@ -46,11 +46,6 @@ def __init__(self, hs):

self.user_directory_handler = hs.get_user_directory_handler()

if hs.config.worker_app is None:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)

@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
Expand Down Expand Up @@ -282,6 +277,18 @@ def _update_join_states(self, requester, target_user):
room_id, str(e.message)
)


class MasterProfileHandler(WorkerProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super(MasterProfileHandler, self).__init__(hs)

self.clock.looping_call(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert that hs.config.worker_app is None here?

self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)

def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ def handle_local_profile_change(self, user_id, profile):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None,
)
Expand All @@ -127,6 +129,8 @@ def handle_local_profile_change(self, user_id, profile):
def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)

Expand Down
7 changes: 5 additions & 2 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.profile import MasterProfileHandler, WorkerProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
Expand Down Expand Up @@ -308,7 +308,10 @@ def build_initial_sync_handler(self):
return InitialSyncHandler(self)

def build_profile_handler(self):
return ProfileHandler(self)
if self.config.worker_app:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have this return WorkerProfileHandler and then override it in synapse.server.HomeServer? possibly with a comment here to say that's what's going on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but this is how we do it for other handlers. I'm not sure I really think it'll be clearer moving it into the actual app

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrm, ok, let's punt it for now, though I'm not sure I agree.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly something we can revisit if/when we try to clean up the worker split-out.

return WorkerProfileHandler(self)
else:
return MasterProfileHandler(self)

def build_event_creation_handler(self):
return EventCreationHandler(self)
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def get_from_remote_profile_cache(self, user_id):
desc="get_from_remote_profile_cache",
)


class ProfileStore(ProfileWorkerStore):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there not a danger of races if the *_remote_profile_cache methods are called from a worker?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on closer inspection, it looks like efforts are made to not call those methods from a worker, in which case I think they should probably stay in ProfileStore rather than moving.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops — fixed.

def create_profile(self, user_localpart):
return self._simple_insert(
table="profiles",
Expand All @@ -96,6 +94,8 @@ def set_profile_avatar_url(self, user_localpart, new_avatar_url):
desc="set_profile_avatar_url",
)


class ProfileStore(ProfileWorkerStore):
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.

Expand Down
58 changes: 29 additions & 29 deletions synapse/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,35 @@ def is_room_blocked(self, room_id):
desc="is_room_blocked",
)

@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
    """Look up any per-user ratelimiting override.

    Args:
        user_id (str): the user to look up

    Returns:
        Deferred[RatelimitOverride|None]: the override row for the user,
        or None if no override exists. If the fields of the returned
        RatelimitOverride are None or 0, ratelimiting has been disabled
        entirely for that user.
    """
    override = yield self._simple_select_one(
        table="ratelimit_override",
        keyvalues={"user_id": user_id},
        retcols=("messages_per_second", "burst_count"),
        allow_none=True,
        desc="get_ratelimit_for_user",
    )

    # No row means no override configured for this user.
    if not override:
        defer.returnValue(None)

    defer.returnValue(RatelimitOverride(
        messages_per_second=override["messages_per_second"],
        burst_count=override["burst_count"],
    ))


class RoomStore(RoomWorkerStore, SearchStore):

Expand Down Expand Up @@ -469,35 +498,6 @@ def get_all_new_public_rooms(txn):
"get_all_new_public_rooms", get_all_new_public_rooms
)

@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
    """Check if there are any overrides for ratelimiting for the given
    user.

    Args:
        user_id (str): the user to look up

    Returns:
        RatelimitOverride if there is an override, else None. If the contents
        of RatelimitOverride are None or 0 then ratelimiting has been
        disabled for that user entirely.
    """
    # allow_none=True: absence of a row simply means no override is set.
    row = yield self._simple_select_one(
        table="ratelimit_override",
        keyvalues={"user_id": user_id},
        retcols=("messages_per_second", "burst_count"),
        allow_none=True,
        desc="get_ratelimit_for_user",
    )

    if row:
        defer.returnValue(RatelimitOverride(
            messages_per_second=row["messages_per_second"],
            burst_count=row["burst_count"],
        ))
    else:
        defer.returnValue(None)

@defer.inlineCallbacks
def block_room(self, room_id, user_id):
yield self._simple_insert(
Expand Down
4 changes: 2 additions & 2 deletions tests/handlers/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import synapse.types
from synapse.api.errors import AuthError
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.profile import MasterProfileHandler
from synapse.types import UserID

from tests import unittest
Expand All @@ -29,7 +29,7 @@

class ProfileHandlers(object):
    """Minimal stand-in for the HomeServer handler container used in tests.

    Exposes only a profile_handler attribute, built with the
    MasterProfileHandler (the variant used on the main synapse process).
    """
    def __init__(self, hs):
        self.profile_handler = MasterProfileHandler(hs)


class ProfileTestCase(unittest.TestCase):
Expand Down