Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up rebuilding of the user directory for local users #15529

Merged
merged 7 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15529.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up rebuilding of the user directory for local users.
13 changes: 11 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,21 +386,30 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
self.executemany(sql, args)

def execute_values(
self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True
self,
sql: str,
values: Iterable[Iterable[Any]],
template: Optional[str] = None,
fetch: bool = True,
) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.

The `fetch` parameter must be set to False if the query does not return
rows (e.g. INSERTs).

The `template` is the snippet to merge to every item in argslist to
compose the query.
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values

return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch),
lambda the_sql: execute_values(
self.txn, the_sql, values, template=template, fetch=fetch
),
sql,
)

Expand Down
235 changes: 160 additions & 75 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
cast,
)

import attr

try:
# Figure out if ICU support is available for searching users.
import icu
Expand Down Expand Up @@ -66,6 +68,19 @@
TEMP_TABLE = "_temp_populate_user_directory"


@attr.s(auto_attribs=True, frozen=True)
class _UserDirProfile:
"""Helper type for the user directory code for an entry to be inserted into
the directory.
"""

user_id: str

# If the display name or avatar URL are unexpected types, replace with None
display_name: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none)
avatar_url: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none)


class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
Expand Down Expand Up @@ -381,25 +396,65 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
% (len(users_to_work_on), progress["remaining"])
)

for user_id in users_to_work_on:
if await self.should_include_local_user_in_dir(user_id):
profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined]
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)

# We've finished processing a user. Delete it from the table.
await self.db_pool.simple_delete_one(
TEMP_TABLE + "_users", {"user_id": user_id}
)
# Update the remaining counter.
progress["remaining"] -= 1
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
# First filter down to users we want to insert into the user directory.
users_to_insert = [
user_id
for user_id in users_to_work_on
if await self.should_include_local_user_in_dir(user_id)
]

# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = await self.db_pool.simple_select_many_batch(
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
desc="populate_user_directory_process_users_get_profiles",
)
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
)
for row in profile_rows
}

profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]

# Actually insert the users with their profiles into the directory.
await self.db_pool.runInteraction(
"populate_user_directory_process_users_insertion",
self._update_profiles_in_user_dir_txn,
profiles_to_insert,
)

# We've finished processing the users. Delete it from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
)

# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)

return len(users_to_work_on)

Expand Down Expand Up @@ -584,72 +639,102 @@ async def update_profile_in_user_dir(
Update or add a user's profile in the user directory.
If the user is remote, the profile will be marked as not stale.
"""
# If the display name or avatar URL are unexpected types, replace with None.
display_name = non_null_str_or_none(display_name)
avatar_url = non_null_str_or_none(avatar_url)
await self.db_pool.runInteraction(
"update_profiles_in_user_dir",
self._update_profiles_in_user_dir_txn,
[_UserDirProfile(user_id, display_name, avatar_url)],
)

def _update_profiles_in_user_dir_txn(
self,
txn: LoggingTransaction,
profiles: Sequence[_UserDirProfile],
) -> None:
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("display_name", "avatar_url"),
value_values=[
(
p.display_name,
p.avatar_url,
)
for p in profiles
],
)

def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_txn(
# Remote users: Make sure the profile is not marked as stale anymore.
remote_users = [
p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id)
]
if remote_users:
self.db_pool.simple_delete_many_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
table="user_directory_stale_remote_users",
column="user_id",
values=remote_users,
keyvalues={},
)

if not self.hs.is_mine_id(user_id):
# Remote users: Make sure the profile is not marked as stale anymore.
self.db_pool.simple_delete_txn(
txn,
table="user_directory_stale_remote_users",
keyvalues={"user_id": user_id},
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
template = """
(
%s,
setweight(to_tsvector('simple', %s), 'A')
|| setweight(to_tsvector('simple', %s), 'D')
|| setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
)
"""

# The display name that goes into the database index.
index_display_name = display_name
if index_display_name is not None:
index_display_name = _filter_text_for_index(index_display_name)

if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute_values(
sql,
[
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
index_display_name,
),
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = (
"%s %s" % (user_id, index_display_name)
if index_display_name
else user_id
)
self.db_pool.simple_upsert_txn(
txn,
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
)
for p in profiles
],
template=template,
fetch=False,
)
elif isinstance(self.database_engine, Sqlite3Engine):
values = []
for p in profiles:
if p.display_name is not None:
index_display_name = _filter_text_for_index(p.display_name)
value = f"{p.user_id} {index_display_name}"
else:
value = p.user_id

txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
values.append((value,))

await self.db_pool.runInteraction(
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
)
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory_search",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("value",),
value_values=values,
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")

for p in profiles:
txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,))

async def add_users_who_share_private_room(
self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
Expand Down