Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add all local users to the user_directory and optionally search them #2723

Merged
merged 15 commits into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion synapse/config/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .push import PushConfig
from .spam_checker import SpamCheckerConfig
from .groups import GroupsConfig
from .user_directory import UserDirectoryConfig


class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
Expand All @@ -44,7 +45,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, PasswordConfig, EmailConfig,
WorkerConfig, PasswordAuthProviderConfig, PushConfig,
SpamCheckerConfig, GroupsConfig,):
SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,):
pass


Expand Down
44 changes: 44 additions & 0 deletions synapse/config/user_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._base import Config


class UserDirectoryConfig(Config):
    """Settings that control the behaviour of the /user_directory API.

    Reads the optional ``user_directory`` section of the homeserver config
    and exposes ``user_directory_search_all_users`` on the config object.
    """

    def read_config(self, config):
        # Searching all users is opt-in, so start from "off".
        self.user_directory_search_all_users = False

        section = config.get("user_directory", None)
        if not section:
            # Section absent (or empty): keep the default.
            return

        self.user_directory_search_all_users = section.get(
            "search_all_users", False
        )

    def default_config(self, config_dir_path, server_name, **kwargs):
        # Sample text for the generated config file; returned verbatim.
        sample = """
# User Directory configuration
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
# in public rooms. Defaults to false. If you set it True, you'll have to run
# UPDATE user_directory_stream_pos SET stream_id = NULL;
# on your database to tell it to rebuild the user_directory search indexes.
#
#user_directory:
# search_all_users: false
"""
        return sample
14 changes: 14 additions & 0 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self, hs):
"profile", self.on_profile_query
)

self.user_directory_handler = hs.get_user_directory_handler()

self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)

@defer.inlineCallbacks
Expand Down Expand Up @@ -139,6 +141,12 @@ def set_displayname(self, target_user, requester, new_displayname, by_admin=Fals
target_user.localpart, new_displayname
)

if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(target_user.localpart)
yield self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile
)

yield self._update_join_states(requester, target_user)

@defer.inlineCallbacks
Expand Down Expand Up @@ -183,6 +191,12 @@ def set_avatar_url(self, target_user, requester, new_avatar_url, by_admin=False)
target_user.localpart, new_avatar_url
)

if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(target_user.localpart)
yield self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move the if and fetching of profile into handle_local_profile_change rather than duplicating it everywhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mm, point.


yield self._update_join_states(requester, target_user)

@defer.inlineCallbacks
Expand Down
8 changes: 8 additions & 0 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)

self._next_generated_user_id = None
Expand Down Expand Up @@ -165,6 +166,13 @@ def register(
),
admin=admin,
)

if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(localpart)
yield self.user_directory_handler.handle_local_profile_change(
user_id, profile
)

else:
# autogen a sequential user ID
attempts = 0
Expand Down
72 changes: 56 additions & 16 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id


logger = logging.getLogger(__name__)


class UserDirectoyHandler(object):
class UserDirectoryHandler(object):
"""Handles querying of and keeping updated the user_directory.

N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
Expand All @@ -41,9 +42,10 @@ class UserDirectoyHandler(object):
one public room.
"""

INITIAL_SLEEP_MS = 50
INITIAL_SLEEP_COUNT = 100
INITIAL_BATCH_SIZE = 100
INITIAL_ROOM_SLEEP_MS = 50
INITIAL_ROOM_SLEEP_COUNT = 100
INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10

def __init__(self, hs):
self.store = hs.get_datastore()
Expand All @@ -53,6 +55,7 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users

# When we start up for the first time we need to populate the user_directory.
# This is a set of user_ids we've inserted already
Expand Down Expand Up @@ -110,6 +113,15 @@ def notify_new_event(self):
finally:
self._is_processing = False

@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
    """Refresh the user directory entry for one of our local users.

    Called when a local user's profile changes, irrespective of any rooms
    the user may be in.

    Args:
        user_id (str): full ID of the local user whose profile changed.
        profile: object exposing ``display_name`` and ``avatar_url``.
    """
    display_name = profile.display_name
    avatar_url = profile.avatar_url

    # NOTE(review): the trailing None presumably means "no specific room";
    # confirm against the signature of update_profile_in_user_dir.
    yield self.store.update_profile_in_user_dir(
        user_id, display_name, avatar_url, None,
    )

@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then it means we haven't fetched it from DB
Expand Down Expand Up @@ -148,16 +160,30 @@ def _do_initial_spam(self):
room_ids = yield self.store.get_all_rooms()

logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
num_processed_rooms = 1
num_processed_rooms = 0

for room_id in room_ids:
logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

logger.info("Processed all rooms.")

if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to include appservice users here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also worried that pulling out millions of users is going to be painful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it's going to take matrix.org more than a day on the default settings to get through all its users

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, point. so your suggestion is to batch? and yes, we probably should be including AS users.

logger.info("Doing initial update of user directory. %d users", len(user_ids))
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)

logger.info("Processed all users")

self.initially_handled_users = None
self.initially_handled_users_in_public = None
self.initially_handled_users_share = None
Expand Down Expand Up @@ -201,8 +227,8 @@ def _handle_initial_room(self, room_id):
to_update = set()
count = 0
for user_id in user_ids:
if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

if not self.is_mine_id(user_id):
count += 1
Expand All @@ -216,8 +242,8 @@ def _handle_initial_room(self, room_id):
if user_id == other_user_id:
continue

if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1

user_set = (user_id, other_user_id)
Expand All @@ -237,13 +263,13 @@ def _handle_initial_room(self, room_id):
else:
self.initially_handled_users_share_private_room.add(user_set)

if len(to_insert) > self.INITIAL_BATCH_SIZE:
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()

if len(to_update) > self.INITIAL_BATCH_SIZE:
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
Expand Down Expand Up @@ -384,15 +410,29 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
for user_id in users:
yield self._handle_remove_user(room_id, user_id)

@defer.inlineCallbacks
def _handle_local_user(self, user_id):
    """Make sure a local user has a room-less entry in the user directory.

    Used to populate the user index when user_directory_search_all_users
    is enabled. The user's stored profile is looked up by localpart, and a
    directory entry is added only if the user is not already present.

    Args:
        user_id (str): full ID of the local user.
    """
    logger.debug("Adding new local user to dir, %r", user_id)

    localpart = get_localpart_from_id(user_id)
    local_profile = yield self.store.get_profileinfo(localpart)

    existing_entry = yield self.store.get_user_in_directory(user_id)
    if existing_entry:
        # Already in the directory; nothing to add.
        return

    # None is passed in place of a room ID: this entry is not tied to a room.
    yield self.store.add_profiles_to_user_dir(None, {user_id: local_profile})

@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
"""Called when we might need to add user to directory

Args:
room_id (str): room_id that user joined or started being public that
room_id (str): room_id that user joined or started being public
user_id (str)
"""
logger.debug("Adding user to dir, %r", user_id)
logger.debug("Adding new user to dir, %r", user_id)

row = yield self.store.get_user_in_directory(user_id)
if not row:
Expand All @@ -407,7 +447,7 @@ def _handle_new_user(self, room_id, user_id, profile):
if not row:
yield self.store.add_users_to_public_room(room_id, [user_id])
else:
logger.debug("Not adding user to public dir, %r", user_id)
logger.debug("Not adding new user to public dir, %r", user_id)

# Now we update users who share rooms with users. We do this by getting
# all the current users in the room and seeing which aren't already
Expand Down
4 changes: 2 additions & 2 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.user_directory import UserDirectoyHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.profile import ProfileHandler
from synapse.groups.groups_server import GroupsServerHandler
Expand Down Expand Up @@ -339,7 +339,7 @@ def build_action_generator(self):
return ActionGenerator(self)

def build_user_directory_handler(self):
return UserDirectoyHandler(self)
return UserDirectoryHandler(self)

def build_groups_local_handler(self):
return GroupsLocalHandler(self)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
return a single row, returning a single column from it.
return a single row, returning multiple columns from it.

Args:
table : string giving the table name
Expand Down
27 changes: 27 additions & 0 deletions synapse/storage/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

from twisted.internet import defer

from synapse.storage.roommember import ProfileInfo
from synapse.api.errors import StoreError

from ._base import SQLBaseStore


Expand All @@ -26,6 +29,30 @@ def create_profile(self, user_localpart):
desc="create_profile",
)

@defer.inlineCallbacks
def get_profileinfo(self, user_localpart):
    """Fetch the stored display name and avatar URL for a local user.

    Args:
        user_localpart (str): value matched against the ``user_id`` column
            of the ``profiles`` table.

    Returns:
        Deferred[ProfileInfo]: the stored profile, or
        ``ProfileInfo(None, None)`` if the lookup fails with a 404
        StoreError (no matching row).

    Raises:
        StoreError: any store failure whose code is not 404.
    """
    try:
        profile = yield self._simple_select_one(
            table="profiles",
            keyvalues={"user_id": user_localpart},
            retcols=("displayname", "avatar_url"),
            desc="get_profileinfo",
        )
    except StoreError as e:
        # Use the "as" form: it is valid on Python 2.6+ and Python 3, whereas
        # the comma form ("except StoreError, e") is a syntax error on 3.
        if e.code != 404:
            raise
        # no match -- returnValue raises, so nothing after it runs
        defer.returnValue(ProfileInfo(None, None))

    defer.returnValue(
        ProfileInfo(
            avatar_url=profile['avatar_url'],
            display_name=profile['displayname'],
        )
    )

def get_profile_displayname(self, user_localpart):
return self._simple_select_one_onecol(
table="profiles",
Expand Down
35 changes: 35 additions & 0 deletions synapse/storage/schema/delta/46/user_dir_null_room_ids.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* Copyright 2017 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Change the user_directory table to also cover global local user profiles
-- rather than just profiles within specific rooms.
--
-- This is done by building a replacement table in which room_id may be NULL,
-- copying every existing row across, and swapping it in under the old name.
-- NOTE(review): presumably room_id was NOT NULL in the old table; confirm
-- against the delta that originally created user_directory.

-- Replacement table: same four columns, but room_id carries no NOT NULL.
CREATE TABLE user_directory2 (
user_id TEXT NOT NULL,
room_id TEXT,
display_name TEXT,
avatar_url TEXT
);

-- Copy all existing rows into the replacement table.
INSERT INTO user_directory2(user_id, room_id, display_name, avatar_url)
SELECT user_id, room_id, display_name, avatar_url from user_directory;

-- Swap the new table in under the original name.
DROP TABLE user_directory;
ALTER TABLE user_directory2 RENAME TO user_directory;

-- Create indexes after doing the inserts because that's more efficient.
-- It also means we can give them the same names as the old ones without
-- renaming.
CREATE INDEX user_directory_room_idx ON user_directory(room_id);
CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
Loading